TensorFlow/Keras multi-threaded model fitting

bnaul · Feb 19, 2017 · Viewed 13.2k times

I'm attempting to train multiple Keras models with different parameter values using multiple threads (and the TensorFlow backend). I've seen a few examples of using the same model across multiple threads, but in this particular case I run into various errors about conflicting graphs. Here's a simple example of what I'd like to be able to do:

from concurrent.futures import ThreadPoolExecutor
import numpy as np
import tensorflow as tf
from keras import backend as K
from keras.layers import Dense
from keras.models import Sequential


sess = tf.Session()


def example_model(size):
    model = Sequential()
    model.add(Dense(size, input_shape=(5,)))
    model.add(Dense(1))
    model.compile(optimizer='sgd', loss='mse')
    return model


if __name__ == '__main__':
    K.set_session(sess)
    X = np.random.random((10, 5))
    y = np.random.random((10, 1))
    models = [example_model(i) for i in range(5, 10)]

    e = ThreadPoolExecutor(4)
    res_list = [e.submit(model.fit, X, y) for model in models]

    for res in res_list:
        print(res.result())

The resulting error is ValueError: Tensor("Variable:0", shape=(5, 5), dtype=float32_ref) must be from the same graph as Tensor("Variable_2/read:0", shape=(), dtype=float32). I've also tried initializing the models within the threads, which fails in a similar way.

Any thoughts on the best way to go about this? I'm not at all attached to this exact structure, but I'd prefer to be able to use multiple threads rather than processes so all the models are trained within the same GPU memory allocation.

Answer

dkamm · Aug 23, 2017

TensorFlow graphs are not thread-safe (see https://www.tensorflow.org/api_docs/python/tf/Graph), and when you create a new TensorFlow session it uses the default graph by default.

You can get around this by creating a new session with its own graph inside your parallelized function and constructing your Keras model there.
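Applied to the model from the question, the pattern looks roughly like this (just a sketch assuming the same TF 1.x / standalone Keras setup as above; fit_in_own_graph is an arbitrary name, not a Keras or TensorFlow API):

import numpy as np
import tensorflow as tf
from keras import backend as K
from keras.layers import Dense
from keras.models import Sequential

def fit_in_own_graph(size, X, y):
    # Each call gets its own graph and a session bound to it, so the
    # variables this model creates never mix with another thread's graph.
    with tf.Session(graph=tf.Graph()) as sess:
        K.set_session(sess)
        model = Sequential()
        model.add(Dense(size, input_shape=(5,)))
        model.add(Dense(1))
        model.compile(optimizer='sgd', loss='mse')
        model.fit(X, y, verbose=0)
        return model.evaluate(X, y, verbose=0)

Each worker thread then calls fit_in_own_graph(size, X, y) instead of calling fit on a model that was built in the main thread.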

Here is some code that creates and fits a model on each available GPU in parallel:

import concurrent.futures
import numpy as np

import keras.backend as K
from keras.layers import Dense
from keras.models import Sequential

import tensorflow as tf
from tensorflow.python.client import device_lib

def get_available_gpus():
    # Return the device names of all GPUs visible to TensorFlow.
    local_device_protos = device_lib.list_local_devices()
    return [x.name for x in local_device_protos if x.device_type == 'GPU']

# Toy data: 100 samples, 8 features, binary labels.
xdata = np.random.randn(100, 8)
ytrue = np.random.randint(0, 2, 100)

def fit(gpu):
    # Each worker gets its own graph and a session bound to that graph, so the
    # model it builds cannot collide with models built by other threads.
    with tf.Session(graph=tf.Graph()) as sess:
        K.set_session(sess)
        # Pin this worker's ops to the GPU it was assigned.
        with tf.device(gpu):
            model = Sequential()
            model.add(Dense(12, input_dim=8, activation='relu'))
            model.add(Dense(8, activation='relu'))
            model.add(Dense(1, activation='sigmoid'))

            model.compile(loss='binary_crossentropy', optimizer='adam')
            model.fit(xdata, ytrue, verbose=0)

            return model.evaluate(xdata, ytrue, verbose=0)

gpus = get_available_gpus()
# One worker thread per GPU; each fit() call runs in its own graph and session.
with concurrent.futures.ThreadPoolExecutor(len(gpus)) as executor:
    results = list(executor.map(fit, gpus))
print('results: ', results)
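
One optional tweak if GPU memory is the concern (as in the question): by default, TensorFlow reserves most of a GPU's memory as soon as a session starts using the device. If you'd rather have memory allocated only as it's actually needed, you can pass a config when creating each per-thread session. Here's a minimal variation of the fit() above (standard TF 1.x options, same imports as before; the body is elided where it is unchanged):

def fit(gpu):
    config = tf.ConfigProto()
    config.gpu_options.allow_growth = True  # allocate GPU memory on demand rather than up front
    with tf.Session(graph=tf.Graph(), config=config) as sess:
        K.set_session(sess)
        with tf.device(gpu):
            # build, compile, fit, and evaluate exactly as above
            ...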