HDDS-364. Update open container replica information in SCM during DN register. Contributed by Ajay Kumar.

This commit is contained in:
Márton Elek 2018-08-24 22:27:43 +02:00
parent 8563fd67be
commit a5eba25506
7 changed files with 41 additions and 15 deletions

View File

@ -505,15 +505,26 @@ public ContainerWithPipeline getMatchingContainerWithPipeline(final long size,
*/ */
@Override @Override
public void processContainerReports(DatanodeDetails datanodeDetails, public void processContainerReports(DatanodeDetails datanodeDetails,
ContainerReportsProto reports) ContainerReportsProto reports, boolean isRegisterCall)
throws IOException { throws IOException {
List<StorageContainerDatanodeProtocolProtos.ContainerInfo> List<StorageContainerDatanodeProtocolProtos.ContainerInfo>
containerInfos = reports.getReportsList(); containerInfos = reports.getReportsList();
PendingDeleteStatusList pendingDeleteStatusList = PendingDeleteStatusList pendingDeleteStatusList =
new PendingDeleteStatusList(datanodeDetails); new PendingDeleteStatusList(datanodeDetails);
for (StorageContainerDatanodeProtocolProtos.ContainerInfo datanodeState : for (StorageContainerDatanodeProtocolProtos.ContainerInfo contInfo :
containerInfos) { containerInfos) {
byte[] dbKey = Longs.toByteArray(datanodeState.getContainerID()); // Update replica info during registration process.
if (isRegisterCall) {
try {
getStateManager().addContainerReplica(ContainerID.
valueof(contInfo.getContainerID()), datanodeDetails);
} catch (Exception ex) {
// Continue to next one after logging the error.
LOG.error("Error while adding replica for containerId {}.",
contInfo.getContainerID(), ex);
}
}
byte[] dbKey = Longs.toByteArray(contInfo.getContainerID());
lock.lock(); lock.lock();
try { try {
byte[] containerBytes = containerStore.get(dbKey); byte[] containerBytes = containerStore.get(dbKey);
@ -522,12 +533,12 @@ public void processContainerReports(DatanodeDetails datanodeDetails,
HddsProtos.SCMContainerInfo.PARSER.parseFrom(containerBytes); HddsProtos.SCMContainerInfo.PARSER.parseFrom(containerBytes);
HddsProtos.SCMContainerInfo newState = HddsProtos.SCMContainerInfo newState =
reconcileState(datanodeState, knownState, datanodeDetails); reconcileState(contInfo, knownState, datanodeDetails);
if (knownState.getDeleteTransactionId() > datanodeState if (knownState.getDeleteTransactionId() > contInfo
.getDeleteTransactionId()) { .getDeleteTransactionId()) {
pendingDeleteStatusList pendingDeleteStatusList
.addPendingDeleteStatus(datanodeState.getDeleteTransactionId(), .addPendingDeleteStatus(contInfo.getDeleteTransactionId(),
knownState.getDeleteTransactionId(), knownState.getDeleteTransactionId(),
knownState.getContainerID()); knownState.getContainerID());
} }
@ -558,7 +569,7 @@ public void processContainerReports(DatanodeDetails datanodeDetails,
LOG.error("Error while processing container report from datanode :" + LOG.error("Error while processing container report from datanode :" +
" {}, for container: {}, reason: container doesn't exist in" + " {}, for container: {}, reason: container doesn't exist in" +
"container database.", datanodeDetails, "container database.", datanodeDetails,
datanodeState.getContainerID()); contInfo.getContainerID());
} }
} finally { } finally {
lock.unlock(); lock.unlock();

View File

@ -84,7 +84,8 @@ public void onMessage(ContainerReportFromDatanode containerReportFromDatanode,
try { try {
//update state in container db and trigger close container events //update state in container db and trigger close container events
containerMapping.processContainerReports(datanodeOrigin, containerReport); containerMapping
.processContainerReports(datanodeOrigin, containerReport, false);
Set<ContainerID> containerIds = containerReport.getReportsList().stream() Set<ContainerID> containerIds = containerReport.getReportsList().stream()
.map(containerProto -> containerProto.getContainerID()) .map(containerProto -> containerProto.getContainerID())

View File

@ -115,7 +115,7 @@ HddsProtos.LifeCycleState updateContainerState(long containerID,
* @param reports Container report * @param reports Container report
*/ */
void processContainerReports(DatanodeDetails datanodeDetails, void processContainerReports(DatanodeDetails datanodeDetails,
ContainerReportsProto reports) ContainerReportsProto reports, boolean isRegisterCall)
throws IOException; throws IOException;
/** /**

View File

@ -196,7 +196,7 @@ public SCMRegisteredResponseProto register(
if (registeredCommand.getError() if (registeredCommand.getError()
== SCMRegisteredResponseProto.ErrorCode.success) { == SCMRegisteredResponseProto.ErrorCode.success) {
scm.getScmContainerManager().processContainerReports(datanodeDetails, scm.getScmContainerManager().processContainerReports(datanodeDetails,
containerReportsProto); containerReportsProto, true);
} }
return getRegisteredResponse(registeredCommand); return getRegisteredResponse(registeredCommand);
} }

View File

@ -242,7 +242,7 @@ public void testContainerCreationLeaseTimeout() throws IOException,
} }
@Test @Test
public void testFullContainerReport() throws IOException { public void testFullContainerReport() throws Exception {
ContainerInfo info = createContainer(); ContainerInfo info = createContainer();
DatanodeDetails datanodeDetails = TestUtils.randomDatanodeDetails(); DatanodeDetails datanodeDetails = TestUtils.randomDatanodeDetails();
List<StorageContainerDatanodeProtocolProtos.ContainerInfo> reports = List<StorageContainerDatanodeProtocolProtos.ContainerInfo> reports =
@ -266,13 +266,26 @@ public void testFullContainerReport() throws IOException {
.newBuilder(); .newBuilder();
crBuilder.addAllReports(reports); crBuilder.addAllReports(reports);
mapping.processContainerReports(datanodeDetails, crBuilder.build()); mapping.processContainerReports(datanodeDetails, crBuilder.build(), false);
ContainerInfo updatedContainer = ContainerInfo updatedContainer =
mapping.getContainer(info.getContainerID()); mapping.getContainer(info.getContainerID());
Assert.assertEquals(100000000L, Assert.assertEquals(100000000L,
updatedContainer.getNumberOfKeys()); updatedContainer.getNumberOfKeys());
Assert.assertEquals(2000000000L, updatedContainer.getUsedBytes()); Assert.assertEquals(2000000000L, updatedContainer.getUsedBytes());
for (StorageContainerDatanodeProtocolProtos.ContainerInfo c : reports) {
LambdaTestUtils.intercept(SCMException.class, "No entry "
+ "exist for containerId:", () -> mapping.getStateManager()
.getContainerReplicas(ContainerID.valueof(c.getContainerID())));
}
mapping.processContainerReports(TestUtils.randomDatanodeDetails(),
crBuilder.build(), true);
for (StorageContainerDatanodeProtocolProtos.ContainerInfo c : reports) {
Assert.assertTrue(mapping.getStateManager().getContainerReplicas(
ContainerID.valueof(c.getContainerID())).size() > 0);
}
} }
@Test @Test
@ -301,7 +314,7 @@ public void testContainerCloseWithContainerReport() throws IOException {
ContainerReportsProto.newBuilder(); ContainerReportsProto.newBuilder();
crBuilder.addAllReports(reports); crBuilder.addAllReports(reports);
mapping.processContainerReports(datanodeDetails, crBuilder.build()); mapping.processContainerReports(datanodeDetails, crBuilder.build(), false);
ContainerInfo updatedContainer = ContainerInfo updatedContainer =
mapping.getContainer(info.getContainerID()); mapping.getContainer(info.getContainerID());

View File

@ -223,6 +223,6 @@ private void sendContainerReport(ContainerInfo info, long used) throws
.setDeleteTransactionId(0); .setDeleteTransactionId(0);
reports.addReports(ciBuilder); reports.addReports(ciBuilder);
mapping.processContainerReports(TestUtils.randomDatanodeDetails(), mapping.processContainerReports(TestUtils.randomDatanodeDetails(),
reports.build()); reports.build(), false);
} }
} }

View File

@ -187,7 +187,8 @@ private void verifyBlockDeletionEvent()
logCapturer.clearOutput(); logCapturer.clearOutput();
scm.getScmContainerManager().processContainerReports( scm.getScmContainerManager().processContainerReports(
cluster.getHddsDatanodes().get(0).getDatanodeDetails(), dummyReport); cluster.getHddsDatanodes().get(0).getDatanodeDetails(), dummyReport,
false);
// wait for event to be handled by event handler // wait for event to be handled by event handler
Thread.sleep(1000); Thread.sleep(1000);
String output = logCapturer.getOutput(); String output = logCapturer.getOutput();