According to Hadoop: The Definitive Guide:
The logical records that FileInputFormats define do not usually fit neatly into HDFS blocks. For example, a TextInputFormat’s logical records are lines, which will cross HDFS boundaries more often than not. This has no bearing on the functioning of your program—lines are not missed or broken, for example—but it’s worth knowing about, as it does mean that data-local maps (that is, maps that are running on the same host as their input data) will perform some remote reads. The slight overhead this causes is not normally significant.
Suppose a record line is split across two blocks (b1 and b2). The mapper processing the first block (b1) will notice that the last line doesn't have an EOL separator and will fetch the remainder of the line from the next block of data (b2).
How does the mapper processing the second block (b2) determine that its first record is incomplete, and that it should start processing from the second record in the block (b2)?
Interesting question. I spent some time looking at the code for the details, and here are my thoughts.

The splits are handled by the client by InputFormat.getSplits, so a look at FileInputFormat gives the following info. First, the split size is calculated as max(minSize, min(maxSize, blockSize)), where maxSize corresponds to mapred.max.split.size and minSize is mapred.min.split.size. The file is then divided into different FileSplits based on the split size calculated above. What's important here is that each FileSplit is initialized with a start parameter corresponding to the offset in the input file. There is still no handling of the lines at that point. The relevant part of the code looks like this:
while (((double) bytesRemaining) / splitSize > SPLIT_SLOP) {
    int blkIndex = getBlockIndex(blkLocations, length - bytesRemaining);
    // each split starts at byte offset (length - bytesRemaining),
    // with no regard for where any line begins or ends
    splits.add(new FileSplit(path, length - bytesRemaining, splitSize,
                             blkLocations[blkIndex].getHosts()));
    bytesRemaining -= splitSize;
}
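To make those offsets concrete, here is a minimal standalone sketch (my own simplification, not the Hadoop source) that applies the same formula and loop to a hypothetical 200MB file with 64MB blocks; SPLIT_SLOP is the 1.1 slack factor FileInputFormat uses:

public class SplitSketch {
    // Same formula as FileInputFormat: max(minSize, min(maxSize, blockSize))
    static long computeSplitSize(long blockSize, long minSize, long maxSize) {
        return Math.max(minSize, Math.min(maxSize, blockSize));
    }

    public static void main(String[] args) {
        final double SPLIT_SLOP = 1.1;       // slack factor, as in FileInputFormat
        long length = 200L * 1024 * 1024;    // hypothetical 200MB file
        long blockSize = 64L * 1024 * 1024;  // 64MB HDFS blocks
        long splitSize = computeSplitSize(blockSize, 1L, Long.MAX_VALUE);

        long bytesRemaining = length;
        while (((double) bytesRemaining) / splitSize > SPLIT_SLOP) {
            System.out.printf("split: start=%d length=%d%n",
                              length - bytesRemaining, splitSize);
            bytesRemaining -= splitSize;
        }
        if (bytesRemaining != 0) {           // last, possibly smaller, split
            System.out.printf("split: start=%d length=%d%n",
                              length - bytesRemaining, bytesRemaining);
        }
    }
}

This prints splits starting at offsets 0, 64MB, 128MB and 192MB: the boundaries fall at fixed byte offsets, oblivious to line endings.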
After that, if you look at the LineRecordReader defined by the TextInputFormat, that's where the lines are handled. When you initialize your LineRecordReader, it tries to instantiate a LineReader, which is an abstraction for reading lines over an FSDataInputStream. There are 2 cases:

If there is a CompressionCodec defined, then this codec is responsible for handling boundaries. Probably not relevant to your question.

If there is no codec, however, that's where things are interesting: if the start of your InputSplit is different from 0, then you backtrack 1 character and then skip the first line you encounter, identified by \n or \r\n (Windows)! The backtrack is important because, in case your line boundaries are the same as split boundaries, it ensures you do not skip a valid line. Here is the relevant code:
if (codec != null) {
    in = new LineReader(codec.createInputStream(fileIn), job);
    end = Long.MAX_VALUE;
} else {
    if (start != 0) {
        skipFirstLine = true;  // not the first split, so a partial line may lead
        --start;               // backtrack one byte: if it is a '\n', the skipped
        fileIn.seek(start);    // "line" below is empty and no real line is lost
    }
    in = new LineReader(fileIn, job);
}
if (skipFirstLine) {  // skip first line and re-establish "start".
    start += in.readLine(new Text(), 0,
                         (int) Math.min((long) Integer.MAX_VALUE, end - start));
}
this.pos = start;
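To see why the backtrack matters, here is a small self-contained sketch (my own illustration, not Hadoop code) that applies the same rule to an in-memory byte array: every reader whose start is not 0 steps back one byte and discards everything up to and including the next \n before it starts emitting lines:

import java.util.ArrayList;
import java.util.List;

public class SkipFirstLineSketch {
    // Applies LineRecordReader's rule to data[start..end): a reader that does
    // not start at offset 0 backs up one byte and skips to the byte after the
    // next '\n' before emitting anything; a line whose first byte lies before
    // 'end' is read in full, even if it runs past 'end'.
    static List<String> readSplit(byte[] data, int start, int end) {
        int pos = start;
        if (start != 0) {
            pos = start - 1;                           // backtrack one byte
            while (pos < data.length && data[pos] != '\n') pos++;
            pos++;                                     // byte after the '\n'
        }
        List<String> lines = new ArrayList<>();
        while (pos < data.length && pos < end) {
            int lineStart = pos;
            while (pos < data.length && data[pos] != '\n') pos++;
            lines.add(new String(data, lineStart, pos - lineStart));
            pos++;                                     // move past the '\n'
        }
        return lines;
    }

    public static void main(String[] args) {
        byte[] data = "alpha\nbravo\ncharlie\ndelta\n".getBytes();
        int splitSize = 10;  // deliberately misaligned with the line boundaries
        for (int start = 0; start < data.length; start += splitSize) {
            int end = Math.min(start + splitSize, data.length);
            System.out.println("split [" + start + "," + end + ") -> "
                    + readSplit(data, start, end));
        }
    }
}

Even though the splits cut through the middle of lines, every line comes out exactly once; and for the split that happens to start exactly at "delta", the backtracked byte is the preceding \n, so the valid line is kept rather than skipped.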
So since the splits are calculated in the client, the mappers don't need to run in sequence; every mapper already knows whether it needs to discard the first line or not.
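In other words, the decision is a pure function of the split itself; something like this hypothetical one-liner (not part of the Hadoop API) is all a mapper needs, with no coordination:

// Hypothetical helper: whether this mapper must discard its first
// (partial) line follows from the split's start offset alone.
static boolean skipsFirstLine(FileSplit split) {
    return split.getStart() != 0;
}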
So basically, say you have 2 lines of 100MB each in the same file, and to simplify let's say the split size is 64MB. Then when the input splits are calculated, we will have the following scenario: