YARN-10320.Replace FSDataInputStream#read with readFully in Log Aggregation (#4486)
* YARN-10320.Replace FSDataInputStream#read with readFully in Log Aggregation Co-authored-by: Ashutosh Gupta <ashugpt@amazon.com>
This commit is contained in:
parent
0af4bb3b42
commit
4abb2ba58c
@ -282,7 +282,8 @@ private Pair<Path, Boolean> initializeWriterInRolling(
|
|||||||
checksumFileInputStream = fc.open(remoteLogCheckSumFile);
|
checksumFileInputStream = fc.open(remoteLogCheckSumFile);
|
||||||
int nameLength = checksumFileInputStream.readInt();
|
int nameLength = checksumFileInputStream.readInt();
|
||||||
byte[] b = new byte[nameLength];
|
byte[] b = new byte[nameLength];
|
||||||
int actualLength = checksumFileInputStream.read(b);
|
checksumFileInputStream.readFully(b);
|
||||||
|
int actualLength = b.length;
|
||||||
if (actualLength == nameLength) {
|
if (actualLength == nameLength) {
|
||||||
String recoveredLogFile = new String(
|
String recoveredLogFile = new String(
|
||||||
b, Charset.forName("UTF-8"));
|
b, Charset.forName("UTF-8"));
|
||||||
@ -765,7 +766,8 @@ public Map<String, Long> parseCheckSumFiles(
|
|||||||
checksumFileInputStream = fc.open(file.getPath());
|
checksumFileInputStream = fc.open(file.getPath());
|
||||||
int nameLength = checksumFileInputStream.readInt();
|
int nameLength = checksumFileInputStream.readInt();
|
||||||
byte[] b = new byte[nameLength];
|
byte[] b = new byte[nameLength];
|
||||||
int actualLength = checksumFileInputStream.read(b);
|
checksumFileInputStream.readFully(b);
|
||||||
|
int actualLength = b.length;
|
||||||
if (actualLength == nameLength) {
|
if (actualLength == nameLength) {
|
||||||
nodeName = new String(b, Charset.forName("UTF-8"));
|
nodeName = new String(b, Charset.forName("UTF-8"));
|
||||||
index = checksumFileInputStream.readLong();
|
index = checksumFileInputStream.readLong();
|
||||||
@ -799,7 +801,8 @@ private Long parseChecksum(FileStatus file) {
|
|||||||
checksumFileInputStream = fileContext.open(file.getPath());
|
checksumFileInputStream = fileContext.open(file.getPath());
|
||||||
int nameLength = checksumFileInputStream.readInt();
|
int nameLength = checksumFileInputStream.readInt();
|
||||||
byte[] b = new byte[nameLength];
|
byte[] b = new byte[nameLength];
|
||||||
int actualLength = checksumFileInputStream.read(b);
|
checksumFileInputStream.readFully(b);
|
||||||
|
int actualLength = b.length;
|
||||||
if (actualLength == nameLength) {
|
if (actualLength == nameLength) {
|
||||||
nodeName = new String(b, StandardCharsets.UTF_8);
|
nodeName = new String(b, StandardCharsets.UTF_8);
|
||||||
index = checksumFileInputStream.readLong();
|
index = checksumFileInputStream.readLong();
|
||||||
@ -938,7 +941,8 @@ public IndexedLogsMeta loadIndexedLogsMeta(Path remoteLogPath, long end,
|
|||||||
|
|
||||||
// Load UUID and make sure the UUID is correct.
|
// Load UUID and make sure the UUID is correct.
|
||||||
byte[] uuidRead = new byte[UUID_LENGTH];
|
byte[] uuidRead = new byte[UUID_LENGTH];
|
||||||
int uuidReadLen = fsDataIStream.read(uuidRead);
|
fsDataIStream.readFully(uuidRead);
|
||||||
|
int uuidReadLen = uuidRead.length;
|
||||||
if (this.uuid == null) {
|
if (this.uuid == null) {
|
||||||
this.uuid = createUUID(appId);
|
this.uuid = createUUID(appId);
|
||||||
}
|
}
|
||||||
@ -1322,7 +1326,8 @@ private byte[] loadUUIDFromLogFile(final FileContext fc,
|
|||||||
.endsWith(CHECK_SUM_FILE_SUFFIX)) {
|
.endsWith(CHECK_SUM_FILE_SUFFIX)) {
|
||||||
fsDataInputStream = fc.open(checkPath);
|
fsDataInputStream = fc.open(checkPath);
|
||||||
byte[] b = new byte[uuid.length];
|
byte[] b = new byte[uuid.length];
|
||||||
int actual = fsDataInputStream.read(b);
|
fsDataInputStream.readFully(b);
|
||||||
|
int actual = b.length;
|
||||||
if (actual != uuid.length || Arrays.equals(b, uuid)) {
|
if (actual != uuid.length || Arrays.equals(b, uuid)) {
|
||||||
deleteFileWithRetries(fc, checkPath);
|
deleteFileWithRetries(fc, checkPath);
|
||||||
} else if (id == null){
|
} else if (id == null){
|
||||||
|
Loading…
Reference in New Issue
Block a user