How to upload data in bulk to the appengine datastore? Older methods do not work

Cygorger · Aug 27, 2014 · Viewed 7.8k times

This should be a fairly common requirement, and a simple process: upload data in bulk to the appengine datastore.

However, none of the older solutions mentioned on Stack Overflow (links below*) seem to work anymore. The bulkloader method, which was the most reasonable solution when uploading to the datastore using the DB API, doesn't work with the NDB API.

And now the bulkloader method seems to have been deprecated, and the old links, which are still present in the docs, lead to the wrong page. Here's an example:

https://developers.google.com/appengine/docs/python/tools/uploadingdata

The above link is still present on this page: https://developers.google.com/appengine/docs/python/tools/uploadinganapp

What is the recommended method for bulkloading data now?

The two feasible alternatives seem to be 1) using the remote_api or 2) writing a CSV file to a GCS bucket and reading from that. Anybody have experience successfully using either method?

Any pointers will be greatly appreciated. Thanks!

[*The solutions offered at the links below are no longer valid]

[1] how does one upload data in bulk to a google appengine datastore?

[2] How to insert bulk data in Google App Engine Datastore?

Answer

Sriram · Jan 11, 2015

Method 1: Use remote_api

How to: write a bulkloader.yaml file and run it directly using the "appcfg.py upload_data" command from a terminal. I don't recommend this method for a couple of reasons:

  1. huge latency
  2. no support for NDB

Method 2: GCS and use mapreduce

Uploading Data File to GCS:

Use the "storage-file-transfer-json-python" GitHub project (chunked_transfer.py) to upload files to GCS from your local system. Make sure to generate a proper "client-secrets.json" file from the App Engine admin console.

Mapreduce:

Use the "appengine-mapreduce" GitHub project. Copy the "mapreduce" folder to your project's top-level folder.

Add the lines below to your app.yaml file:

includes:
  - mapreduce/include.yaml

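Below is the handler module. The mapper is referenced by the dotted path "testgcs.testmapperFunc", so that string must match the module the code actually lives in (for example "main.testmapperFunc" if you keep it in main.py):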

import cgi
import webapp2
import logging
import os, csv
from models import DataStoreModel
import StringIO
from google.appengine.api import app_identity
from mapreduce import base_handler
from mapreduce import mapreduce_pipeline
from mapreduce import operation as op
from mapreduce.input_readers import InputReader

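def testmapperFunc(newRequest):
    # With "format": 'lines', each call is expected to receive one line of the file.
    f = StringIO.StringIO(newRequest)
    reader = csv.reader(f, delimiter=',')
    for row in reader:
        # Column 0 maps to attr1 and column 1 to link.
        newEntry = DataStoreModel(attr1=row[0], link=row[1])
        # Yield a Put operation and let mapreduce perform the datastore write.
        yield op.db.Put(newEntry)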

class TestGCSReaderPipeline(base_handler.PipelineBase):
    def run(self, filename):
        yield mapreduce_pipeline.MapreducePipeline(
                "test_gcs",
                "testgcs.testmapperFunc",
                "mapreduce.input_readers.FileInputReader",
                mapper_params={
                    "files": [filename],
                    "format": 'lines'
                },
                shards=1)

class tempTestRequestGCSUpload(webapp2.RequestHandler):
    def get(self):
        bucket_name = os.environ.get('BUCKET_NAME',
                                     app_identity.get_default_gcs_bucket_name())

        bucket = '/gs/' + bucket_name
        filename = bucket + '/' + 'tempfile.csv'

        pipeline = TestGCSReaderPipeline(filename)
        pipeline.with_params(target="mapreducetestmodtest")
        pipeline.start()
        self.response.out.write('done')

application = webapp2.WSGIApplication([
    ('/gcsupload', tempTestRequestGCSUpload),
], debug=True)

To remember:

  1. The mapreduce project uses the now-deprecated "Google Cloud Storage Files API", so future support is not guaranteed.
  2. Mapreduce adds a small overhead to datastore reads and writes.
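The mapper in Method 2 parses one line of CSV text per call and indexes row[0] and row[1] directly, so a blank or short line would raise an error inside the job. A minimal sketch of a defensive parsing step follows. The helper name parse_csv_line and the expected_fields parameter are hypothetical, not part of the mapreduce library, and the sketch uses io.StringIO so it can be tried on a current Python interpreter outside App Engine (the answer's Python 2 code uses StringIO.StringIO instead).

```python
import csv
import io


def parse_csv_line(line, expected_fields=2):
    """Parse one line of CSV text.

    Returns the list of fields, or None when the line is blank, spans
    several rows, or has fewer than expected_fields columns.
    (Hypothetical helper, for illustration only.)
    """
    rows = list(csv.reader(io.StringIO(line), delimiter=','))
    if len(rows) != 1:
        # No rows at all (blank input) or more than one row.
        return None
    fields = rows[0]
    if len(fields) < expected_fields:
        # Too few columns to fill attr1 and link.
        return None
    return fields
```

Inside the mapper, a None result could simply be skipped before building the entity, which keeps one malformed line from failing the whole shard.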

Method 3: GCS and GCS Client Library

  1. Upload the CSV/text file to GCS using the file-transfer method described above.
  2. Use the GCS client library (copy the 'cloudstorage' folder to your application's top-level folder).

Add the code below to the application's main.py file.

import webapp2
import os, csv
import cloudstorage as gcs
from google.appengine.ext import ndb
from google.appengine.api import app_identity
from models import DataStoreModel

class UploadGCSData(webapp2.RequestHandler):
    def get(self):
        bucket_name = os.environ.get('BUCKET_NAME',
                                     app_identity.get_default_gcs_bucket_name())
        bucket = '/' + bucket_name
        filename = bucket + '/tempfile.csv'
        self.upload_file(filename)

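    def upload_file(self, filename):
        # Read the CSV from GCS and write entities in batches of 50.
        gcs_file = gcs.open(filename)
        datareader = csv.reader(gcs_file)
        count = 0
        entities = []
        for row in datareader:
            count += 1
            newProd = DataStoreModel(attr1=row[0], link=row[1])
            entities.append(newProd)

            # Flush every 50 rows so each put_multi call stays small.
            if count%50==0 and entities:
                ndb.put_multi(entities)
                entities=[]

        # Write whatever is left after the last full batch.
        if entities:
            ndb.put_multi(entities)
        gcs_file.close()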

application = webapp2.WSGIApplication([
    ('/gcsupload', UploadGCSData),
], debug=True)
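The batching logic in upload_file (collect rows, flush every 50, flush the remainder) can be separated from the datastore call, which makes it easy to check locally. The following is a minimal sketch under that assumption. The names batched and load_rows and the put_batch callback are hypothetical and not part of the GCS client library or NDB.

```python
def batched(rows, batch_size=50):
    """Yield lists of at most batch_size items from any iterable."""
    batch = []
    for row in rows:
        batch.append(row)
        if len(batch) == batch_size:
            yield batch
            batch = []
    if batch:
        # Final partial batch.
        yield batch


def load_rows(rows, put_batch, batch_size=50):
    """Pass rows to put_batch one batch at a time; return the row count."""
    total = 0
    for batch in batched(rows, batch_size):
        put_batch(batch)
        total += len(batch)
    return total
```

In the handler above, put_batch would presumably be ndb.put_multi and rows a generator that builds one DataStoreModel per CSV row. Locally, any callable that accepts a list (such as a list's append method) can stand in for it.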