From 6326605acb5a5bf48d994278c9d3a39733679e81 Mon Sep 17 00:00:00 2001 From: Tsz-wo Sze Date: Mon, 19 Mar 2012 22:09:14 +0000 Subject: [PATCH] HDFS-3105. Add DatanodeStorage information to block recovery. git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1302683 13f79535-47bb-0310-9956-ffa450edef68 --- hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt | 2 + ...atanodeProtocolClientSideTranslatorPB.java | 4 +- ...atanodeProtocolServerSideTranslatorPB.java | 4 +- ...atanodeProtocolServerSideTranslatorPB.java | 8 ++-- .../InterDatanodeProtocolTranslatorPB.java | 6 +-- .../hadoop/hdfs/server/datanode/DataNode.java | 45 ++++++++++------- .../hdfs/server/datanode/FSDataset.java | 47 ++++++++++-------- .../server/datanode/FSDatasetInterface.java | 11 +++-- .../hdfs/server/namenode/FSNamesystem.java | 3 +- .../server/namenode/NameNodeRpcServer.java | 7 +-- .../server/protocol/DatanodeProtocol.java | 4 +- .../protocol/InterDatanodeProtocol.java | 9 +--- .../src/main/proto/DatanodeProtocol.proto | 1 + .../main/proto/InterDatanodeProtocol.proto | 2 +- .../server/datanode/SimulatedFSDataset.java | 10 ++-- .../server/datanode/TestBlockRecovery.java | 48 +++++++++++-------- .../datanode/TestInterDatanodeProtocol.java | 9 +--- .../namenode/ha/TestPipelinesFailover.java | 3 +- 18 files changed, 127 insertions(+), 96 deletions(-) diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt index 6d35e3545f..42d373a987 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt +++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt @@ -241,6 +241,8 @@ Release 0.23.3 - UNRELEASED HDFS-3088. Move FSDatasetInterface inner classes to a package. (szetszwo) + HDFS-3105. Add DatanodeStorage information to block recovery. (szetszwo) + OPTIMIZATIONS HDFS-3024. Improve performance of stringification in addStoredBlock (todd) diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolClientSideTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolClientSideTranslatorPB.java index 2a661c0fc5..c3a856fd77 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolClientSideTranslatorPB.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolClientSideTranslatorPB.java @@ -293,7 +293,8 @@ public void reportBadBlocks(LocatedBlock[] blocks) throws IOException { @Override public void commitBlockSynchronization(ExtendedBlock block, long newgenerationstamp, long newlength, boolean closeFile, - boolean deleteblock, DatanodeID[] newtargets) throws IOException { + boolean deleteblock, DatanodeID[] newtargets, String[] newtargetstorages + ) throws IOException { CommitBlockSynchronizationRequestProto.Builder builder = CommitBlockSynchronizationRequestProto.newBuilder() .setBlock(PBHelper.convert(block)).setNewGenStamp(newgenerationstamp) @@ -301,6 +302,7 @@ public void commitBlockSynchronization(ExtendedBlock block, .setDeleteBlock(deleteblock); for (int i = 0; i < newtargets.length; i++) { builder.addNewTaragets(PBHelper.convert(newtargets[i])); + builder.addNewTargetStorages(newtargetstorages[i]); } CommitBlockSynchronizationRequestProto req = builder.build(); try { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolServerSideTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolServerSideTranslatorPB.java index c653daa1ee..0c0a57bcac 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolServerSideTranslatorPB.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolServerSideTranslatorPB.java @@ -259,10 +259,12 @@ public CommitBlockSynchronizationResponseProto commitBlockSynchronization( for (int i = 0; i < dnprotos.size(); i++) { dns[i] = PBHelper.convert(dnprotos.get(i)); } + final List sidprotos = request.getNewTargetStoragesList(); + final String[] storageIDs = sidprotos.toArray(new String[sidprotos.size()]); try { impl.commitBlockSynchronization(PBHelper.convert(request.getBlock()), request.getNewGenStamp(), request.getNewLength(), - request.getCloseFile(), request.getDeleteBlock(), dns); + request.getCloseFile(), request.getDeleteBlock(), dns, storageIDs); } catch (IOException e) { throw new ServiceException(e); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/InterDatanodeProtocolServerSideTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/InterDatanodeProtocolServerSideTranslatorPB.java index 5cf8c3a2b1..5c475c8502 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/InterDatanodeProtocolServerSideTranslatorPB.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/InterDatanodeProtocolServerSideTranslatorPB.java @@ -20,7 +20,6 @@ import java.io.IOException; import org.apache.hadoop.classification.InterfaceAudience; -import org.apache.hadoop.hdfs.protocol.ExtendedBlock; import org.apache.hadoop.hdfs.protocol.proto.InterDatanodeProtocolProtos.InitReplicaRecoveryRequestProto; import org.apache.hadoop.hdfs.protocol.proto.InterDatanodeProtocolProtos.InitReplicaRecoveryResponseProto; import org.apache.hadoop.hdfs.protocol.proto.InterDatanodeProtocolProtos.UpdateReplicaUnderRecoveryRequestProto; @@ -66,14 +65,15 @@ public InitReplicaRecoveryResponseProto initReplicaRecovery( public UpdateReplicaUnderRecoveryResponseProto updateReplicaUnderRecovery( RpcController unused, UpdateReplicaUnderRecoveryRequestProto request) throws ServiceException { - ExtendedBlock b; + final String storageID; try { - b = impl.updateReplicaUnderRecovery(PBHelper.convert(request.getBlock()), + storageID = impl.updateReplicaUnderRecovery( + PBHelper.convert(request.getBlock()), request.getRecoveryId(), request.getNewLength()); } catch (IOException e) { throw new ServiceException(e); } return UpdateReplicaUnderRecoveryResponseProto.newBuilder() - .setBlock(PBHelper.convert(b)).build(); + .setStorageID(storageID).build(); } } \ No newline at end of file diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/InterDatanodeProtocolTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/InterDatanodeProtocolTranslatorPB.java index 7c2019edf2..9d301916dd 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/InterDatanodeProtocolTranslatorPB.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/InterDatanodeProtocolTranslatorPB.java @@ -91,15 +91,15 @@ public ReplicaRecoveryInfo initReplicaRecovery(RecoveringBlock rBlock) } @Override - public ExtendedBlock updateReplicaUnderRecovery(ExtendedBlock oldBlock, + public String updateReplicaUnderRecovery(ExtendedBlock oldBlock, long recoveryId, long newLength) throws IOException { UpdateReplicaUnderRecoveryRequestProto req = UpdateReplicaUnderRecoveryRequestProto.newBuilder() .setBlock(PBHelper.convert(oldBlock)) .setNewLength(newLength).setRecoveryId(recoveryId).build(); try { - return PBHelper.convert(rpcProxy.updateReplicaUnderRecovery( - NULL_CONTROLLER, req).getBlock()); + return rpcProxy.updateReplicaUnderRecovery(NULL_CONTROLLER, req + ).getStorageID(); } catch (ServiceException e) { throw ProtobufHelper.getRemoteException(e); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java index b7045337d3..9de306e178 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java @@ -1765,10 +1765,9 @@ private static ReplicaRecoveryInfo callInitReplicaRecovery( * Update replica with the new generation stamp and length. */ @Override // InterDatanodeProtocol - public ExtendedBlock updateReplicaUnderRecovery(ExtendedBlock oldBlock, - long recoveryId, - long newLength) throws IOException { - ReplicaInfo r = data.updateReplicaUnderRecovery(oldBlock, + public String updateReplicaUnderRecovery(final ExtendedBlock oldBlock, + final long recoveryId, final long newLength) throws IOException { + final String storageID = data.updateReplicaUnderRecovery(oldBlock, recoveryId, newLength); // Notify the namenode of the updated block info. This is important // for HA, since otherwise the standby node may lose track of the @@ -1777,7 +1776,7 @@ public ExtendedBlock updateReplicaUnderRecovery(ExtendedBlock oldBlock, newBlock.setGenerationStamp(recoveryId); newBlock.setNumBytes(newLength); notifyNamenodeReceivedBlock(newBlock, ""); - return new ExtendedBlock(oldBlock.getBlockPoolId(), r); + return storageID; } /** A convenient class used in block recovery */ @@ -1786,6 +1785,8 @@ static class BlockRecord { final InterDatanodeProtocol datanode; final ReplicaRecoveryInfo rInfo; + private String storageID; + BlockRecord(DatanodeID id, InterDatanodeProtocol datanode, ReplicaRecoveryInfo rInfo) { @@ -1794,6 +1795,12 @@ static class BlockRecord { this.rInfo = rInfo; } + void updateReplicaUnderRecovery(String bpid, long recoveryId, long newLength + ) throws IOException { + final ExtendedBlock b = new ExtendedBlock(bpid, rInfo); + storageID = datanode.updateReplicaUnderRecovery(b, recoveryId, newLength); + } + @Override public String toString() { return "block:" + rInfo + " node:" + id; @@ -1870,6 +1877,7 @@ public DatanodeProtocolClientSideTranslatorPB getActiveNamenodeForBP(String bpid void syncBlock(RecoveringBlock rBlock, List syncList) throws IOException { ExtendedBlock block = rBlock.getBlock(); + final String bpid = block.getBlockPoolId(); DatanodeProtocolClientSideTranslatorPB nn = getActiveNamenodeForBP(block.getBlockPoolId()); if (nn == null) { @@ -1889,7 +1897,7 @@ void syncBlock(RecoveringBlock rBlock, // The block can be deleted. if (syncList.isEmpty()) { nn.commitBlockSynchronization(block, recoveryId, 0, - true, true, DatanodeID.EMPTY_ARRAY); + true, true, DatanodeID.EMPTY_ARRAY, null); return; } @@ -1912,8 +1920,8 @@ void syncBlock(RecoveringBlock rBlock, // Calculate list of nodes that will participate in the recovery // and the new block size List participatingList = new ArrayList(); - final ExtendedBlock newBlock = new ExtendedBlock(block.getBlockPoolId(), block - .getBlockId(), -1, recoveryId); + final ExtendedBlock newBlock = new ExtendedBlock(bpid, block.getBlockId(), + -1, recoveryId); switch(bestState) { case FINALIZED: assert finalizedLength > 0 : "finalizedLength is not positive"; @@ -1944,16 +1952,11 @@ void syncBlock(RecoveringBlock rBlock, } List failedList = new ArrayList(); - List successList = new ArrayList(); + final List successList = new ArrayList(); for(BlockRecord r : participatingList) { try { - ExtendedBlock reply = r.datanode.updateReplicaUnderRecovery( - new ExtendedBlock(newBlock.getBlockPoolId(), r.rInfo), recoveryId, - newBlock.getNumBytes()); - assert reply.equals(newBlock) && - reply.getNumBytes() == newBlock.getNumBytes() : - "Updated replica must be the same as the new block."; - successList.add(r.id); + r.updateReplicaUnderRecovery(bpid, recoveryId, newBlock.getNumBytes()); + successList.add(r); } catch (IOException e) { InterDatanodeProtocol.LOG.warn("Failed to updateBlock (newblock=" + newBlock + ", datanode=" + r.id + ")", e); @@ -1974,10 +1977,16 @@ void syncBlock(RecoveringBlock rBlock, } // Notify the name-node about successfully recovered replicas. - DatanodeID[] nlist = successList.toArray(new DatanodeID[successList.size()]); + final DatanodeID[] datanodes = new DatanodeID[successList.size()]; + final String[] storages = new String[datanodes.length]; + for(int i = 0; i < datanodes.length; i++) { + final BlockRecord r = successList.get(i); + datanodes[i] = r.id; + storages[i] = r.storageID; + } nn.commitBlockSynchronization(block, newBlock.getGenerationStamp(), newBlock.getNumBytes(), true, false, - nlist); + datanodes, storages); } private static void logRecoverBlock(String who, diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/FSDataset.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/FSDataset.java index 34ce8f3e28..10d8cbbc7c 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/FSDataset.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/FSDataset.java @@ -553,14 +553,16 @@ public void shutdown() { */ static class FSVolume implements FsVolumeSpi { private final FSDataset dataset; + private final String storageID; private final Map map = new HashMap(); private final File currentDir; // /current private final DF usage; private final long reserved; - FSVolume(FSDataset dataset, File currentDir, Configuration conf - ) throws IOException { + FSVolume(FSDataset dataset, String storageID, File currentDir, + Configuration conf) throws IOException { this.dataset = dataset; + this.storageID = storageID; this.reserved = conf.getLong(DFSConfigKeys.DFS_DATANODE_DU_RESERVED_KEY, DFSConfigKeys.DFS_DATANODE_DU_RESERVED_DEFAULT); this.currentDir = currentDir; @@ -808,6 +810,10 @@ private void deleteBPDirectories(String bpid, boolean force) } } } + + String getStorageID() { + return storageID; + } } static class FSVolumeSet { @@ -1017,6 +1023,12 @@ public List getVolumes() { return volumes.volumes; } + @Override + public synchronized FSVolume getVolume(final ExtendedBlock b) { + final ReplicaInfo r = volumeMap.get(b.getBlockPoolId(), b.getLocalBlock()); + return r != null? (FSVolume)r.getVolume(): null; + } + @Override // FSDatasetInterface public synchronized Block getStoredBlock(String bpid, long blkid) throws IOException { @@ -1107,7 +1119,7 @@ private FSDataset(DataNode datanode, DataStorage storage, Configuration conf storage.getNumStorageDirs()); for (int idx = 0; idx < storage.getNumStorageDirs(); idx++) { final File dir = storage.getStorageDir(idx).getCurrentDir(); - volArray.add(new FSVolume(this, dir, conf)); + volArray.add(new FSVolume(this, storage.getStorageID(), dir, conf)); DataNode.LOG.info("FSDataset added volume - " + dir); } volumeMap = new ReplicasMap(this); @@ -1758,19 +1770,6 @@ public void adjustCrcChannelPosition(ExtendedBlock b, ReplicaOutputStreams strea channel.position(newPos); } - synchronized File createTmpFile(FSVolume vol, String bpid, Block blk) throws IOException { - if ( vol == null ) { - ReplicaInfo replica = volumeMap.get(bpid, blk); - if (replica != null) { - vol = (FSVolume)volumeMap.get(bpid, blk).getVolume(); - } - if ( vol == null ) { - throw new IOException("Could not find volume for block " + blk); - } - } - return vol.createTmpFile(bpid, blk); - } - // // REMIND - mjc - eventually we should have a timeout system // in place to clean up block files left by abandoned clients. @@ -2421,13 +2420,13 @@ static ReplicaRecoveryInfo initReplicaRecovery(String bpid, } @Override // FSDatasetInterface - public synchronized ReplicaInfo updateReplicaUnderRecovery( + public synchronized String updateReplicaUnderRecovery( final ExtendedBlock oldBlock, final long recoveryId, final long newlength) throws IOException { //get replica - final ReplicaInfo replica = volumeMap.get(oldBlock.getBlockPoolId(), - oldBlock.getBlockId()); + final String bpid = oldBlock.getBlockPoolId(); + final ReplicaInfo replica = volumeMap.get(bpid, oldBlock.getBlockId()); DataNode.LOG.info("updateReplica: block=" + oldBlock + ", recoveryId=" + recoveryId + ", length=" + newlength @@ -2457,10 +2456,18 @@ public synchronized ReplicaInfo updateReplicaUnderRecovery( //update replica final FinalizedReplica finalized = updateReplicaUnderRecovery(oldBlock .getBlockPoolId(), (ReplicaUnderRecovery) replica, recoveryId, newlength); + assert finalized.getBlockId() == oldBlock.getBlockId() + && finalized.getGenerationStamp() == recoveryId + && finalized.getNumBytes() == newlength + : "Replica information mismatched: oldBlock=" + oldBlock + + ", recoveryId=" + recoveryId + ", newlength=" + newlength + + ", finalized=" + finalized; //check replica files after update checkReplicaFiles(finalized); - return finalized; + + //return storage ID + return getVolume(new ExtendedBlock(bpid, finalized)).getStorageID(); } private FinalizedReplica updateReplicaUnderRecovery( diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/FSDatasetInterface.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/FSDatasetInterface.java index 910a1af8ce..d28e359607 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/FSDatasetInterface.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/FSDatasetInterface.java @@ -87,6 +87,9 @@ public boolean isSimulated() { /** @return a list of volumes. */ public List getVolumes(); + /** @return the volume that contains a replica of the block. */ + public V getVolume(ExtendedBlock b); + /** @return a volume information map (name => info). */ public Map getVolumeInfoMap(); @@ -336,11 +339,11 @@ public ReplicaRecoveryInfo initReplicaRecovery(RecoveringBlock rBlock) /** * Update replica's generation stamp and length and finalize it. + * @return the ID of storage that stores the block */ - public ReplicaInfo updateReplicaUnderRecovery( - ExtendedBlock oldBlock, - long recoveryId, - long newLength) throws IOException; + public String updateReplicaUnderRecovery(ExtendedBlock oldBlock, + long recoveryId, long newLength) throws IOException; + /** * add new block pool ID * @param bpid Block pool Id diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java index 92fd4e9721..193d9e8684 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java @@ -2830,7 +2830,8 @@ private void finalizeINodeFileUnderConstruction(String src, void commitBlockSynchronization(ExtendedBlock lastblock, long newgenerationstamp, long newlength, - boolean closeFile, boolean deleteblock, DatanodeID[] newtargets) + boolean closeFile, boolean deleteblock, DatanodeID[] newtargets, + String[] newtargetstorages) throws IOException, UnresolvedLinkException { String src = ""; writeLock(); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java index 881a80a985..ef9b531969 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java @@ -546,10 +546,11 @@ public void updatePipeline(String clientName, ExtendedBlock oldBlock, @Override // DatanodeProtocol public void commitBlockSynchronization(ExtendedBlock block, long newgenerationstamp, long newlength, - boolean closeFile, boolean deleteblock, DatanodeID[] newtargets) + boolean closeFile, boolean deleteblock, DatanodeID[] newtargets, + String[] newtargetstorages) throws IOException { - namesystem.commitBlockSynchronization(block, - newgenerationstamp, newlength, closeFile, deleteblock, newtargets); + namesystem.commitBlockSynchronization(block, newgenerationstamp, + newlength, closeFile, deleteblock, newtargets, newtargetstorages); } @Override // ClientProtocol diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DatanodeProtocol.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DatanodeProtocol.java index 5669497ed6..4dd29ce59a 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DatanodeProtocol.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DatanodeProtocol.java @@ -176,6 +176,6 @@ public void errorReport(DatanodeRegistration registration, */ public void commitBlockSynchronization(ExtendedBlock block, long newgenerationstamp, long newlength, - boolean closeFile, boolean deleteblock, DatanodeID[] newtargets - ) throws IOException; + boolean closeFile, boolean deleteblock, DatanodeID[] newtargets, + String[] newtargetstorages) throws IOException; } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/InterDatanodeProtocol.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/InterDatanodeProtocol.java index 50269f36ec..65847fdb5e 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/InterDatanodeProtocol.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/InterDatanodeProtocol.java @@ -26,7 +26,6 @@ import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.protocol.ExtendedBlock; import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock; -import org.apache.hadoop.ipc.VersionedProtocol; import org.apache.hadoop.security.KerberosInfo; /** An inter-datanode protocol for updating generation stamp @@ -55,9 +54,6 @@ public interface InterDatanodeProtocol { * * For more details on protocol buffer wire protocol, please see * .../org/apache/hadoop/hdfs/protocolPB/overview.html - * - * The log of historical changes can be retrieved from the svn). - * 6: Add block pool ID to Block */ public static final long versionID = 6L; @@ -73,7 +69,6 @@ ReplicaRecoveryInfo initReplicaRecovery(RecoveringBlock rBlock) /** * Update replica with the new generation stamp and length. */ - ExtendedBlock updateReplicaUnderRecovery(ExtendedBlock oldBlock, - long recoveryId, - long newLength) throws IOException; + String updateReplicaUnderRecovery(ExtendedBlock oldBlock, long recoveryId, + long newLength) throws IOException; } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/DatanodeProtocol.proto b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/DatanodeProtocol.proto index 2a96544624..1a7b55123f 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/DatanodeProtocol.proto +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/DatanodeProtocol.proto @@ -339,6 +339,7 @@ message CommitBlockSynchronizationRequestProto { required bool closeFile = 4; required bool deleteBlock = 5; repeated DatanodeIDProto newTaragets = 6; + repeated string newTargetStorages = 7; } /** diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/InterDatanodeProtocol.proto b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/InterDatanodeProtocol.proto index 7621c855d6..99c98cc191 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/InterDatanodeProtocol.proto +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/InterDatanodeProtocol.proto @@ -55,7 +55,7 @@ message UpdateReplicaUnderRecoveryRequestProto { * Response returns updated block information */ message UpdateReplicaUnderRecoveryResponseProto { - required ExtendedBlockProto block = 1; // Updated block information + required string storageID = 1; // ID of the storage that stores replica } /** diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java index a37aefdefe..356b6b10ad 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java @@ -903,11 +903,10 @@ public ReplicaRecoveryInfo initReplicaRecovery(RecoveringBlock rBlock) } @Override // FSDatasetInterface - public FinalizedReplica updateReplicaUnderRecovery(ExtendedBlock oldBlock, + public String updateReplicaUnderRecovery(ExtendedBlock oldBlock, long recoveryId, long newlength) { - return new FinalizedReplica( - oldBlock.getBlockId(), newlength, recoveryId, null, null); + return storageId; } @Override // FSDatasetInterface @@ -985,4 +984,9 @@ public Map getVolumeInfoMap() { public RollingLogs createRollingLogs(String bpid, String prefix) { throw new UnsupportedOperationException(); } + + @Override + public FsVolumeSpi getVolume(ExtendedBlock b) { + throw new UnsupportedOperationException(); + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java index e197bb38b3..843c4dbb6b 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java @@ -18,6 +18,27 @@ package org.apache.hadoop.hdfs.server.datanode; +import static org.junit.Assert.fail; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyBoolean; +import static org.mockito.Matchers.anyListOf; +import static org.mockito.Matchers.anyLong; +import static org.mockito.Matchers.anyObject; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import java.io.File; +import java.io.IOException; +import java.net.InetSocketAddress; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; + import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.impl.Log4JLogger; @@ -34,10 +55,10 @@ import org.apache.hadoop.hdfs.protocol.RecoveryInProgressException; import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB; import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState; -import org.apache.hadoop.hdfs.server.datanode.DataNode; import org.apache.hadoop.hdfs.server.datanode.DataNode.BlockRecord; import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaOutputStreams; import org.apache.hadoop.hdfs.server.namenode.FSNamesystem; +import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock; import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand; import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol; import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration; @@ -45,10 +66,9 @@ import org.apache.hadoop.hdfs.server.protocol.HeartbeatResponse; import org.apache.hadoop.hdfs.server.protocol.InterDatanodeProtocol; import org.apache.hadoop.hdfs.server.protocol.NNHAStatusHeartbeat; +import org.apache.hadoop.hdfs.server.protocol.NNHAStatusHeartbeat.State; import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo; import org.apache.hadoop.hdfs.server.protocol.ReplicaRecoveryInfo; -import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock; -import org.apache.hadoop.hdfs.server.protocol.NNHAStatusHeartbeat.State; import org.apache.hadoop.hdfs.server.protocol.StorageReport; import org.apache.hadoop.test.GenericTestUtils; import org.apache.hadoop.util.Daemon; @@ -62,16 +82,6 @@ import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; -import static org.junit.Assert.fail; -import static org.mockito.Mockito.*; - -import java.io.File; -import java.io.IOException; -import java.net.InetSocketAddress; -import java.util.ArrayList; -import java.util.Collection; -import java.util.List; - /** * This tests if sync all replicas in block recovery works correctly */ @@ -196,11 +206,9 @@ private void testSyncReplicas(ReplicaRecoveryInfo replica1, syncList.add(record2); when(dn1.updateReplicaUnderRecovery((ExtendedBlock)anyObject(), anyLong(), - anyLong())).thenReturn(new ExtendedBlock(block.getBlockPoolId(), - block.getBlockId(), expectLen, block.getGenerationStamp())); + anyLong())).thenReturn("storage1"); when(dn2.updateReplicaUnderRecovery((ExtendedBlock)anyObject(), anyLong(), - anyLong())).thenReturn(new ExtendedBlock(block.getBlockPoolId(), - block.getBlockId(), expectLen, block.getGenerationStamp())); + anyLong())).thenReturn("storage2"); dn.syncBlock(rBlock, syncList); } @@ -463,7 +471,7 @@ public void testZeroLenReplicas() throws IOException, InterruptedException { d.join(); DatanodeProtocol dnP = dn.getActiveNamenodeForBP(POOL_ID); verify(dnP).commitBlockSynchronization( - block, RECOVERY_ID, 0, true, true, DatanodeID.EMPTY_ARRAY); + block, RECOVERY_ID, 0, true, true, DatanodeID.EMPTY_ARRAY, null); } private List initBlockRecords(DataNode spyDN) throws IOException { @@ -521,7 +529,7 @@ public void testNoReplicaUnderRecovery() throws IOException { DatanodeProtocol namenode = dn.getActiveNamenodeForBP(POOL_ID); verify(namenode, never()).commitBlockSynchronization( any(ExtendedBlock.class), anyLong(), anyLong(), anyBoolean(), - anyBoolean(), any(DatanodeID[].class)); + anyBoolean(), any(DatanodeID[].class), any(String[].class)); } /** @@ -550,7 +558,7 @@ public void testNotMatchedReplicaID() throws IOException { DatanodeProtocol namenode = dn.getActiveNamenodeForBP(POOL_ID); verify(namenode, never()).commitBlockSynchronization( any(ExtendedBlock.class), anyLong(), anyLong(), anyBoolean(), - anyBoolean(), any(DatanodeID[].class)); + anyBoolean(), any(DatanodeID[].class), any(String[].class)); } finally { streams.close(); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestInterDatanodeProtocol.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestInterDatanodeProtocol.java index 26fc102b4e..cecf2eddbb 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestInterDatanodeProtocol.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestInterDatanodeProtocol.java @@ -329,14 +329,9 @@ public void testUpdateReplicaUnderRecovery() throws IOException { } //update - final ReplicaInfo finalized = fsdataset.updateReplicaUnderRecovery( + final String storageID = fsdataset.updateReplicaUnderRecovery( new ExtendedBlock(b.getBlockPoolId(), rri), recoveryid, newlength); - - //check meta data after update - FSDataset.checkReplicaFiles(finalized); - Assert.assertEquals(b.getBlockId(), finalized.getBlockId()); - Assert.assertEquals(recoveryid, finalized.getGenerationStamp()); - Assert.assertEquals(newlength, finalized.getNumBytes()); + assertTrue(storageID != null); } finally { if (cluster != null) cluster.shutdown(); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestPipelinesFailover.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestPipelinesFailover.java index 547ba72e49..c9bae53a28 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestPipelinesFailover.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestPipelinesFailover.java @@ -307,7 +307,8 @@ public void testFailoverRightBeforeCommitSynchronization() throws Exception { Mockito.anyLong(), // new length Mockito.eq(true), // close file Mockito.eq(false), // delete block - (DatanodeID[]) Mockito.anyObject()); // new targets + (DatanodeID[]) Mockito.anyObject(), // new targets + (String[]) Mockito.anyObject()); // new target storages DistributedFileSystem fsOtherUser = createFsAsOtherUser(cluster, conf); assertFalse(fsOtherUser.recoverLease(TEST_PATH));