Skip to content

Commit

Permalink
MAPREDUCE-7494. File stream leak when LineRecordReader is interrupted (
Browse files Browse the repository at this point in the history
…#7117)


Contributed by Davin Tjong
  • Loading branch information
davintjong-db authored Oct 16, 2024
1 parent 9321e32 commit 78a08b3
Show file tree
Hide file tree
Showing 3 changed files with 44 additions and 37 deletions.
Binary file not shown.
Original file line number Diff line number Diff line change
Expand Up @@ -302,6 +302,8 @@ public synchronized void close() throws IOException {
try {
if (in != null) {
in.close();
} else if (fileIn != null) {
fileIn.close();
}
} finally {
if (decompressor != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,48 +98,53 @@ public void initialize(InputSplit genericSplit,
MRJobConfig.INPUT_FILE_OPTION_PREFIX,
MRJobConfig.INPUT_FILE_MANDATORY_PREFIX);
fileIn = FutureIO.awaitFuture(builder.build());

CompressionCodec codec = new CompressionCodecFactory(job).getCodec(file);
if (null!=codec) {
isCompressedInput = true;
decompressor = CodecPool.getDecompressor(codec);
if (codec instanceof SplittableCompressionCodec) {
final SplitCompressionInputStream cIn =
((SplittableCompressionCodec)codec).createInputStream(
fileIn, decompressor, start, end,
SplittableCompressionCodec.READ_MODE.BYBLOCK);
in = new CompressedSplitLineReader(cIn, job,
this.recordDelimiterBytes);
start = cIn.getAdjustedStart();
end = cIn.getAdjustedEnd();
filePosition = cIn;
} else {
if (start != 0) {
// So we have a split that is only part of a file stored using
// a Compression codec that cannot be split.
throw new IOException("Cannot seek in " +
codec.getClass().getSimpleName() + " compressed stream");
}

in = new SplitLineReader(codec.createInputStream(fileIn,
decompressor), job, this.recordDelimiterBytes);
try {
CompressionCodec codec = new CompressionCodecFactory(job).getCodec(file);
if (null!=codec) {
isCompressedInput = true;
decompressor = CodecPool.getDecompressor(codec);
if (codec instanceof SplittableCompressionCodec) {
final SplitCompressionInputStream cIn =
((SplittableCompressionCodec)codec).createInputStream(
fileIn, decompressor, start, end,
SplittableCompressionCodec.READ_MODE.BYBLOCK);
in = new CompressedSplitLineReader(cIn, job,
this.recordDelimiterBytes);
start = cIn.getAdjustedStart();
end = cIn.getAdjustedEnd();
filePosition = cIn;
} else {
if (start != 0) {
// So we have a split that is only part of a file stored using
// a Compression codec that cannot be split.
throw new IOException("Cannot seek in " +
codec.getClass().getSimpleName() + " compressed stream");
}

in = new SplitLineReader(codec.createInputStream(fileIn,
decompressor), job, this.recordDelimiterBytes);
filePosition = fileIn;
}
} else {
fileIn.seek(start);
in = new UncompressedSplitLineReader(
fileIn, job, this.recordDelimiterBytes, split.getLength());
filePosition = fileIn;
}
} else {
fileIn.seek(start);
in = new UncompressedSplitLineReader(
fileIn, job, this.recordDelimiterBytes, split.getLength());
filePosition = fileIn;
}
// If this is not the first split, we always throw away first record
// because we always (except the last split) read one extra line in
// next() method.
if (start != 0) {
start += in.readLine(new Text(), 0, maxBytesToConsume(start));
// If this is not the first split, we always throw away first record
// because we always (except the last split) read one extra line in
// next() method.
if (start != 0) {
start += in.readLine(new Text(), 0, maxBytesToConsume(start));
}
this.pos = start;
} catch (Exception e) {
fileIn.close();
throw e;
}
this.pos = start;
}


private int maxBytesToConsume(long pos) {
return isCompressedInput
Expand Down

0 comments on commit 78a08b3

Please sign in to comment.