I should start by saying that I am completely new to any kind of parallel, multithreaded, or multiprocessing programming.
Now I have the chance to run my TensorFlow CNN on 32 cores (each with 2 hyperthreads). I've spent a lot of time trying to understand how I should modify my code (if I have to) in order to exploit all of that computational power. Unfortunately, I didn't get anywhere. I hoped that TF could do that automatically, but when I launch my model and check the CPU usage with top, I see 100% CPU usage most of the time, with a few peaks at 200%.
If all the cores were used, I would expect to see a 100*64=6400% usage (correct?). How can I accomplish this?
Should I do something similar to what is explained here?
If that is the case, do I understand correctly that all the multithreading is only applied to calculations which involve Queue?
Is this really all that can be done to use all the computational power available (since it appears to me that queues are only used when reading and batching training samples)?
This is what my code looks like, if needed: (main.py)
# pylint: disable=missing-docstring
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import time
from six.moves import xrange # pylint: disable=redefined-builtin
import tensorflow as tf
from pylab import *
import argparse
import cnn
import freader_2
# Input files. The training entries are lists because they are handed to
# filename queues; the test entries are single paths loaded in one go.
training_feats_file = ["file_name"]
training_lbls_file = ["file_name"]
test_feats_file = 'file_name'
test_lbls_file = 'file_name'

# Initial learning rate passed to cnn.training().
learning_rate = 0.1
# Samples per training batch. Must agree with the batch size used by
# freader_2.XICSDataSet (default 1000), because cnn.inference() reshapes
# its activations to exactly this many rows.
batch_size = 1000
# Number of samples in the test set; fixes the test placeholder shapes.
# (A stale earlier assignment of 1000 was immediately overwritten by this
# value and has been removed.)
testset_size = 793
# Not referenced anywhere in the visible code.
tot_samples = 810901
# Upper bound on the number of training steps.
max_steps = 3300
def placeholder_inputs(batch_size):
    """Create the image and label placeholders for a batch fed from Python.

    The original body ignored ``batch_size`` and silently used the
    module-level ``testset_size``; the argument is now honoured. The only
    visible caller passes ``testset_size``, so its result is unchanged.

    Args:
        batch_size: Number of samples in every fed batch. It fixes the
            first dimension of both placeholders.

    Returns:
        A tuple ``(images_placeholder, labels_placeholder)`` of float32
        placeholders shaped ``(batch_size, IMAGE_HEIGHT, IMAGE_WIDTH, 1)``
        and ``(batch_size, NUM_OUTPUT)``.
    """
    images_placeholder = tf.placeholder(
        tf.float32,
        shape=(batch_size, cnn.IMAGE_HEIGHT, cnn.IMAGE_WIDTH, 1))
    # cnn.NUM_OUTPUT (15) replaces the hard-coded literal so the label width
    # follows the network definition.
    labels_placeholder = tf.placeholder(
        tf.float32, shape=(batch_size, cnn.NUM_OUTPUT))
    return images_placeholder, labels_placeholder
def reader(images_file, lbls_file, images_pl, labels_pl, im_height, im_width):
    """Load a whole data set from text files and key it by placeholder.

    Args:
        images_file: Path of the text file with one flattened image per row.
        lbls_file: Path of the text file with one label row per sample.
        images_pl: Key under which the image array is stored.
        labels_pl: Key under which the label array is stored.
        im_height: Height each flattened image is reshaped to.
        im_width: Width each flattened image is reshaped to.

    Returns:
        A dict mapping ``images_pl`` to an array of shape
        ``(n_rows, im_height, im_width, 1)`` and ``labels_pl`` to the label
        array exactly as loaded.
    """
    flat_images = loadtxt(images_file)
    label_values = loadtxt(lbls_file)
    # One image per row of the file; add a trailing single-channel axis.
    n_samples = flat_images.shape[0]
    image_batch = reshape(flat_images, [n_samples, im_height, im_width, 1])
    return {images_pl: image_batch, labels_pl: label_values}
# Per-step history of the training loss, the test loss and the gradient
# measure returned by cnn.training().
tot_training_loss = []
tot_test_loss = []
tot_grad = []

print('Starting TensorFlow session...')
with tf.Graph().as_default():
    # Training branch: batches are produced by the queue-based file reader.
    DS = freader_2.XICSDataSet()
    images, labels = DS.trainingset_files_reader(training_feats_file, training_lbls_file)
    keep_prob = tf.placeholder(tf.float32)
    logits = cnn.inference(images, batch_size, keep_prob)
    loss = cnn.loss(logits, labels)
    global_step = tf.Variable(0, trainable=False)
    train_op, grad_norm = cnn.training(loss, learning_rate, global_step)
    # NOTE: summary_op is built but never run; nothing below writes summaries.
    summary_op = tf.merge_all_summaries()

    # Evaluation branch: same network (reuse=True) fed through placeholders.
    test_images_pl, test_labels_pl = placeholder_inputs(testset_size)
    test_pred = cnn.inference(test_images_pl, testset_size, keep_prob, True)
    test_loss = cnn.loss(test_pred, test_labels_pl)

    saver = tf.train.Saver()
    # NOTE: no ConfigProto is passed, so the session's thread-pool sizes are
    # left at TensorFlow's defaults.
    sess = tf.Session()
    summary_writer = tf.train.SummaryWriter("CNN", sess.graph)
    init = tf.initialize_all_variables()

    # The coordinator lets the input-queue threads be stopped and joined when
    # training ends or fails; previously they were started and never stopped,
    # and the session was never closed.
    coord = tf.train.Coordinator()
    threads = []
    try:
        sess.run(init)
        threads = tf.train.start_queue_runners(sess=sess, coord=coord)

        # The whole test set is loaded once and fed on every evaluation;
        # dropout is disabled for it (keep_prob = 1).
        test_feed = reader(test_feats_file, test_lbls_file, test_images_pl, test_labels_pl, DS.height, DS.width)
        test_feed[keep_prob] = 1.

        # Start the training loop.
        print('Starting training loop...')
        start_time = time.time()
        for step in xrange(max_steps):
            _, grad, loss_value = sess.run([train_op, grad_norm, loss], feed_dict={keep_prob: 0.5})
            tot_training_loss.append(loss_value)
            tot_grad.append(grad)

            # The test loss is evaluated on every step because it drives the
            # stopping condition below.
            _, test_loss_val = sess.run([test_pred, test_loss], feed_dict=test_feed)
            tot_test_loss.append(test_loss_val)

            # Modulus 1 reports every step; raise it to report less often.
            if step % 1 == 0:
                duration = time.time() - start_time
                print('Step %d (%.3f sec):\n training loss = %f\n test loss = %f ' % (step, duration, loss_value, test_loss_val))
                print(' gradient = %f'%grad)

            if (step+1) % 100 == 0:
                print('Saving checkpoint...')
                saver.save(sess, "chkpts/medias-res", global_step=global_step)

            if test_loss_val < 0.01:
                print("Stopping condition reached.")
                break

        print('Saving final network...')
        saver.save(sess, "chkpts/final.chkpt")
        print('Total training time: ' + str((time.time() - start_time)/3600) + ' h')
    finally:
        # Ask the queue-runner threads to stop, wait for them, then release
        # the writer and the session even if training raised.
        coord.request_stop()
        coord.join(threads)
        summary_writer.close()
        sess.close()
cnn.py:
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import math
import tensorflow as tf
# Width of the network's output layer.
NUM_OUTPUT = 15
# Geometry of one single-channel input sample.
IMAGE_HEIGHT = 20
IMAGE_WIDTH = 195
# Number of values in one flattened sample.
IMAGE_PIXELS = IMAGE_HEIGHT * IMAGE_WIDTH
def _decayed_weights(shape, decay, reuse):
    """Get the current scope's 'weights' variable and register its L2 penalty.

    The penalty is added to the 'losses' collection only when the variable is
    first created (``reuse`` falsy). A reuse call shares the same weights, so
    registering again would count the penalty twice in any loss built later.
    """
    weights = tf.get_variable(
        name='weights', shape=shape,
        initializer=tf.contrib.layers.xavier_initializer(uniform=False))
    if not reuse:
        weight_decay = tf.mul(tf.nn.l2_loss(weights), decay, name='weight_loss')
        tf.add_to_collection('losses', weight_decay)
    return weights


def _shared_biases(size, value):
    """Get the current scope's 'biases' variable, initialised to ``value``."""
    return tf.get_variable(
        name='biases', shape=[size],
        initializer=tf.constant_initializer(value))


def inference(images, num_samples, keep_prob, reuse=None):
    """Build the CNN forward pass and return the output-layer activations.

    Two fixes relative to the previous version:

    * Biases are now created with ``tf.get_variable`` like the weights.
      Before, they were plain ``tf.Variable`` objects, so a second call with
      ``reuse=True`` shared the weights but created fresh biases that were
      never trained.
    * Weight-decay terms are added to the 'losses' collection only on the
      creating call, not again on a reuse call.

    NOTE: the bias variables are now named ``<scope>/biases``, so checkpoints
    written by the previous version will not restore them.

    Args:
        images: Input batch with one channel; the dimension comments below
            assume 20x195 samples.
        num_samples: Static batch size used to flatten the feature maps.
        keep_prob: Keep probability for the dropout applied after the
            second pooling layer.
        reuse: Passed to ``tf.variable_scope``; truthy to share the variables
            of an earlier call.

    Returns:
        Tensor of shape ``[num_samples, NUM_OUTPUT]`` (linear output layer).
    """
    with tf.variable_scope('conv1', reuse=reuse):
        kernel = _decayed_weights([3, 30, 1, 5], 0.001, reuse)
        # Stride 1 along the height, 5 along the width.
        conv = tf.nn.conv2d(images, kernel, [1, 1, 5, 1], padding='VALID')
        # output dim: 18x34
        biases = _shared_biases(5, 0.0)
        bias = tf.nn.bias_add(conv, biases)
        conv1 = tf.nn.relu(bias, name='conv1')
    pool1 = tf.nn.max_pool(conv1, ksize=[1, 2, 2, 1], strides=[1, 2, 2, 1], padding='VALID', name='pool1')
    # output dim: 9x17

    with tf.variable_scope('conv2', reuse=reuse):
        kernel = _decayed_weights([2, 2, 5, 5], 0.001, reuse)
        conv = tf.nn.conv2d(pool1, kernel, [1, 1, 1, 1], padding='VALID')
        # output dim: 8x16
        biases = _shared_biases(5, 0.1)
        bias = tf.nn.bias_add(conv, biases)
        conv2 = tf.nn.relu(bias, name='conv2')
    pool2 = tf.nn.max_pool(conv2, ksize=[1, 2, 2, 1], strides=[1, 2, 2, 1], padding='VALID', name='pool2')
    # output dim: 4x8

    h_fc1_drop = tf.nn.dropout(pool2, keep_prob)

    with tf.variable_scope('fully_connected', reuse=reuse):
        # Flatten every sample's feature maps into one row.
        flat = tf.reshape(h_fc1_drop, [num_samples, -1])
        dim = flat.get_shape()[1].value
        weights = _decayed_weights([dim, 20], 0.004, reuse)
        biases = _shared_biases(20, 0.0)
        fully_connected = tf.nn.relu(tf.matmul(flat, weights) + biases, name='fully_connected')

    with tf.variable_scope('identity', reuse=reuse):
        weights = _decayed_weights([20, NUM_OUTPUT], 0.004, reuse)
        biases = _shared_biases(NUM_OUTPUT, 0.0)
        # Linear output layer: no activation is applied.
        output = tf.matmul(fully_connected, weights) + biases
    return output
def loss(outputs, labels):
    """Return the RMSE of ``outputs`` against ``labels`` plus registered penalties.

    Args:
        outputs: Network predictions.
        labels: Target values, same shape as ``outputs``.

    Returns:
        Scalar tensor named 'total_loss': every tensor currently in the
        'losses' collection summed with the root-mean-square error taken
        over all elements.
    """
    squared_error = tf.square(tf.sub(labels, outputs))
    rmse = tf.sqrt(tf.reduce_mean(squared_error), name="rmse")
    # Collection entries first, RMSE last, as in the summation order before.
    terms = tf.get_collection('losses') + [rmse]
    return tf.add_n(terms, name='total_loss')
def training(loss, starter_learning_rate, global_step):
    """Create the momentum-SGD training op and a gradient-magnitude tensor.

    Args:
        loss: Scalar loss tensor to minimise; also logged as a scalar summary
            under its op name.
        starter_learning_rate: Initial learning rate. It is multiplied by 0.8
            every 200 steps (staircase decay).
        global_step: Variable incremented once per applied update.

    Returns:
        A tuple ``(train_op, grad_norm)``. ``grad_norm`` is the sum of
        ``tf.nn.l2_loss`` over all gradients, i.e. half the squared L2 norm
        of the full gradient, not the norm itself.
    """
    tf.scalar_summary(loss.op.name, loss)
    learning_rate = tf.train.exponential_decay(starter_learning_rate, global_step, 200, 0.8, staircase=True)
    optimizer = tf.train.MomentumOptimizer(learning_rate, 0.8)
    grads_and_vars = optimizer.compute_gradients(loss)
    # compute_gradients yields (None, var) for variables the loss does not
    # depend on; l2_loss cannot take None, so those pairs are skipped here.
    grad_norms = [tf.nn.l2_loss(grad) for grad, _ in grads_and_vars if grad is not None]
    grad_norm = tf.add_n(grad_norms)
    train_op = optimizer.apply_gradients(grads_and_vars, global_step=global_step)
    return train_op, grad_norm
freader_2.py:
# -*- coding: utf-8 -*-
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import os
import collections
import numpy as np
from six.moves import xrange
import tensorflow as tf
class XICSDataSet:
    """Queue-based reader turning text files into shuffled training batches."""

    def __init__(self, height=20, width=195, batch_size=1000, noutput=15):
        """Store the sample geometry and the batching parameters.

        Args:
            height: Rows per image.
            width: Columns per image.
            batch_size: Samples per batch produced by the reader.
            noutput: Number of label values per sample.
        """
        self.height = height
        self.width = width
        self.depth = 1  # images are single-channel
        self.noutput = noutput
        self.batch_size = batch_size

    def trainingset_files_reader(self, im_file_name, lb_file_name, nfiles=1):
        """Build the input pipeline and return one batch of images and labels.

        Image and label files are read line by line from two separate,
        unshuffled filename queues; each line is parsed as space-separated
        floats.

        Args:
            im_file_name: List of image file names.
            lb_file_name: List of label file names.
            nfiles: Unused.

        Returns:
            A tuple ``(example_batch, label_batch)`` with shapes
            ``[batch_size, height, width, depth]`` and
            ``[batch_size, noutput]``.
        """
        image_files = tf.train.string_input_producer(im_file_name, shuffle=False)
        label_files = tf.train.string_input_producer(lb_file_name, shuffle=False)
        image_reader = tf.TextLineReader()
        label_reader = tf.TextLineReader()
        _, image_line = image_reader.read(image_files)
        _, label_line = label_reader.read(label_files)

        # One float default per CSV field: height*width pixels, noutput labels.
        image_defaults = [[.0]] * self.height * self.width
        label_defaults = [[.0]] * self.noutput
        pixel_fields = tf.decode_csv(image_line, record_defaults=image_defaults, field_delim=' ')
        label_fields = tf.decode_csv(label_line, record_defaults=label_defaults, field_delim=' ')

        flat_image = tf.pack(pixel_fields)
        label = tf.pack(label_fields)
        image = tf.reshape(flat_image, [self.height, self.width, self.depth])

        # The queue holds at most three batches plus the shuffle minimum.
        min_after_dequeue = 10
        capacity = min_after_dequeue + 3 * self.batch_size
        example_batch, label_batch = tf.train.shuffle_batch(
            [image, label],
            batch_size=self.batch_size,
            capacity=capacity,
            min_after_dequeue=min_after_dequeue)
        return example_batch, label_batch
According to the TensorFlow documentation:
The two configurations listed below are used to optimize CPU performance by adjusting the thread pools.
intra_op_parallelism_threads: Nodes that can use multiple threads to parallelize their execution will schedule the individual pieces into this pool.
inter_op_parallelism_threads: All ready nodes are scheduled in this pool.
These configurations are set via tf.ConfigProto and passed to tf.Session in the config attribute, as shown in the snippet below. For both configuration options, if they are unset or set to 0, they will default to the number of logical CPU cores. Testing has shown that the default is effective for systems ranging from one CPU with 4 cores to multiple CPUs with 70+ combined logical cores. A common alternative optimization is to set the number of threads in both pools equal to the number of physical cores rather than logical cores:
config = tf.ConfigProto()
config.intra_op_parallelism_threads = 44
config.inter_op_parallelism_threads = 44
tf.Session(config=config)
In versions of TensorFlow before 1.2, it was recommended to use multi-threaded, queue-based input pipelines for performance. Beginning with TensorFlow 1.4, however, it is recommended to use the tf.data module instead.
Yes, on Linux you can check your CPU usage with top; press 1 to show the usage per CPU. Note: the percentage shown depends on whether top is in Irix or Solaris mode.