How to use boto3 client with Python multiprocessing?

RNHTTR · Jul 12, 2018 · Viewed 16.9k times

Code looks something like this:

import multiprocessing as mp
from functools import partial

import boto3
import numpy as np


s3 = boto3.client('s3')

def _something(**kwargs):
    # Some mixed integer programming stuff related to the variable archive
    return np.array(some_variable_related_to_archive)


def do(s3):
    archive = np.load(s3.get_object('some_key')) # Simplified -- details not relevant
    pool = mp.Pool()
    sub_process = partial(_something, slack=0.1)
    parts = np.array_split(archive, some_int)
    target_parts = np.array(things)

    out = pool.starmap(sub_process, [x for x in zip(parts, target_parts)]) # Error occurs at this line

    pool.close()
    pool.join()

do(s3)

The error:

_pickle.PicklingError: Can't pickle <class 'botocore.client.S3'>: attribute lookup S3 on botocore.client failed

I have very limited experience with the Python multiprocessing library. I'm not sure why this error is raised when the S3 client isn't a parameter of any of the functions being mapped. Note that the code runs fine if the archive file is loaded from disk rather than from S3.

Any help/guidance would be greatly appreciated.

Answer

RNHTTR · Jul 17, 2018

Objects passed to mp.starmap() must be picklable, and S3 clients are not. Moving the S3 client's work outside of the function that calls mp.starmap() solves the issue:

import multiprocessing as mp
from functools import partial

import boto3
import numpy as np


s3 = boto3.client('s3')
archive = np.load(s3.get_object('some_key')) # Simplified -- the S3 call now happens here, outside of the do() function

def _something(**kwargs):
    # Some mixed integer programming stuff related to the variable archive
    return np.array(some_variable_related_to_archive)


def do(archive): # pass the previously loaded archive, and not the s3 object into the function
    pool = mp.Pool()
    sub_process = partial(_something, slack=0.1)
    parts = np.array_split(archive, some_int)
    target_parts = np.array(things)

    out = pool.starmap(sub_process, [x for x in zip(parts, target_parts)]) # Works now -- nothing unpicklable is passed to the workers

    pool.close()
    pool.join()

do(archive) # pass the previously loaded archive, and not the s3 object into the function
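
If the worker function itself needs to talk to S3 (rather than only operating on the pre-loaded archive), another common pattern is to create a separate client inside each worker process via the pool's initializer, so no client object ever has to be pickled. Below is a minimal sketch along those lines; the _init_worker helper and the placeholder body of _something are illustrative, not part of the original code:

import multiprocessing as mp

import boto3
import numpy as np

_s3 = None  # per-process client, created once in each worker by the initializer

def _init_worker():
    # Each worker process builds its own client, so no boto3 object
    # ever crosses a process boundary (and therefore never needs pickling).
    global _s3
    _s3 = boto3.client('s3')

def _something(part, target, slack=0.1):
    # Placeholder for the real mixed integer programming work; the worker
    # can use the module-level _s3 client here if it needs S3 access.
    return np.asarray(part)

def do(parts, target_parts):
    with mp.Pool(initializer=_init_worker) as pool:
        out = pool.starmap(_something, zip(parts, target_parts))
    return out

Either way, the rule of thumb is the same: keep boto3 clients and resources out of anything multiprocessing has to serialize, and build them fresh in whichever process actually uses them.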