I am implementing a class that should receive a large text file. I want to split it in chunks and each chunk to be hold by a different thread that will count the frequency of each character in this chunk. I expect with starting more threads to get better performance but it turns out performance is getting poorer. Here`s my code:
public class Main {
public static void main(String[] args)
throws IOException, InterruptedException, ExecutionException, ParseException
{
// save the current run's start time
long startTime = System.currentTimeMillis();
// create options
Options options = new Options();
options.addOption("t", true, "number of threads to be start");
// variables to hold options
int numberOfThreads = 1;
// parse options
CommandLineParser parser = new DefaultParser();
CommandLine cmd;
cmd = parser.parse(options, args);
String threadsNumber = cmd.getOptionValue("t");
numberOfThreads = Integer.parseInt(threadsNumber);
// read file
RandomAccessFile raf = new RandomAccessFile(args[0], "r");
MappedByteBuffer mbb
= raf.getChannel().map(FileChannel.MapMode.READ_ONLY, 0, raf.length());
ExecutorService pool = Executors.newFixedThreadPool(numberOfThreads);
Set<Future<int[]>> set = new HashSet<Future<int[]>>();
long chunkSize = raf.length() / numberOfThreads;
byte[] buffer = new byte[(int) chunkSize];
while(mbb.hasRemaining())
{
int remaining = buffer.length;
if(mbb.remaining() < remaining)
{
remaining = mbb.remaining();
}
mbb.get(buffer, 0, remaining);
String content = new String(buffer, "ISO-8859-1");
@SuppressWarnings("unchecked")
Callable<int[]> callable = new FrequenciesCounter(content);
Future<int[]> future = pool.submit(callable);
set.add(future);
}
raf.close();
// let`s assume we will use extended ASCII characters only
int alphabet = 256;
// hold how many times each character is contained in the input file
int[] frequencies = new int[alphabet];
// sum the frequencies from each thread
for(Future<int[]> future: set)
{
for(int i = 0; i < alphabet; i++)
{
frequencies[i] += future.get()[i];
}
}
}
}
//help class for multithreaded frequencies` counting
class FrequenciesCounter implements Callable
{
private int[] frequencies = new int[256];
private char[] content;
public FrequenciesCounter(String input)
{
content = input.toCharArray();
}
public int[] call()
{
System.out.println("Thread " + Thread.currentThread().getName() + "start");
for(int i = 0; i < content.length; i++)
{
frequencies[(int)content[i]]++;
}
System.out.println("Thread " + Thread.currentThread().getName() + "finished");
return frequencies;
}
}
As suggested in comments, you will (usually) do not get better performance when reading from multiple threads. Rather you should process the chunks you have read on multiple threads. Usually processing does some blocking, I/O operations (saving to another file? saving to database? HTTP call?) and your performance will get better if you process on multiple threads.
For processing, you may have ExecutorService (with sensible number of threads). use java.util.concurrent.Executors
to obtain instance of java.util.concurrent.ExecutorService
Having ExecutorService
instance, you may submit your chunks for processing. Submitting chunks would not block. ExecutorService
will start to process each chunk at separate thread (details depends of configuration of ExecutorService
). You may submit instances of Runnable
or Callable
.
Finally, after you submit all items you should call awaitTermination at your ExecutorService. It will wait until processing of all submited items is finished. After awaitTermination returns you should call shutdownNow() to abort processing (otherwise it may hang indefinitely, procesing some rogue task).