diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreFileBaseImpl.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreFileBaseImpl.java
index a0f6fba9ba..f76237fbfa 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreFileBaseImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreFileBaseImpl.java
@@ -403,7 +403,7 @@ public StateStoreOperationResult putAll(
           if (metrics != null) {
             metrics.addFailure(monotonicNow() - start);
           }
-          return new StateStoreOperationResult(primaryKey);
+          return new StateStoreOperationResult(getOriginalPrimaryKey(primaryKey));
         } else {
           LOG.debug("Not updating {}", record);
         }
@@ -484,13 +484,13 @@ private Void writeRecordToFile(AtomicBoolean success,
     } catch (IOException e) {
       LOG.error("Cannot write {}", recordPathTemp, e);
       recordWrittenSuccessfully = false;
-      failedRecordsList.add(primaryKey);
+      failedRecordsList.add(getOriginalPrimaryKey(primaryKey));
       success.set(false);
     }
     // Commit
     if (recordWrittenSuccessfully && !rename(recordPathTemp, recordPath)) {
       LOG.error("Failed committing record into {}", recordPath);
-      failedRecordsList.add(primaryKey);
+      failedRecordsList.add(getOriginalPrimaryKey(primaryKey));
       success.set(false);
     }
     return null;
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreMySQLImpl.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreMySQLImpl.java
index bbeee8e40f..fcb827ee6d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreMySQLImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreMySQLImpl.java
@@ -187,7 +187,7 @@ public StateStoreOperationResult putAll(
           record.setDateModified(this.getTime());
           if (!updateRecord(tableName, primaryKey, data)) {
             LOG.error("Cannot write {} into table {}", primaryKey, tableName);
-            failedRecordsKeys.add(primaryKey);
+            failedRecordsKeys.add(getOriginalPrimaryKey(primaryKey));
             success = false;
           }
         } else {
@@ -197,7 +197,7 @@ public StateStoreOperationResult putAll(
             if (metrics != null) {
               metrics.addFailure(Time.monotonicNow() - start);
             }
-            return new StateStoreOperationResult(primaryKey);
+            return new StateStoreOperationResult(getOriginalPrimaryKey(primaryKey));
           } else {
             LOG.debug("Not updating {} as updates are not allowed", record);
           }
@@ -205,7 +205,7 @@ public StateStoreOperationResult putAll(
       } else {
         if (!insertRecord(tableName, primaryKey, data)) {
           LOG.error("Cannot write {} in table {}", primaryKey, tableName);
-          failedRecordsKeys.add(primaryKey);
+          failedRecordsKeys.add(getOriginalPrimaryKey(primaryKey));
           success = false;
         }
       }
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreSerializableImpl.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreSerializableImpl.java
index 6e34fb04d4..cefd92ad21 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreSerializableImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreSerializableImpl.java
@@ -19,6 +19,7 @@
 
 import java.io.IOException;
 import java.util.Collection;
+import java.util.Objects;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
@@ -105,4 +106,20 @@ protected static String getPrimaryKey(BaseRecord record) {
     primaryKey = primaryKey.replaceAll(":", COLON_MARK);
     return primaryKey;
   }
+
+  /**
+   * Get the original primary key for the given state store record key. The returned
+   * key is readable as it is the original key.
+   *
+   * @param stateStoreRecordKey The record primary key stored by the state store implementations.
+   * @return The original primary key for the given record key.
+   */
+  protected static String getOriginalPrimaryKey(String stateStoreRecordKey) {
+    Objects.requireNonNull(stateStoreRecordKey,
+        "state store record key provided to getOriginalPrimaryKey should not be null");
+    stateStoreRecordKey = stateStoreRecordKey.replaceAll(SLASH_MARK, "/");
+    stateStoreRecordKey = stateStoreRecordKey.replaceAll(COLON_MARK, ":");
+    return stateStoreRecordKey;
+  }
+
 }
\ No newline at end of file
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreZooKeeperImpl.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreZooKeeperImpl.java
index 18d3e1a11d..19a23cb022 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreZooKeeperImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreZooKeeperImpl.java
@@ -255,7 +255,7 @@ public StateStoreOperationResult putAll(
         String recordZNode = getNodePath(znode, primaryKey);
         byte[] data = serialize(record);
         if (!writeNode(recordZNode, data, update, error)) {
-          failedRecordsKeys.add(primaryKey);
+          failedRecordsKeys.add(getOriginalPrimaryKey(primaryKey));
           status.set(false);
         }
         return null;
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/impl/MountTableStoreImpl.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/impl/MountTableStoreImpl.java
index b2a608ce93..a46410a274 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/impl/MountTableStoreImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/impl/MountTableStoreImpl.java
@@ -33,6 +33,7 @@
 import org.apache.hadoop.hdfs.server.federation.router.RouterQuotaUsage;
 import org.apache.hadoop.hdfs.server.federation.store.MountTableStore;
 import org.apache.hadoop.hdfs.server.federation.store.driver.StateStoreDriver;
+import org.apache.hadoop.hdfs.server.federation.store.driver.StateStoreOperationResult;
 import org.apache.hadoop.hdfs.server.federation.store.protocol.AddMountTableEntriesRequest;
 import org.apache.hadoop.hdfs.server.federation.store.protocol.AddMountTableEntriesResponse;
 import org.apache.hadoop.hdfs.server.federation.store.protocol.AddMountTableEntryRequest;
@@ -138,6 +139,7 @@ public AddMountTableEntriesResponse addMountTableEntries(AddMountTableEntriesReq
     if (mountTables == null || mountTables.size() == 0) {
       AddMountTableEntriesResponse response = AddMountTableEntriesResponse.newInstance();
       response.setStatus(false);
+      response.setFailedRecordsKeys(Collections.emptyList());
       return response;
     }
     for (MountTable mountTable : mountTables) {
@@ -145,9 +147,11 @@ public AddMountTableEntriesResponse addMountTableEntries(AddMountTableEntriesReq
       final String src = mountTable.getSourcePath();
       checkMountTablePermission(src);
     }
-    boolean status = getDriver().putAll(mountTables, false, true).isOperationSuccessful();
+    StateStoreOperationResult result = getDriver().putAll(mountTables, false, true);
+    boolean status = result.isOperationSuccessful();
     AddMountTableEntriesResponse response = AddMountTableEntriesResponse.newInstance();
     response.setStatus(status);
+    response.setFailedRecordsKeys(result.getFailedRecordsKeys());
     if (status) {
       updateCacheAllRouters();
     }
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/protocol/AddMountTableEntriesResponse.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/protocol/AddMountTableEntriesResponse.java
index b010c79932..3b42308948 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/protocol/AddMountTableEntriesResponse.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/protocol/AddMountTableEntriesResponse.java
@@ -18,6 +18,7 @@
 package org.apache.hadoop.hdfs.server.federation.store.protocol;
 
 import java.io.IOException;
+import java.util.List;
 
 import org.apache.hadoop.classification.InterfaceAudience.Public;
 import org.apache.hadoop.classification.InterfaceStability.Unstable;
@@ -36,7 +37,16 @@ public static AddMountTableEntriesResponse newInstance() throws IOException {
   @Unstable
   public abstract boolean getStatus();
 
+  @Public
+  @Unstable
+  public abstract List<String> getFailedRecordsKeys();
+
   @Public
   @Unstable
   public abstract void setStatus(boolean result);
+
+  @Public
+  @Unstable
+  public abstract void setFailedRecordsKeys(List<String> failedRecordsKeys);
+
 }
\ No newline at end of file
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/protocol/impl/pb/AddMountTableEntriesResponsePBImpl.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/protocol/impl/pb/AddMountTableEntriesResponsePBImpl.java
index 41bea2976b..07a7989565 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/protocol/impl/pb/AddMountTableEntriesResponsePBImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/protocol/impl/pb/AddMountTableEntriesResponsePBImpl.java
@@ -18,6 +18,7 @@
 package org.apache.hadoop.hdfs.server.federation.store.protocol.impl.pb;
 
 import java.io.IOException;
+import java.util.List;
 
 import org.apache.hadoop.thirdparty.protobuf.Message;
 
@@ -65,8 +66,18 @@ public boolean getStatus() {
     return this.translator.getProtoOrBuilder().getStatus();
   }
 
+  @Override
+  public List<String> getFailedRecordsKeys() {
+    return this.translator.getProtoOrBuilder().getFailedEntriesKeysList();
+  }
+
   @Override
   public void setStatus(boolean result) {
     this.translator.getBuilder().setStatus(result);
   }
+
+  @Override
+  public void setFailedRecordsKeys(List<String> failedRecordsKeys) {
+    this.translator.getBuilder().addAllFailedEntriesKeys(failedRecordsKeys);
+  }
 }
\ No newline at end of file
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/tools/federation/RouterAdmin.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/tools/federation/RouterAdmin.java
index 2722fd1df0..b627ca35c8 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/tools/federation/RouterAdmin.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/tools/federation/RouterAdmin.java
@@ -515,7 +515,7 @@ private boolean addAllMount(String[] parameters, int i) throws IOException {
         mountTable.addMountTableEntries(request);
     boolean added = addResponse.getStatus();
     if (!added) {
-      System.err.println("Cannot add some or all mount points");
+      System.err.println("Cannot add mount points: " + addResponse.getFailedRecordsKeys());
     }
     return added;
   }
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/proto/FederationProtocol.proto b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/proto/FederationProtocol.proto
index e83f322732..b3be714310 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/proto/FederationProtocol.proto
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/proto/FederationProtocol.proto
@@ -168,6 +168,7 @@ message AddMountTableEntriesRequestProto {
 
 message AddMountTableEntriesResponseProto {
   optional bool status = 1;
+  repeated string failedEntriesKeys = 2;
 }
 
 message UpdateMountTableEntryRequestProto {
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterAdminCLI.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterAdminCLI.java
index ab2496f3ae..a0ccf80578 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterAdminCLI.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterAdminCLI.java
@@ -1882,6 +1882,8 @@ private static void validateMountEntry(String mountName, int numDest, String[] d
 
   @Test
   public void testAddMultipleMountPointsFailure() throws Exception {
+    System.setErr(new PrintStream(err));
+
     String[] argv = new String[] {"-addAll",
         "/testAddMultiMountPoints-01", "ns01", ",", "/dest01", ",",
         "/testAddMultiMountPoints-02", "ns02,ns03", "/dest02", "-order", "HASH_ALL",
@@ -1918,6 +1920,9 @@ public void testAddMultipleMountPointsFailure() throws Exception {
         "-faulttolerant"};
     // mount points were already added
    assertNotEquals(0, ToolRunner.run(admin, argv));
+
+    assertTrue("The error message should return failed entries",
+        err.toString().contains("Cannot add mount points: [/testAddMultiMountPoints-01"));
   }
 
   private void addMountTable(String src, String nsId, String dst)
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/store/driver/TestStateStoreDriverBase.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/store/driver/TestStateStoreDriverBase.java
index 8b734305a2..0dc0df1018 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/store/driver/TestStateStoreDriverBase.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/store/driver/TestStateStoreDriverBase.java
@@ -314,7 +314,7 @@ public <T extends BaseRecord> void testPut(
     StateStoreOperationResult result2 = driver.putAll(insertList.subList(0, 1), false, true);
     assertFalse(result2.isOperationSuccessful());
     assertEquals(1, result2.getFailedRecordsKeys().size());
-    assertEquals(getPrimaryKey(insertList.get(0)), result2.getFailedRecordsKeys().get(0));
+    assertEquals(insertList.get(0).getPrimaryKey(), result2.getFailedRecordsKeys().get(0));
 
     records = driver.get(clazz);
     assertEquals(records.getRecords().size(), 10);
@@ -701,11 +701,4 @@ private static <T extends BaseRecord> T fromString(String data, Class<T> clazz) {
     return null;
   }
 
-  private static String getPrimaryKey(BaseRecord record) {
-    String primaryKey = record.getPrimaryKey();
-    primaryKey = primaryKey.replaceAll("/", "0SLASH0");
-    primaryKey = primaryKey.replaceAll(":", "_");
-    return primaryKey;
-  }
-
 }
\ No newline at end of file
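
Note, not part of the patch: the change works by reversing the key escaping that StateStoreSerializableImpl.getPrimaryKey() applies before a record is persisted, so that putAll() failures report the original, readable mount entry keys instead of the escaped ones. The standalone sketch below illustrates that round trip under the marker values used by the removed helper in TestStateStoreDriverBase ("0SLASH0" for '/' and "_" for ':'); the class and method names are invented for the example.

import java.util.Objects;

/** Illustrative sketch only; not part of the patch. */
public class PrimaryKeyEscapingDemo {
  // Marker values mirror the helper removed from TestStateStoreDriverBase.
  private static final String SLASH_MARK = "0SLASH0";
  private static final String COLON_MARK = "_";

  /** Escape a key the way getPrimaryKey() does before the record is stored. */
  static String toStateStoreKey(String originalKey) {
    Objects.requireNonNull(originalKey, "original key should not be null");
    return originalKey.replaceAll("/", SLASH_MARK).replaceAll(":", COLON_MARK);
  }

  /** Undo the escaping, mirroring the new getOriginalPrimaryKey() helper. */
  static String toOriginalKey(String stateStoreKey) {
    Objects.requireNonNull(stateStoreKey, "state store key should not be null");
    return stateStoreKey.replaceAll(SLASH_MARK, "/").replaceAll(COLON_MARK, ":");
  }

  public static void main(String[] args) {
    String mountPoint = "/testAddMultiMountPoints-01";
    String stored = toStateStoreKey(mountPoint);
    // Escaped form previously surfaced in the failed record keys.
    System.out.println(stored);                 // 0SLASH0testAddMultiMountPoints-01
    // Readable original key now reported back to the admin CLI.
    System.out.println(toOriginalKey(stored));  // /testAddMultiMountPoints-01
  }
}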