HDFS-17020. RBF: mount table addAll should print failed records in std error (#5674)
parent afe850ca2c
commit 3b65b5d68f
@@ -403,7 +403,7 @@ public <T extends BaseRecord> StateStoreOperationResult putAll(
         if (metrics != null) {
           metrics.addFailure(monotonicNow() - start);
         }
-        return new StateStoreOperationResult(primaryKey);
+        return new StateStoreOperationResult(getOriginalPrimaryKey(primaryKey));
       } else {
         LOG.debug("Not updating {}", record);
       }
@@ -484,13 +484,13 @@ private <T extends BaseRecord> Void writeRecordToFile(AtomicBoolean success,
       } catch (IOException e) {
         LOG.error("Cannot write {}", recordPathTemp, e);
         recordWrittenSuccessfully = false;
-        failedRecordsList.add(primaryKey);
+        failedRecordsList.add(getOriginalPrimaryKey(primaryKey));
         success.set(false);
       }
       // Commit
       if (recordWrittenSuccessfully && !rename(recordPathTemp, recordPath)) {
         LOG.error("Failed committing record into {}", recordPath);
-        failedRecordsList.add(primaryKey);
+        failedRecordsList.add(getOriginalPrimaryKey(primaryKey));
         success.set(false);
       }
       return null;

@@ -187,7 +187,7 @@ public <T extends BaseRecord> StateStoreOperationResult putAll(
           record.setDateModified(this.getTime());
           if (!updateRecord(tableName, primaryKey, data)) {
             LOG.error("Cannot write {} into table {}", primaryKey, tableName);
-            failedRecordsKeys.add(primaryKey);
+            failedRecordsKeys.add(getOriginalPrimaryKey(primaryKey));
             success = false;
           }
         } else {
@@ -197,7 +197,7 @@ public <T extends BaseRecord> StateStoreOperationResult putAll(
           if (metrics != null) {
             metrics.addFailure(Time.monotonicNow() - start);
           }
-          return new StateStoreOperationResult(primaryKey);
+          return new StateStoreOperationResult(getOriginalPrimaryKey(primaryKey));
         } else {
           LOG.debug("Not updating {} as updates are not allowed", record);
         }
@@ -205,7 +205,7 @@ public <T extends BaseRecord> StateStoreOperationResult putAll(
       } else {
         if (!insertRecord(tableName, primaryKey, data)) {
           LOG.error("Cannot write {} in table {}", primaryKey, tableName);
-          failedRecordsKeys.add(primaryKey);
+          failedRecordsKeys.add(getOriginalPrimaryKey(primaryKey));
           success = false;
         }
       }

@@ -19,6 +19,7 @@
 
 import java.io.IOException;
 import java.util.Collection;
+import java.util.Objects;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
@@ -105,4 +106,20 @@ protected static String getPrimaryKey(BaseRecord record) {
     primaryKey = primaryKey.replaceAll(":", COLON_MARK);
     return primaryKey;
   }
+
+  /**
+   * Get the original primary key for the given state store record key. The
+   * returned key is readable as it is the original key.
+   *
+   * @param stateStoreRecordKey The record primary key stored by the state store implementations.
+   * @return The original primary key for the given record key.
+   */
+  protected static String getOriginalPrimaryKey(String stateStoreRecordKey) {
+    Objects.requireNonNull(stateStoreRecordKey,
+        "state store record key provided to getOriginalPrimaryKey should not be null");
+    stateStoreRecordKey = stateStoreRecordKey.replaceAll(SLASH_MARK, "/");
+    stateStoreRecordKey = stateStoreRecordKey.replaceAll(COLON_MARK, ":");
+    return stateStoreRecordKey;
+  }
 
 }

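For context, getOriginalPrimaryKey is the inverse of the existing getPrimaryKey escaping, so failed records can be reported with their readable source paths rather than the escaped keys the drivers store. A minimal standalone sketch of the round trip, assuming the marker values "0SLASH0" and "_" seen in the test helper this commit removes; the class and method names here are for illustration only:

public class PrimaryKeyRoundTrip {
  // Marker values as seen in the removed test helper; the production
  // constants live in the state store driver base class.
  private static final String SLASH_MARK = "0SLASH0";
  private static final String COLON_MARK = "_";

  // Mirrors getPrimaryKey: escape characters that are unsafe in file
  // names and znode paths.
  static String toStateStoreKey(String primaryKey) {
    return primaryKey.replaceAll("/", SLASH_MARK).replaceAll(":", COLON_MARK);
  }

  // Mirrors the new getOriginalPrimaryKey: restore the readable key.
  static String toOriginalKey(String stateStoreKey) {
    return stateStoreKey.replaceAll(SLASH_MARK, "/").replaceAll(COLON_MARK, ":");
  }

  public static void main(String[] args) {
    String src = "/testAddMultiMountPoints-01";
    String stored = toStateStoreKey(src);
    System.out.println(stored);                // 0SLASH0testAddMultiMountPoints-01
    System.out.println(toOriginalKey(stored)); // /testAddMultiMountPoints-01
  }
}

Note that the round trip assumes original keys contain neither marker string: a source path with a literal underscore would be restored with a colon in its place.
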
@@ -255,7 +255,7 @@ public <T extends BaseRecord> StateStoreOperationResult putAll(
           String recordZNode = getNodePath(znode, primaryKey);
           byte[] data = serialize(record);
           if (!writeNode(recordZNode, data, update, error)) {
-            failedRecordsKeys.add(primaryKey);
+            failedRecordsKeys.add(getOriginalPrimaryKey(primaryKey));
             status.set(false);
           }
           return null;

@@ -33,6 +33,7 @@
 import org.apache.hadoop.hdfs.server.federation.router.RouterQuotaUsage;
 import org.apache.hadoop.hdfs.server.federation.store.MountTableStore;
 import org.apache.hadoop.hdfs.server.federation.store.driver.StateStoreDriver;
+import org.apache.hadoop.hdfs.server.federation.store.driver.StateStoreOperationResult;
 import org.apache.hadoop.hdfs.server.federation.store.protocol.AddMountTableEntriesRequest;
 import org.apache.hadoop.hdfs.server.federation.store.protocol.AddMountTableEntriesResponse;
 import org.apache.hadoop.hdfs.server.federation.store.protocol.AddMountTableEntryRequest;
@@ -138,6 +139,7 @@ public AddMountTableEntriesResponse addMountTableEntries(AddMountTableEntriesReq
     if (mountTables == null || mountTables.size() == 0) {
       AddMountTableEntriesResponse response = AddMountTableEntriesResponse.newInstance();
       response.setStatus(false);
+      response.setFailedRecordsKeys(Collections.emptyList());
       return response;
     }
     for (MountTable mountTable : mountTables) {
@@ -145,9 +147,11 @@ public AddMountTableEntriesResponse addMountTableEntries(AddMountTableEntriesReq
       final String src = mountTable.getSourcePath();
       checkMountTablePermission(src);
     }
-    boolean status = getDriver().putAll(mountTables, false, true).isOperationSuccessful();
+    StateStoreOperationResult result = getDriver().putAll(mountTables, false, true);
+    boolean status = result.isOperationSuccessful();
     AddMountTableEntriesResponse response = AddMountTableEntriesResponse.newInstance();
     response.setStatus(status);
+    response.setFailedRecordsKeys(result.getFailedRecordsKeys());
     if (status) {
       updateCacheAllRouters();
     }

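StateStoreOperationResult itself is not shown in this diff; below is a minimal sketch of the shape implied by its call sites here (the single-key constructor used on the driver failure paths, plus the two accessors read above). The real class in org.apache.hadoop.hdfs.server.federation.store.driver may differ in detail:

import java.util.Collections;
import java.util.List;

public class StateStoreOperationResult {
  private final List<String> failedRecordsKeys;
  private final boolean operationSuccessful;

  // Failure path used by the drivers: a single record key that could
  // not be written.
  public StateStoreOperationResult(String failedRecordKey) {
    this(Collections.singletonList(failedRecordKey), false);
  }

  public StateStoreOperationResult(List<String> failedRecordsKeys,
      boolean operationSuccessful) {
    this.failedRecordsKeys = failedRecordsKeys;
    this.operationSuccessful = operationSuccessful;
  }

  public boolean isOperationSuccessful() {
    return operationSuccessful;
  }

  public List<String> getFailedRecordsKeys() {
    return failedRecordsKeys;
  }
}
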
@@ -18,6 +18,7 @@
 package org.apache.hadoop.hdfs.server.federation.store.protocol;
 
 import java.io.IOException;
+import java.util.List;
 
 import org.apache.hadoop.classification.InterfaceAudience.Public;
 import org.apache.hadoop.classification.InterfaceStability.Unstable;
@@ -36,7 +37,16 @@ public static AddMountTableEntriesResponse newInstance() throws IOException {
   @Unstable
   public abstract boolean getStatus();
 
+  @Public
+  @Unstable
+  public abstract List<String> getFailedRecordsKeys();
+
   @Public
   @Unstable
   public abstract void setStatus(boolean result);
 
+  @Public
+  @Unstable
+  public abstract void setFailedRecordsKeys(List<String> failedRecordsKeys);
+
 }

@@ -18,6 +18,7 @@
 package org.apache.hadoop.hdfs.server.federation.store.protocol.impl.pb;
 
 import java.io.IOException;
+import java.util.List;
 
 import org.apache.hadoop.thirdparty.protobuf.Message;
 
@@ -65,8 +66,18 @@ public boolean getStatus() {
     return this.translator.getProtoOrBuilder().getStatus();
   }
 
+  @Override
+  public List<String> getFailedRecordsKeys() {
+    return this.translator.getProtoOrBuilder().getFailedEntriesKeysList();
+  }
+
   @Override
   public void setStatus(boolean result) {
     this.translator.getBuilder().setStatus(result);
   }
+
+  @Override
+  public void setFailedRecordsKeys(List<String> failedRecordsKeys) {
+    this.translator.getBuilder().addAllFailedEntriesKeys(failedRecordsKeys);
+  }
 }

@@ -515,7 +515,7 @@ private boolean addAllMount(String[] parameters, int i) throws IOException {
         mountTable.addMountTableEntries(request);
     boolean added = addResponse.getStatus();
     if (!added) {
-      System.err.println("Cannot add some or all mount points");
+      System.err.println("Cannot add mount points: " + addResponse.getFailedRecordsKeys());
     }
     return added;
   }

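The new message concatenates the failed-keys list directly, so the output uses List#toString's bracketed form, which is exactly what the test added later in this commit asserts on. A trivial demo; the sample key is taken from that test, and the class name is for illustration only:

import java.util.Arrays;
import java.util.List;

public class FailedKeysOutputDemo {
  public static void main(String[] args) {
    List<String> failed = Arrays.asList("/testAddMultiMountPoints-01");
    // Prints: Cannot add mount points: [/testAddMultiMountPoints-01]
    System.err.println("Cannot add mount points: " + failed);
  }
}
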
@@ -168,6 +168,7 @@ message AddMountTableEntriesRequestProto {
 
 message AddMountTableEntriesResponseProto {
   optional bool status = 1;
+  repeated string failedEntriesKeys = 2;
 }
 
 message UpdateMountTableEntryRequestProto {

@@ -1882,6 +1882,8 @@ private static void validateMountEntry(String mountName, int numDest, String[] d
 
   @Test
   public void testAddMultipleMountPointsFailure() throws Exception {
+    System.setErr(new PrintStream(err));
+
     String[] argv =
         new String[] {"-addAll", "/testAddMultiMountPoints-01", "ns01", ",", "/dest01", ",",
             "/testAddMultiMountPoints-02", "ns02,ns03", "/dest02", "-order", "HASH_ALL",
@@ -1918,6 +1920,9 @@ public void testAddMultipleMountPointsFailure() throws Exception {
             "-faulttolerant"};
     // mount points were already added
     assertNotEquals(0, ToolRunner.run(admin, argv));
+
+    assertTrue("The error message should return failed entries",
+        err.toString().contains("Cannot add mount points: [/testAddMultiMountPoints-01"));
   }
 
   private void addMountTable(String src, String nsId, String dst)

@@ -314,7 +314,7 @@ public <T extends BaseRecord> void testPut(
     StateStoreOperationResult result2 = driver.putAll(insertList.subList(0, 1), false, true);
     assertFalse(result2.isOperationSuccessful());
     assertEquals(1, result2.getFailedRecordsKeys().size());
-    assertEquals(getPrimaryKey(insertList.get(0)), result2.getFailedRecordsKeys().get(0));
+    assertEquals(insertList.get(0).getPrimaryKey(), result2.getFailedRecordsKeys().get(0));
 
     records = driver.get(clazz);
     assertEquals(records.getRecords().size(), 10);
@@ -701,11 +701,4 @@ private static <T> T fromString(String data, Class<T> clazz) {
     return null;
   }
 
-  private static String getPrimaryKey(BaseRecord record) {
-    String primaryKey = record.getPrimaryKey();
-    primaryKey = primaryKey.replaceAll("/", "0SLASH0");
-    primaryKey = primaryKey.replaceAll(":", "_");
-    return primaryKey;
-  }
-
 }