HDFS-14497. Write lock held by metasave impact following RPC processing. Contributed by He Xiaoqiao.

Signed-off-by: Wei-Chiu Chuang <weichiu@apache.org>
This commit is contained in:
He Xiaoqiao 2019-05-30 13:27:48 -07:00 committed by Wei-Chiu Chuang
parent 6f5a36c13c
commit 33c62f8f4e
3 changed files with 79 additions and 11 deletions

View File

@@ -733,7 +733,7 @@ public BlockPlacementPolicy getBlockPlacementPolicy() {
/** Dump meta data to out. */ /** Dump meta data to out. */
public void metaSave(PrintWriter out) { public void metaSave(PrintWriter out) {
assert namesystem.hasWriteLock(); // TODO: block manager read lock and NS write lock assert namesystem.hasReadLock(); // TODO: block manager read lock and NS write lock
final List<DatanodeDescriptor> live = new ArrayList<DatanodeDescriptor>(); final List<DatanodeDescriptor> live = new ArrayList<DatanodeDescriptor>();
final List<DatanodeDescriptor> dead = new ArrayList<DatanodeDescriptor>(); final List<DatanodeDescriptor> dead = new ArrayList<DatanodeDescriptor>();
datanodeManager.fetchDatanodes(live, dead, false); datanodeManager.fetchDatanodes(live, dead, false);

View File

@@ -592,6 +592,12 @@ private void logAuditEvent(boolean succeeded,
private boolean resourceLowSafeMode = false; private boolean resourceLowSafeMode = false;
private String nameNodeHostName = null; private String nameNodeHostName = null;
/**
* HDFS-14497: Concurrency control when many metaSave request to write
* meta to same out stream after switch to read lock.
*/
private Object metaSaveLock = new Object();
/** /**
* Notify that loading of this FSDirectory is complete, and * Notify that loading of this FSDirectory is complete, and
* it is imageLoaded for use * it is imageLoaded for use
@@ -1769,9 +1775,10 @@ void metaSave(String filename) throws IOException {
String operationName = "metaSave"; String operationName = "metaSave";
checkSuperuserPrivilege(operationName); checkSuperuserPrivilege(operationName);
checkOperation(OperationCategory.READ); checkOperation(OperationCategory.READ);
writeLock(); readLock();
try { try {
checkOperation(OperationCategory.READ); checkOperation(OperationCategory.READ);
synchronized(metaSaveLock) {
File file = new File(System.getProperty("hadoop.log.dir"), filename); File file = new File(System.getProperty("hadoop.log.dir"), filename);
PrintWriter out = new PrintWriter(new BufferedWriter( PrintWriter out = new PrintWriter(new BufferedWriter(
new OutputStreamWriter(Files.newOutputStream(file.toPath()), new OutputStreamWriter(Files.newOutputStream(file.toPath()),
@@ -1779,14 +1786,15 @@ void metaSave(String filename) throws IOException {
metaSave(out); metaSave(out);
out.flush(); out.flush();
out.close(); out.close();
}
} finally { } finally {
writeUnlock(operationName); readUnlock(operationName);
} }
logAuditEvent(true, operationName, null); logAuditEvent(true, operationName, null);
} }
private void metaSave(PrintWriter out) { private void metaSave(PrintWriter out) {
assert hasWriteLock(); assert hasReadLock();
long totalInodes = this.dir.totalInodes(); long totalInodes = this.dir.totalInodes();
long totalBlocks = this.getBlocksTotal(); long totalBlocks = this.getBlocksTotal();
out.println(totalInodes + " files and directories, " + totalBlocks out.println(totalInodes + " files and directories, " + totalBlocks

View File

@@ -27,6 +27,7 @@
import java.io.FileInputStream; import java.io.FileInputStream;
import java.io.IOException; import java.io.IOException;
import java.io.InputStreamReader; import java.io.InputStreamReader;
import java.util.ArrayList;
import java.util.concurrent.TimeoutException; import java.util.concurrent.TimeoutException;
import com.google.common.base.Supplier; import com.google.common.base.Supplier;
@@ -215,6 +216,65 @@ public void testMetaSaveOverwrite() throws Exception {
} }
} }
/**
 * Helper thread that issues a single metaSave RPC against the NameNode,
 * used to drive many concurrent metaSave requests from the test below.
 */
class MetaSaveThread extends Thread {
  // RPC proxy to the NameNode; supplied by the test.
  private final NamenodeProtocols nnRpc;
  // Output file name passed to the metaSave RPC.
  private final String filename;

  public MetaSaveThread(NamenodeProtocols nnRpc, String filename) {
    this.nnRpc = nnRpc;
    this.filename = filename;
  }

  @Override
  public void run() {
    try {
      nnRpc.metaSave(filename);
    } catch (IOException e) {
      // Deliberately ignored: the test validates the combined output file
      // after all threads join, so a failed RPC surfaces there instead.
    }
  }
}
/**
 * Tests that metasave concurrent output file (not append).
 */
@Test
public void testConcurrentMetaSave() throws Exception {
  final int numThreads = 10;
  ArrayList<MetaSaveThread> savers = new ArrayList<>();
  for (int i = 0; i < numThreads; i++) {
    savers.add(new MetaSaveThread(nnRpc, "metaSaveConcurrent.out.txt"));
  }
  // Fire all metaSave requests at once, then wait for every one to finish.
  for (MetaSaveThread saver : savers) {
    saver.start();
  }
  for (MetaSaveThread saver : savers) {
    saver.join();
  }

  // Read output file.
  FileInputStream fis = null;
  InputStreamReader isr = null;
  BufferedReader rdr = null;
  try {
    fis = new FileInputStream(getLogFile("metaSaveConcurrent.out.txt"));
    isr = new InputStreamReader(fis);
    rdr = new BufferedReader(isr);

    // Validate that file was overwritten (not appended) by checking for
    // presence of only one "Live Datanodes" line.
    boolean foundLiveDatanodesLine = false;
    for (String line = rdr.readLine(); line != null; line = rdr.readLine()) {
      if (line.startsWith("Live Datanodes")) {
        if (foundLiveDatanodesLine) {
          fail("multiple Live Datanodes lines, output file not overwritten");
        }
        foundLiveDatanodesLine = true;
      }
    }
  } finally {
    IOUtils.cleanup(null, rdr, isr, fis);
  }
}
@After @After
public void tearDown() throws IOException { public void tearDown() throws IOException {
if (fileSys != null) if (fileSys != null)