From 78a08b3b782a575346338d903eebe28665dc973b Mon Sep 17 00:00:00 2001
From: Davin Tjong <107501978+davintjong-db@users.noreply.github.com>
Date: Wed, 16 Oct 2024 03:41:18 -0700
Subject: [PATCH] MAPREDUCE-7494. File stream leak when LineRecordReader is
 interrupted (#7117)

Contributed by Davin Tjong
---
 .../hadoop/mapred/.LineRecordReader.java.swp  | Bin 0 -> 20480 bytes
 .../hadoop/mapred/LineRecordReader.java       |  2 +
 .../mapreduce/lib/input/LineRecordReader.java | 79 ++++++++++--------
 3 files changed, 44 insertions(+), 37 deletions(-)
 create mode 100644 hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/.LineRecordReader.java.swp

diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/.LineRecordReader.java.swp b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/.LineRecordReader.java.swp
new file mode 100644
index 0000000000000000000000000000000000000000..86f019b54d4711f002597812cc18dc9aabb2942f
GIT binary patch
literal 20480
[base85-encoded payload of the stray vim swap file omitted]
literal 0
HcmV?d00001

diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/LineRecordReader.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/LineRecordReader.java
index ab63c199f2..08c7025203 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/LineRecordReader.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/LineRecordReader.java
@@ -302,6 +302,8 @@ public synchronized void close() throws IOException {
     try {
       if (in != null) {
         in.close();
+      } else if (fileIn != null) {
+        fileIn.close();
       }
     } finally {
       if (decompressor != null) {
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/LineRecordReader.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/LineRecordReader.java
index 089208841f..2177d812bc 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/LineRecordReader.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/LineRecordReader.java
@@ -98,48 +98,53 @@ public void initialize(InputSplit genericSplit,
         MRJobConfig.INPUT_FILE_OPTION_PREFIX,
         MRJobConfig.INPUT_FILE_MANDATORY_PREFIX);
     fileIn = FutureIO.awaitFuture(builder.build());
-
-    CompressionCodec codec = new CompressionCodecFactory(job).getCodec(file);
-    if (null!=codec) {
-      isCompressedInput = true;
-      decompressor = CodecPool.getDecompressor(codec);
-      if (codec instanceof SplittableCompressionCodec) {
-        final SplitCompressionInputStream cIn =
-          ((SplittableCompressionCodec)codec).createInputStream(
-            fileIn, decompressor, start, end,
-            SplittableCompressionCodec.READ_MODE.BYBLOCK);
-        in = new CompressedSplitLineReader(cIn, job,
-            this.recordDelimiterBytes);
-        start = cIn.getAdjustedStart();
-        end = cIn.getAdjustedEnd();
-        filePosition = cIn;
-      } else {
-        if (start != 0) {
-          // So we have a split that is only part of a file stored using
-          // a Compression codec that cannot be split.
-          throw new IOException("Cannot seek in " +
-              codec.getClass().getSimpleName() + " compressed stream");
-        }
-        in = new SplitLineReader(codec.createInputStream(fileIn,
-            decompressor), job, this.recordDelimiterBytes);
+    try {
+      CompressionCodec codec = new CompressionCodecFactory(job).getCodec(file);
+      if (null!=codec) {
+        isCompressedInput = true;
+        decompressor = CodecPool.getDecompressor(codec);
+        if (codec instanceof SplittableCompressionCodec) {
+          final SplitCompressionInputStream cIn =
+            ((SplittableCompressionCodec)codec).createInputStream(
+              fileIn, decompressor, start, end,
+              SplittableCompressionCodec.READ_MODE.BYBLOCK);
+          in = new CompressedSplitLineReader(cIn, job,
+              this.recordDelimiterBytes);
+          start = cIn.getAdjustedStart();
+          end = cIn.getAdjustedEnd();
+          filePosition = cIn;
+        } else {
+          if (start != 0) {
+            // So we have a split that is only part of a file stored using
+            // a Compression codec that cannot be split.
+            throw new IOException("Cannot seek in " +
+                codec.getClass().getSimpleName() + " compressed stream");
+          }
+
+          in = new SplitLineReader(codec.createInputStream(fileIn,
+              decompressor), job, this.recordDelimiterBytes);
+          filePosition = fileIn;
+        }
+      } else {
+        fileIn.seek(start);
+        in = new UncompressedSplitLineReader(
+            fileIn, job, this.recordDelimiterBytes, split.getLength());
         filePosition = fileIn;
       }
-    } else {
-      fileIn.seek(start);
-      in = new UncompressedSplitLineReader(
-          fileIn, job, this.recordDelimiterBytes, split.getLength());
-      filePosition = fileIn;
+      // If this is not the first split, we always throw away first record
+      // because we always (except the last split) read one extra line in
+      // next() method.
+      if (start != 0) {
+        start += in.readLine(new Text(), 0, maxBytesToConsume(start));
+      }
+      this.pos = start;
+    } catch (Exception e) {
+      fileIn.close();
+      throw e;
     }
-    // If this is not the first split, we always throw away first record
-    // because we always (except the last split) read one extra line in
-    // next() method.
-    if (start != 0) {
-      start += in.readLine(new Text(), 0, maxBytesToConsume(start));
-    }
-    this.pos = start;
   }
-
+
   private int maxBytesToConsume(long pos) {
     return isCompressedInput
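
Both hunks apply the same rule: when the wrapping line reader was never successfully constructed, the raw fileIn stream must be closed directly, either eagerly when initialize() fails partway (for example when the task is interrupted while the codec stream is being set up) or lazily as a fallback in close(). The Java sketch below is a minimal, self-contained illustration of that pattern, not Hadoop code; the class and method names (StreamOwningReader, wrap) are hypothetical.

import java.io.Closeable;
import java.io.IOException;
import java.io.InputStream;

// Minimal sketch of the resource-ownership pattern behind MAPREDUCE-7494.
// StreamOwningReader and wrap() are hypothetical names, not Hadoop APIs.
public class StreamOwningReader implements Closeable {

  private InputStream rawIn;      // plays the role of fileIn
  private InputStream wrappedIn;  // plays the role of the SplitLineReader

  public void initialize(InputStream raw) throws IOException {
    rawIn = raw;
    try {
      // Anything that fails here (interrupt, unsupported codec, seek error)
      // happens before the wrapper owns rawIn, so release rawIn ourselves.
      wrappedIn = wrap(rawIn);
    } catch (Exception e) {
      rawIn.close();  // mirrors the fileIn.close() added to initialize()
      throw e;        // precise rethrow keeps the narrow throws clause
    }
  }

  // Stand-in for the codec/split handling; may throw before returning.
  private InputStream wrap(InputStream in) throws IOException {
    return in;
  }

  @Override
  public void close() throws IOException {
    if (wrappedIn != null) {
      wrappedIn.close();  // normal case: closing the wrapper closes rawIn
    } else if (rawIn != null) {
      rawIn.close();      // mirrors the fallback added to close()
    }
  }
}

Catching Exception and rethrowing keeps the original failure visible while still releasing the file descriptor; because the try block can only throw IOException or unchecked exceptions, Java's precise rethrow lets initialize() keep declaring only throws IOException, as the patched method does.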