AWS Data pipeline CSV data from S3 to DynamoDB

NKS picture NKS · Aug 3, 2013 · Viewed 12k times · Source

I am trying to transfer CSV data from S3 bucket to DynamoDB using AWS pipeline, following is my pipe line script, it is not working properly,

CSV file structure

Name, Designation,Company

A,TL,C1

B,Prog, C2

DynamoDb : N_Table, with Name as hash value

{
"objects": [
    {
        "id": "Default",
        "scheduleType": "cron",
        "name": "Default",
        "role": "DataPipelineDefaultRole",
        "resourceRole": "DataPipelineDefaultResourceRole"
    },
    {
        "id": "DynamoDBDataNodeId635",
        "schedule": {
            "ref": "ScheduleId639"
        },
        "tableName": "N_Table",
        "name": "MyDynamoDBData",
        "type": "DynamoDBDataNode"
    },
    {
        "emrLogUri": "s3://onlycsv/error",
        "id": "EmrClusterId636",
        "schedule": {
            "ref": "ScheduleId639"
        },
        "masterInstanceType": "m1.small",
        "coreInstanceType": "m1.xlarge",
        "enableDebugging": "true",
        "installHive": "latest",
        "name": "ImportCluster",
        "coreInstanceCount": "1",
        "logUri": "s3://onlycsv/error1",
        "type": "EmrCluster"
    },
    {
        "id": "S3DataNodeId643",
        "schedule": {
            "ref": "ScheduleId639"
        },
        "directoryPath": "s3://onlycsv/data.csv",
        "name": "MyS3Data",
        "dataFormat": {
            "ref": "DataFormatId1"
        },
        "type": "S3DataNode"
    },
    {
        "id": "ScheduleId639",
        "startDateTime": "2013-08-03T00:00:00",
        "name": "ImportSchedule",
        "period": "1 Hours",
        "type": "Schedule",
        "endDateTime": "2013-08-04T00:00:00"
    },
    {
        "id": "EmrActivityId637",
        "input": {
            "ref": "S3DataNodeId643"
        },
        "schedule": {
            "ref": "ScheduleId639"
        },
        "name": "MyImportJob",
        "runsOn": {
            "ref": "EmrClusterId636"
        },
        "maximumRetries": "0",
        "myDynamoDBWriteThroughputRatio": "0.25",
        "attemptTimeout": "24 hours",
        "type": "EmrActivity",
        "output": {
            "ref": "DynamoDBDataNodeId635"
        },
        "step": "s3://elasticmapreduce/libs/script-runner/script-runner.jar,s3://elasticmapreduce/libs/hive/hive-script,--run-hive-script,--hive-versions,latest,--args,-f,s3://elasticmapreduce/libs/hive/dynamodb/importDynamoDBTableFromS3,-d,DYNAMODB_OUTPUT_TABLE=#{output.tableName},-d,S3_INPUT_BUCKET=#{input.directoryPath},-d,DYNAMODB_WRITE_PERCENT=#{myDynamoDBWriteThroughputRatio},-d,DYNAMODB_ENDPOINT=dynamodb.us-east-1.amazonaws.com"
    },
    {
        "id": "DataFormatId1",
        "name": "DefaultDataFormat1",
        "column": [
            "Name",
            "Designation",
            "Company"
        ],
        "columnSeparator": ",",
        "recordSeparator": "\n",
        "type": "Custom"
    }
]

}

Out of four steps while executing the pipeline, two are getting finished, but it is not executing completely

Answer

Altair7852 picture Altair7852 · Apr 11, 2015

Currently (2015-04) default import pipeline template does not support importing CSV files.

If your CSV file is not too big (under 1GB or so) you can create a ShellCommandActivity to convert CSV to DynamoDB JSON format first and the feed that to EmrActivity that imports the resulting JSON file into your table.

As a first step you can create sample DynamoDB table including all the field types you need, populate with dummy values and then export the records using pipeline (Export/Import button in DynamoDB console). This will give you the idea about the format that is expected by Import pipeline. The type names are not obvious, and the Import activity is very sensitive about the correct case (e.g. you should have bOOL for boolean field).

Afterwards it should be easy to create an awk script (or any other text converter, at least with awk you can use the default AMI image for your shell activity), which you can feed to your shellCommandActivity. Don't forget to enable "staging" flag, so your output is uploaded back to S3 for the Import activity to pick it up.