HDFS-8909. Erasure coding: update BlockInfoContiguousUC and BlockInfoStripedUC to use BlockUnderConstructionFeature. Contributed by Jing Zhao.
This commit is contained in:
parent
067ec8c2b1
commit
164cbe6439
@ -403,3 +403,6 @@
|
|||||||
|
|
||||||
HDFS-8838. Erasure Coding: Tolerate datanode failures in DFSStripedOutputStream
|
HDFS-8838. Erasure Coding: Tolerate datanode failures in DFSStripedOutputStream
|
||||||
when the data length is small. (szetszwo via waltersu4549)
|
when the data length is small. (szetszwo via waltersu4549)
|
||||||
|
|
||||||
|
HDFS-8909. Erasure coding: update BlockInfoContiguousUC and BlockInfoStripedUC
|
||||||
|
to use BlockUnderConstructionFeature. (Jing Zhao via waltersu4549)
|
||||||
|
@ -17,8 +17,10 @@
|
|||||||
*/
|
*/
|
||||||
package org.apache.hadoop.hdfs.server.blockmanagement;
|
package org.apache.hadoop.hdfs.server.blockmanagement;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
import java.util.LinkedList;
|
import java.util.LinkedList;
|
||||||
|
|
||||||
|
import com.google.common.base.Preconditions;
|
||||||
import org.apache.hadoop.hdfs.protocol.Block;
|
import org.apache.hadoop.hdfs.protocol.Block;
|
||||||
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState;
|
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState;
|
||||||
import org.apache.hadoop.util.LightWeightGSet;
|
import org.apache.hadoop.util.LightWeightGSet;
|
||||||
@ -52,6 +54,8 @@ public abstract class BlockInfo extends Block
|
|||||||
*/
|
*/
|
||||||
protected Object[] triplets;
|
protected Object[] triplets;
|
||||||
|
|
||||||
|
private BlockUnderConstructionFeature uc;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Construct an entry for blocksmap
|
* Construct an entry for blocksmap
|
||||||
* @param size the block's replication factor, or the total number of blocks
|
* @param size the block's replication factor, or the total number of blocks
|
||||||
@ -287,26 +291,6 @@ public BlockInfo moveBlockToHead(BlockInfo head, DatanodeStorageInfo storage,
|
|||||||
return this;
|
return this;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* BlockInfo represents a block that is not being constructed.
|
|
||||||
* In order to start modifying the block, the BlockInfo should be converted to
|
|
||||||
* {@link BlockInfoContiguousUnderConstruction} or
|
|
||||||
* {@link BlockInfoStripedUnderConstruction}.
|
|
||||||
* @return {@link BlockUCState#COMPLETE}
|
|
||||||
*/
|
|
||||||
public BlockUCState getBlockUCState() {
|
|
||||||
return BlockUCState.COMPLETE;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Is this block complete?
|
|
||||||
*
|
|
||||||
* @return true if the state of the block is {@link BlockUCState#COMPLETE}
|
|
||||||
*/
|
|
||||||
public boolean isComplete() {
|
|
||||||
return getBlockUCState().equals(BlockUCState.COMPLETE);
|
|
||||||
}
|
|
||||||
|
|
||||||
public boolean isDeleted() {
|
public boolean isDeleted() {
|
||||||
return (bc == null);
|
return (bc == null);
|
||||||
}
|
}
|
||||||
@ -332,4 +316,85 @@ public LightWeightGSet.LinkedElement getNext() {
|
|||||||
public void setNext(LightWeightGSet.LinkedElement next) {
|
public void setNext(LightWeightGSet.LinkedElement next) {
|
||||||
this.nextLinkedElement = next;
|
this.nextLinkedElement = next;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/* UnderConstruction Feature related */
|
||||||
|
|
||||||
|
public BlockUnderConstructionFeature getUnderConstructionFeature() {
|
||||||
|
return uc;
|
||||||
|
}
|
||||||
|
|
||||||
|
public BlockUCState getBlockUCState() {
|
||||||
|
return uc == null ? BlockUCState.COMPLETE : uc.getBlockUCState();
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Is this block complete?
|
||||||
|
*
|
||||||
|
* @return true if the state of the block is {@link BlockUCState#COMPLETE}
|
||||||
|
*/
|
||||||
|
public boolean isComplete() {
|
||||||
|
return getBlockUCState().equals(BlockUCState.COMPLETE);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Add/Update the under construction feature.
|
||||||
|
*/
|
||||||
|
public void convertToBlockUnderConstruction(BlockUCState s,
|
||||||
|
DatanodeStorageInfo[] targets) {
|
||||||
|
if (isComplete()) {
|
||||||
|
uc = new BlockUnderConstructionFeature(this, s, targets, this.isStriped());
|
||||||
|
} else {
|
||||||
|
// the block is already under construction
|
||||||
|
uc.setBlockUCState(s);
|
||||||
|
uc.setExpectedLocations(this, targets, this.isStriped());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Convert an under construction block to a complete block.
|
||||||
|
*
|
||||||
|
* @return BlockInfo - a complete block.
|
||||||
|
* @throws IOException if the state of the block
|
||||||
|
* (the generation stamp and the length) has not been committed by
|
||||||
|
* the client or it does not have at least a minimal number of replicas
|
||||||
|
* reported from data-nodes.
|
||||||
|
*/
|
||||||
|
BlockInfo convertToCompleteBlock() throws IOException {
|
||||||
|
assert getBlockUCState() != BlockUCState.COMPLETE :
|
||||||
|
"Trying to convert a COMPLETE block";
|
||||||
|
uc = null;
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Process the recorded replicas. When about to commit or finish the
|
||||||
|
* pipeline recovery sort out bad replicas.
|
||||||
|
* @param genStamp The final generation stamp for the block.
|
||||||
|
*/
|
||||||
|
public void setGenerationStampAndVerifyReplicas(long genStamp) {
|
||||||
|
Preconditions.checkState(uc != null && !isComplete());
|
||||||
|
// Set the generation stamp for the block.
|
||||||
|
setGenerationStamp(genStamp);
|
||||||
|
|
||||||
|
// Remove the replicas with wrong gen stamp
|
||||||
|
uc.removeStaleReplicas(this);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Commit block's length and generation stamp as reported by the client.
|
||||||
|
* Set block state to {@link BlockUCState#COMMITTED}.
|
||||||
|
* @param block - contains client reported block length and generation
|
||||||
|
* @throws IOException if block ids are inconsistent.
|
||||||
|
*/
|
||||||
|
void commitBlock(Block block) throws IOException {
|
||||||
|
if (getBlockId() != block.getBlockId()) {
|
||||||
|
throw new IOException("Trying to commit inconsistent block: id = "
|
||||||
|
+ block.getBlockId() + ", expected id = " + getBlockId());
|
||||||
|
}
|
||||||
|
Preconditions.checkState(!isComplete());
|
||||||
|
uc.commit();
|
||||||
|
this.set(getBlockId(), block.getNumBytes(), block.getGenerationStamp());
|
||||||
|
// Sort out invalid replicas.
|
||||||
|
setGenerationStampAndVerifyReplicas(block.getGenerationStamp());
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
@ -19,7 +19,6 @@
|
|||||||
|
|
||||||
import org.apache.hadoop.classification.InterfaceAudience;
|
import org.apache.hadoop.classification.InterfaceAudience;
|
||||||
import org.apache.hadoop.hdfs.protocol.Block;
|
import org.apache.hadoop.hdfs.protocol.Block;
|
||||||
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Subclass of {@link BlockInfo}, used for a block with replication scheme.
|
* Subclass of {@link BlockInfo}, used for a block with replication scheme.
|
||||||
@ -123,28 +122,6 @@ void replaceBlock(BlockInfo newBlock) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* Convert a complete block to an under construction block.
|
|
||||||
* @return BlockInfoUnderConstruction - an under construction block.
|
|
||||||
*/
|
|
||||||
public BlockInfoContiguousUnderConstruction convertToBlockUnderConstruction(
|
|
||||||
BlockUCState s, DatanodeStorageInfo[] targets) {
|
|
||||||
if(isComplete()) {
|
|
||||||
BlockInfoContiguousUnderConstruction ucBlock =
|
|
||||||
new BlockInfoContiguousUnderConstruction(this,
|
|
||||||
getBlockCollection().getPreferredBlockReplication(), s, targets);
|
|
||||||
ucBlock.setBlockCollection(getBlockCollection());
|
|
||||||
return ucBlock;
|
|
||||||
}
|
|
||||||
// the block is already under construction
|
|
||||||
BlockInfoContiguousUnderConstruction ucBlock =
|
|
||||||
(BlockInfoContiguousUnderConstruction) this;
|
|
||||||
ucBlock.setBlockUCState(s);
|
|
||||||
ucBlock.setExpectedLocations(targets);
|
|
||||||
ucBlock.setBlockCollection(getBlockCollection());
|
|
||||||
return ucBlock;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public final boolean isStriped() {
|
public final boolean isStriped() {
|
||||||
return false;
|
return false;
|
||||||
|
@ -1,281 +0,0 @@
|
|||||||
/**
|
|
||||||
* Licensed to the Apache Software Foundation (ASF) under one
|
|
||||||
* or more contributor license agreements. See the NOTICE file
|
|
||||||
* distributed with this work for additional information
|
|
||||||
* regarding copyright ownership. The ASF licenses this file
|
|
||||||
* to you under the Apache License, Version 2.0 (the
|
|
||||||
* "License"); you may not use this file except in compliance
|
|
||||||
* with the License. You may obtain a copy of the License at
|
|
||||||
*
|
|
||||||
* http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
*
|
|
||||||
* Unless required by applicable law or agreed to in writing, software
|
|
||||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
* See the License for the specific language governing permissions and
|
|
||||||
* limitations under the License.
|
|
||||||
*/
|
|
||||||
package org.apache.hadoop.hdfs.server.blockmanagement;
|
|
||||||
|
|
||||||
import java.io.IOException;
|
|
||||||
import java.util.ArrayList;
|
|
||||||
import java.util.Iterator;
|
|
||||||
import java.util.List;
|
|
||||||
|
|
||||||
import org.apache.hadoop.hdfs.protocol.Block;
|
|
||||||
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState;
|
|
||||||
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
|
|
||||||
import org.apache.hadoop.hdfs.server.namenode.NameNode;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Represents a block that is currently being constructed.<br>
|
|
||||||
* This is usually the last block of a file opened for write or append.
|
|
||||||
*/
|
|
||||||
public class BlockInfoContiguousUnderConstruction extends BlockInfoContiguous
|
|
||||||
implements BlockInfoUnderConstruction{
|
|
||||||
/** Block state. See {@link BlockUCState} */
|
|
||||||
private BlockUCState blockUCState;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Block replicas as assigned when the block was allocated.
|
|
||||||
* This defines the pipeline order.
|
|
||||||
*/
|
|
||||||
private List<ReplicaUnderConstruction> replicas;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Index of the primary data node doing the recovery. Useful for log
|
|
||||||
* messages.
|
|
||||||
*/
|
|
||||||
private int primaryNodeIndex = -1;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* The new generation stamp, which this block will have
|
|
||||||
* after the recovery succeeds. Also used as a recovery id to identify
|
|
||||||
* the right recovery if any of the abandoned recoveries re-appear.
|
|
||||||
*/
|
|
||||||
private long blockRecoveryId = 0;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* The block source to use in the event of copy-on-write truncate.
|
|
||||||
*/
|
|
||||||
private Block truncateBlock;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Create block and set its state to
|
|
||||||
* {@link BlockUCState#UNDER_CONSTRUCTION}.
|
|
||||||
*/
|
|
||||||
public BlockInfoContiguousUnderConstruction(Block blk, short replication) {
|
|
||||||
this(blk, replication, BlockUCState.UNDER_CONSTRUCTION, null);
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Create a block that is currently being constructed.
|
|
||||||
*/
|
|
||||||
public BlockInfoContiguousUnderConstruction(Block blk, short replication,
|
|
||||||
BlockUCState state, DatanodeStorageInfo[] targets) {
|
|
||||||
super(blk, replication);
|
|
||||||
assert getBlockUCState() != BlockUCState.COMPLETE :
|
|
||||||
"BlockInfoContiguousUnderConstruction cannot be in COMPLETE state";
|
|
||||||
this.blockUCState = state;
|
|
||||||
setExpectedLocations(targets);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public BlockInfoContiguous convertToCompleteBlock() throws IOException {
|
|
||||||
assert getBlockUCState() != BlockUCState.COMPLETE :
|
|
||||||
"Trying to convert a COMPLETE block";
|
|
||||||
return new BlockInfoContiguous(this);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void setExpectedLocations(DatanodeStorageInfo[] targets) {
|
|
||||||
int numLocations = targets == null ? 0 : targets.length;
|
|
||||||
this.replicas = new ArrayList<>(numLocations);
|
|
||||||
for(int i = 0; i < numLocations; i++) {
|
|
||||||
replicas.add(new ReplicaUnderConstruction(this, targets[i],
|
|
||||||
ReplicaState.RBW));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public DatanodeStorageInfo[] getExpectedStorageLocations() {
|
|
||||||
int numLocations = replicas == null ? 0 : replicas.size();
|
|
||||||
DatanodeStorageInfo[] storages = new DatanodeStorageInfo[numLocations];
|
|
||||||
for (int i = 0; i < numLocations; i++) {
|
|
||||||
storages[i] = replicas.get(i).getExpectedStorageLocation();
|
|
||||||
}
|
|
||||||
return storages;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public int getNumExpectedLocations() {
|
|
||||||
return replicas == null ? 0 : replicas.size();
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Return the state of the block under construction.
|
|
||||||
* @see BlockUCState
|
|
||||||
*/
|
|
||||||
@Override // BlockInfo
|
|
||||||
public BlockUCState getBlockUCState() {
|
|
||||||
return blockUCState;
|
|
||||||
}
|
|
||||||
|
|
||||||
void setBlockUCState(BlockUCState s) {
|
|
||||||
blockUCState = s;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public long getBlockRecoveryId() {
|
|
||||||
return blockRecoveryId;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public Block getTruncateBlock() {
|
|
||||||
return truncateBlock;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public Block toBlock(){
|
|
||||||
return this;
|
|
||||||
}
|
|
||||||
|
|
||||||
public void setTruncateBlock(Block recoveryBlock) {
|
|
||||||
this.truncateBlock = recoveryBlock;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void setGenerationStampAndVerifyReplicas(long genStamp) {
|
|
||||||
// Set the generation stamp for the block.
|
|
||||||
setGenerationStamp(genStamp);
|
|
||||||
if (replicas == null)
|
|
||||||
return;
|
|
||||||
|
|
||||||
// Remove the replicas with wrong gen stamp.
|
|
||||||
// The replica list is unchanged.
|
|
||||||
for (ReplicaUnderConstruction r : replicas) {
|
|
||||||
if (genStamp != r.getGenerationStamp()) {
|
|
||||||
r.getExpectedStorageLocation().removeBlock(this);
|
|
||||||
NameNode.blockStateChangeLog.debug("BLOCK* Removing stale replica "
|
|
||||||
+ "from location: {}", r.getExpectedStorageLocation());
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void commitBlock(Block block) throws IOException {
|
|
||||||
if(getBlockId() != block.getBlockId())
|
|
||||||
throw new IOException("Trying to commit inconsistent block: id = "
|
|
||||||
+ block.getBlockId() + ", expected id = " + getBlockId());
|
|
||||||
blockUCState = BlockUCState.COMMITTED;
|
|
||||||
this.set(getBlockId(), block.getNumBytes(), block.getGenerationStamp());
|
|
||||||
// Sort out invalid replicas.
|
|
||||||
setGenerationStampAndVerifyReplicas(block.getGenerationStamp());
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void initializeBlockRecovery(long recoveryId) {
|
|
||||||
setBlockUCState(BlockUCState.UNDER_RECOVERY);
|
|
||||||
blockRecoveryId = recoveryId;
|
|
||||||
if (replicas.size() == 0) {
|
|
||||||
NameNode.blockStateChangeLog.warn("BLOCK*"
|
|
||||||
+ " BlockInfoContiguousUnderConstruction.initLeaseRecovery:"
|
|
||||||
+ " No blocks found, lease removed.");
|
|
||||||
}
|
|
||||||
boolean allLiveReplicasTriedAsPrimary = true;
|
|
||||||
for (ReplicaUnderConstruction replica : replicas) {
|
|
||||||
// Check if all replicas have been tried or not.
|
|
||||||
if (replica.isAlive()) {
|
|
||||||
allLiveReplicasTriedAsPrimary = (allLiveReplicasTriedAsPrimary &&
|
|
||||||
replica.getChosenAsPrimary());
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if (allLiveReplicasTriedAsPrimary) {
|
|
||||||
// Just set all the replicas to be chosen whether they are alive or not.
|
|
||||||
for (ReplicaUnderConstruction replica : replicas) {
|
|
||||||
replica.setChosenAsPrimary(false);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
long mostRecentLastUpdate = 0;
|
|
||||||
ReplicaUnderConstruction primary = null;
|
|
||||||
primaryNodeIndex = -1;
|
|
||||||
for(int i = 0; i < replicas.size(); i++) {
|
|
||||||
// Skip alive replicas which have been chosen for recovery.
|
|
||||||
if (!(replicas.get(i).isAlive() && !replicas.get(i).getChosenAsPrimary())) {
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
final ReplicaUnderConstruction ruc = replicas.get(i);
|
|
||||||
final long lastUpdate = ruc.getExpectedStorageLocation()
|
|
||||||
.getDatanodeDescriptor().getLastUpdateMonotonic();
|
|
||||||
if (lastUpdate > mostRecentLastUpdate) {
|
|
||||||
primaryNodeIndex = i;
|
|
||||||
primary = ruc;
|
|
||||||
mostRecentLastUpdate = lastUpdate;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if (primary != null) {
|
|
||||||
primary.getExpectedStorageLocation().getDatanodeDescriptor()
|
|
||||||
.addBlockToBeRecovered(this);
|
|
||||||
primary.setChosenAsPrimary(true);
|
|
||||||
NameNode.blockStateChangeLog.debug(
|
|
||||||
"BLOCK* {} recovery started, primary={}", this, primary);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void addReplicaIfNotPresent(DatanodeStorageInfo storage,
|
|
||||||
Block block, ReplicaState rState) {
|
|
||||||
Iterator<ReplicaUnderConstruction> it = replicas.iterator();
|
|
||||||
while (it.hasNext()) {
|
|
||||||
ReplicaUnderConstruction r = it.next();
|
|
||||||
DatanodeStorageInfo expectedLocation = r.getExpectedStorageLocation();
|
|
||||||
if(expectedLocation == storage) {
|
|
||||||
// Record the gen stamp from the report
|
|
||||||
r.setGenerationStamp(block.getGenerationStamp());
|
|
||||||
return;
|
|
||||||
} else if (expectedLocation != null &&
|
|
||||||
expectedLocation.getDatanodeDescriptor() ==
|
|
||||||
storage.getDatanodeDescriptor()) {
|
|
||||||
|
|
||||||
// The Datanode reported that the block is on a different storage
|
|
||||||
// than the one chosen by BlockPlacementPolicy. This can occur as
|
|
||||||
// we allow Datanodes to choose the target storage. Update our
|
|
||||||
// state by removing the stale entry and adding a new one.
|
|
||||||
it.remove();
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
replicas.add(new ReplicaUnderConstruction(block, storage, rState));
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public String toString() {
|
|
||||||
final StringBuilder b = new StringBuilder(100);
|
|
||||||
appendStringTo(b);
|
|
||||||
return b.toString();
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void appendStringTo(StringBuilder sb) {
|
|
||||||
super.appendStringTo(sb);
|
|
||||||
appendUCParts(sb);
|
|
||||||
}
|
|
||||||
|
|
||||||
private void appendUCParts(StringBuilder sb) {
|
|
||||||
sb.append("{UCState=").append(blockUCState)
|
|
||||||
.append(", truncateBlock=" + truncateBlock)
|
|
||||||
.append(", primaryNodeIndex=").append(primaryNodeIndex)
|
|
||||||
.append(", replicas=[");
|
|
||||||
if (replicas != null) {
|
|
||||||
Iterator<ReplicaUnderConstruction> iter = replicas.iterator();
|
|
||||||
if (iter.hasNext()) {
|
|
||||||
iter.next().appendStringTo(sb);
|
|
||||||
while (iter.hasNext()) {
|
|
||||||
sb.append(", ");
|
|
||||||
iter.next().appendStringTo(sb);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
sb.append("]}");
|
|
||||||
}
|
|
||||||
}
|
|
@ -245,27 +245,6 @@ public int numNodes() {
|
|||||||
return num;
|
return num;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* Convert a complete block to an under construction block.
|
|
||||||
* @return BlockInfoUnderConstruction - an under construction block.
|
|
||||||
*/
|
|
||||||
public BlockInfoStripedUnderConstruction convertToBlockUnderConstruction(
|
|
||||||
BlockUCState s, DatanodeStorageInfo[] targets) {
|
|
||||||
final BlockInfoStripedUnderConstruction ucBlock;
|
|
||||||
if(isComplete()) {
|
|
||||||
ucBlock = new BlockInfoStripedUnderConstruction(this, ecPolicy,
|
|
||||||
s, targets);
|
|
||||||
ucBlock.setBlockCollection(getBlockCollection());
|
|
||||||
} else {
|
|
||||||
// the block is already under construction
|
|
||||||
ucBlock = (BlockInfoStripedUnderConstruction) this;
|
|
||||||
ucBlock.setBlockUCState(s);
|
|
||||||
ucBlock.setExpectedLocations(targets);
|
|
||||||
ucBlock.setBlockCollection(getBlockCollection());
|
|
||||||
}
|
|
||||||
return ucBlock;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
final boolean hasNoStorage() {
|
final boolean hasNoStorage() {
|
||||||
final int len = getCapacity();
|
final int len = getCapacity();
|
||||||
|
@ -1,84 +0,0 @@
|
|||||||
/**
|
|
||||||
* Licensed to the Apache Software Foundation (ASF) under one
|
|
||||||
* or more contributor license agreements. See the NOTICE file
|
|
||||||
* distributed with this work for additional information
|
|
||||||
* regarding copyright ownership. The ASF licenses this file
|
|
||||||
* to you under the Apache License, Version 2.0 (the
|
|
||||||
* "License"); you may not use this file except in compliance
|
|
||||||
* with the License. You may obtain a copy of the License at
|
|
||||||
*
|
|
||||||
* http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
*
|
|
||||||
* Unless required by applicable law or agreed to in writing, software
|
|
||||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
* See the License for the specific language governing permissions and
|
|
||||||
* limitations under the License.
|
|
||||||
*/
|
|
||||||
package org.apache.hadoop.hdfs.server.blockmanagement;
|
|
||||||
|
|
||||||
import java.io.IOException;
|
|
||||||
|
|
||||||
import org.apache.hadoop.hdfs.protocol.Block;
|
|
||||||
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState;
|
|
||||||
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
|
|
||||||
|
|
||||||
public interface BlockInfoUnderConstruction {
|
|
||||||
/**
|
|
||||||
* Create array of expected replica locations
|
|
||||||
* (as has been assigned by chooseTargets()).
|
|
||||||
*/
|
|
||||||
public DatanodeStorageInfo[] getExpectedStorageLocations();
|
|
||||||
|
|
||||||
/** Get recover block */
|
|
||||||
public Block getTruncateBlock();
|
|
||||||
|
|
||||||
/** Convert to a Block object */
|
|
||||||
public Block toBlock();
|
|
||||||
|
|
||||||
/** Get block recovery ID */
|
|
||||||
public long getBlockRecoveryId();
|
|
||||||
|
|
||||||
/** Get the number of expected locations */
|
|
||||||
public int getNumExpectedLocations();
|
|
||||||
|
|
||||||
/** Set expected locations */
|
|
||||||
public void setExpectedLocations(DatanodeStorageInfo[] targets);
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Process the recorded replicas. When about to commit or finish the
|
|
||||||
* pipeline recovery sort out bad replicas.
|
|
||||||
* @param genStamp The final generation stamp for the block.
|
|
||||||
*/
|
|
||||||
public void setGenerationStampAndVerifyReplicas(long genStamp);
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Initialize lease recovery for this block.
|
|
||||||
* Find the first alive data-node starting from the previous primary and
|
|
||||||
* make it primary.
|
|
||||||
*/
|
|
||||||
public void initializeBlockRecovery(long recoveryId);
|
|
||||||
|
|
||||||
/** Add the reported replica if it is not already in the replica list. */
|
|
||||||
public void addReplicaIfNotPresent(DatanodeStorageInfo storage,
|
|
||||||
Block reportedBlock, ReplicaState rState);
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Commit block's length and generation stamp as reported by the client.
|
|
||||||
* Set block state to {@link BlockUCState#COMMITTED}.
|
|
||||||
* @param block - contains client reported block length and generation
|
|
||||||
* @throws IOException if block ids are inconsistent.
|
|
||||||
*/
|
|
||||||
public void commitBlock(Block block) throws IOException;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Convert an under construction block to a complete block.
|
|
||||||
*
|
|
||||||
* @return a complete block.
|
|
||||||
* @throws IOException
|
|
||||||
* if the state of the block (the generation stamp and the length)
|
|
||||||
* has not been committed by the client or it does not have at least
|
|
||||||
* a minimal number of replicas reported from data-nodes.
|
|
||||||
*/
|
|
||||||
public BlockInfo convertToCompleteBlock() throws IOException;
|
|
||||||
}
|
|
@ -644,19 +644,13 @@ public boolean hasMinStorage(BlockInfo block, int liveNum) {
|
|||||||
*/
|
*/
|
||||||
private static boolean commitBlock(final BlockInfo block,
|
private static boolean commitBlock(final BlockInfo block,
|
||||||
final Block commitBlock) throws IOException {
|
final Block commitBlock) throws IOException {
|
||||||
if (block instanceof BlockInfoUnderConstruction
|
if (block.getBlockUCState() == BlockUCState.COMMITTED)
|
||||||
&& block.getBlockUCState() != BlockUCState.COMMITTED) {
|
return false;
|
||||||
final BlockInfoUnderConstruction uc = (BlockInfoUnderConstruction)block;
|
assert block.getNumBytes() <= commitBlock.getNumBytes() :
|
||||||
|
|
||||||
assert block.getNumBytes() <= commitBlock.getNumBytes() :
|
|
||||||
"commitBlock length is less than the stored one "
|
"commitBlock length is less than the stored one "
|
||||||
+ commitBlock.getNumBytes() + " vs. " + block.getNumBytes();
|
+ commitBlock.getNumBytes() + " vs. " + block.getNumBytes();
|
||||||
|
block.commitBlock(commitBlock);
|
||||||
uc.commitBlock(commitBlock);
|
return true;
|
||||||
return true;
|
|
||||||
}
|
|
||||||
|
|
||||||
return false;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -713,9 +707,7 @@ private BlockInfo completeBlock(final BlockCollection bc,
|
|||||||
"Cannot complete block: block has not been COMMITTED by the client");
|
"Cannot complete block: block has not been COMMITTED by the client");
|
||||||
}
|
}
|
||||||
|
|
||||||
final BlockInfo completeBlock
|
final BlockInfo completeBlock = curBlock.convertToCompleteBlock();
|
||||||
= !(curBlock instanceof BlockInfoUnderConstruction)? curBlock
|
|
||||||
: ((BlockInfoUnderConstruction)curBlock).convertToCompleteBlock();
|
|
||||||
|
|
||||||
// replace penultimate block in file
|
// replace penultimate block in file
|
||||||
bc.setBlock(blkIndex, completeBlock);
|
bc.setBlock(blkIndex, completeBlock);
|
||||||
@ -754,9 +746,7 @@ private BlockInfo completeBlock(final BlockCollection bc,
|
|||||||
*/
|
*/
|
||||||
public BlockInfo forceCompleteBlock(final BlockCollection bc,
|
public BlockInfo forceCompleteBlock(final BlockCollection bc,
|
||||||
final BlockInfo block) throws IOException {
|
final BlockInfo block) throws IOException {
|
||||||
if (block instanceof BlockInfoUnderConstruction) {
|
block.commitBlock(block);
|
||||||
((BlockInfoUnderConstruction)block).commitBlock(block);
|
|
||||||
}
|
|
||||||
return completeBlock(bc, block, true);
|
return completeBlock(bc, block, true);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -777,30 +767,28 @@ public BlockInfo forceCompleteBlock(final BlockCollection bc,
|
|||||||
*/
|
*/
|
||||||
public LocatedBlock convertLastBlockToUnderConstruction(
|
public LocatedBlock convertLastBlockToUnderConstruction(
|
||||||
BlockCollection bc, long bytesToRemove) throws IOException {
|
BlockCollection bc, long bytesToRemove) throws IOException {
|
||||||
BlockInfo oldBlock = bc.getLastBlock();
|
BlockInfo lastBlock = bc.getLastBlock();
|
||||||
if(oldBlock == null ||
|
if(lastBlock == null ||
|
||||||
bc.getPreferredBlockSize() == oldBlock.getNumBytes() - bytesToRemove)
|
bc.getPreferredBlockSize() == lastBlock.getNumBytes() - bytesToRemove)
|
||||||
return null;
|
return null;
|
||||||
assert oldBlock == getStoredBlock(oldBlock) :
|
assert lastBlock == getStoredBlock(lastBlock) :
|
||||||
"last block of the file is not in blocksMap";
|
"last block of the file is not in blocksMap";
|
||||||
|
|
||||||
DatanodeStorageInfo[] targets = getStorages(oldBlock);
|
DatanodeStorageInfo[] targets = getStorages(lastBlock);
|
||||||
|
|
||||||
// convert the last block to UC
|
// convert the last block to under construction. note no block replacement
|
||||||
bc.convertLastBlockToUC(oldBlock, targets);
|
// is happening
|
||||||
// get the new created uc block
|
bc.convertLastBlockToUC(lastBlock, targets);
|
||||||
BlockInfo ucBlock = bc.getLastBlock();
|
|
||||||
blocksMap.replaceBlock(ucBlock);
|
|
||||||
|
|
||||||
// Remove block from replication queue.
|
// Remove block from replication queue.
|
||||||
NumberReplicas replicas = countNodes(ucBlock);
|
NumberReplicas replicas = countNodes(lastBlock);
|
||||||
neededReplications.remove(ucBlock, replicas.liveReplicas(),
|
neededReplications.remove(lastBlock, replicas.liveReplicas(),
|
||||||
replicas.decommissionedAndDecommissioning(), getReplication(ucBlock));
|
replicas.decommissionedAndDecommissioning(), getReplication(lastBlock));
|
||||||
pendingReplications.remove(ucBlock);
|
pendingReplications.remove(lastBlock);
|
||||||
|
|
||||||
// remove this block from the list of pending blocks to be deleted.
|
// remove this block from the list of pending blocks to be deleted.
|
||||||
for (DatanodeStorageInfo storage : targets) {
|
for (DatanodeStorageInfo storage : targets) {
|
||||||
final Block b = getBlockOnStorage(oldBlock, storage);
|
final Block b = getBlockOnStorage(lastBlock, storage);
|
||||||
if (b != null) {
|
if (b != null) {
|
||||||
invalidateBlocks.remove(storage.getDatanodeDescriptor(), b);
|
invalidateBlocks.remove(storage.getDatanodeDescriptor(), b);
|
||||||
}
|
}
|
||||||
@ -810,13 +798,15 @@ public LocatedBlock convertLastBlockToUnderConstruction(
|
|||||||
// count in safe-mode.
|
// count in safe-mode.
|
||||||
namesystem.adjustSafeModeBlockTotals(
|
namesystem.adjustSafeModeBlockTotals(
|
||||||
// decrement safe if we had enough
|
// decrement safe if we had enough
|
||||||
hasMinStorage(oldBlock, targets.length) ? -1 : 0,
|
hasMinStorage(lastBlock, targets.length) ? -1 : 0,
|
||||||
// always decrement total blocks
|
// always decrement total blocks
|
||||||
-1);
|
-1);
|
||||||
|
|
||||||
final long fileLength = bc.computeContentSummary(getStoragePolicySuite()).getLength();
|
final long fileLength = bc.computeContentSummary(
|
||||||
final long pos = fileLength - ucBlock.getNumBytes();
|
getStoragePolicySuite()).getLength();
|
||||||
return createLocatedBlock(ucBlock, pos, BlockTokenIdentifier.AccessMode.WRITE);
|
final long pos = fileLength - lastBlock.getNumBytes();
|
||||||
|
return createLocatedBlock(lastBlock, pos,
|
||||||
|
BlockTokenIdentifier.AccessMode.WRITE);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -895,18 +885,14 @@ private LocatedBlock createLocatedBlock(final BlockInfo blk, final long pos,
|
|||||||
private LocatedBlock createLocatedBlock(final BlockInfo blk, final long pos)
|
private LocatedBlock createLocatedBlock(final BlockInfo blk, final long pos)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
if (!blk.isComplete()) {
|
if (!blk.isComplete()) {
|
||||||
|
final BlockUnderConstructionFeature uc = blk.getUnderConstructionFeature();
|
||||||
if (blk.isStriped()) {
|
if (blk.isStriped()) {
|
||||||
final BlockInfoStripedUnderConstruction uc =
|
|
||||||
(BlockInfoStripedUnderConstruction) blk;
|
|
||||||
final DatanodeStorageInfo[] storages = uc.getExpectedStorageLocations();
|
final DatanodeStorageInfo[] storages = uc.getExpectedStorageLocations();
|
||||||
final ExtendedBlock eb = new ExtendedBlock(namesystem.getBlockPoolId(),
|
final ExtendedBlock eb = new ExtendedBlock(namesystem.getBlockPoolId(),
|
||||||
blk);
|
blk);
|
||||||
return newLocatedStripedBlock(eb, storages, uc.getBlockIndices(), pos,
|
return newLocatedStripedBlock(eb, storages, uc.getBlockIndices(), pos,
|
||||||
false);
|
false);
|
||||||
} else {
|
} else {
|
||||||
assert blk instanceof BlockInfoContiguousUnderConstruction;
|
|
||||||
final BlockInfoContiguousUnderConstruction uc =
|
|
||||||
(BlockInfoContiguousUnderConstruction) blk;
|
|
||||||
final DatanodeStorageInfo[] storages = uc.getExpectedStorageLocations();
|
final DatanodeStorageInfo[] storages = uc.getExpectedStorageLocations();
|
||||||
final ExtendedBlock eb = new ExtendedBlock(namesystem.getBlockPoolId(),
|
final ExtendedBlock eb = new ExtendedBlock(namesystem.getBlockPoolId(),
|
||||||
blk);
|
blk);
|
||||||
@ -1923,9 +1909,7 @@ static class StatefulBlockInfo {
|
|||||||
|
|
||||||
StatefulBlockInfo(BlockInfo storedBlock,
|
StatefulBlockInfo(BlockInfo storedBlock,
|
||||||
Block reportedBlock, ReplicaState reportedState) {
|
Block reportedBlock, ReplicaState reportedState) {
|
||||||
Preconditions.checkArgument(
|
Preconditions.checkArgument(!storedBlock.isComplete());
|
||||||
storedBlock instanceof BlockInfoContiguousUnderConstruction ||
|
|
||||||
storedBlock instanceof BlockInfoStripedUnderConstruction);
|
|
||||||
this.storedBlock = storedBlock;
|
this.storedBlock = storedBlock;
|
||||||
this.reportedBlock = reportedBlock;
|
this.reportedBlock = reportedBlock;
|
||||||
this.reportedState = reportedState;
|
this.reportedState = reportedState;
|
||||||
@ -2335,13 +2319,14 @@ private void processFirstBlockReport(
|
|||||||
|
|
||||||
// If block is under construction, add this replica to its list
|
// If block is under construction, add this replica to its list
|
||||||
if (isBlockUnderConstruction(storedBlock, ucState, reportedState)) {
|
if (isBlockUnderConstruction(storedBlock, ucState, reportedState)) {
|
||||||
final BlockInfoUnderConstruction uc = (BlockInfoUnderConstruction)storedBlock;
|
storedBlock.getUnderConstructionFeature()
|
||||||
uc.addReplicaIfNotPresent(storageInfo, iblk, reportedState);
|
.addReplicaIfNotPresent(storageInfo, iblk, reportedState);
|
||||||
// OpenFileBlocks only inside snapshots also will be added to safemode
|
// OpenFileBlocks only inside snapshots also will be added to safemode
|
||||||
// threshold. So we need to update such blocks to safemode
|
// threshold. So we need to update such blocks to safemode
|
||||||
// refer HDFS-5283
|
// refer HDFS-5283
|
||||||
if (namesystem.isInSnapshot(storedBlock.getBlockCollection())) {
|
if (namesystem.isInSnapshot(storedBlock.getBlockCollection())) {
|
||||||
int numOfReplicas = uc.getNumExpectedLocations();
|
int numOfReplicas = storedBlock.getUnderConstructionFeature()
|
||||||
|
.getNumExpectedLocations();
|
||||||
namesystem.incrementSafeBlockCount(numOfReplicas, storedBlock);
|
namesystem.incrementSafeBlockCount(numOfReplicas, storedBlock);
|
||||||
}
|
}
|
||||||
//and fall through to next clause
|
//and fall through to next clause
|
||||||
@ -2469,11 +2454,6 @@ private BlockInfo processReportedBlock(
|
|||||||
|
|
||||||
// Ignore replicas already scheduled to be removed from the DN
|
// Ignore replicas already scheduled to be removed from the DN
|
||||||
if(invalidateBlocks.contains(dn, block)) {
|
if(invalidateBlocks.contains(dn, block)) {
|
||||||
/*
|
|
||||||
* TODO: following assertion is incorrect, see HDFS-2668 assert
|
|
||||||
* storedBlock.findDatanode(dn) < 0 : "Block " + block +
|
|
||||||
* " in recentInvalidatesSet should not appear in DN " + dn;
|
|
||||||
*/
|
|
||||||
return storedBlock;
|
return storedBlock;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -2704,9 +2684,8 @@ private boolean isBlockUnderConstruction(BlockInfo storedBlock,
|
|||||||
void addStoredBlockUnderConstruction(StatefulBlockInfo ucBlock,
|
void addStoredBlockUnderConstruction(StatefulBlockInfo ucBlock,
|
||||||
DatanodeStorageInfo storageInfo) throws IOException {
|
DatanodeStorageInfo storageInfo) throws IOException {
|
||||||
BlockInfo block = ucBlock.storedBlock;
|
BlockInfo block = ucBlock.storedBlock;
|
||||||
final BlockInfoUnderConstruction uc = (BlockInfoUnderConstruction)block;
|
block.getUnderConstructionFeature().addReplicaIfNotPresent(storageInfo,
|
||||||
uc.addReplicaIfNotPresent(storageInfo, ucBlock.reportedBlock,
|
ucBlock.reportedBlock, ucBlock.reportedState);
|
||||||
ucBlock.reportedState);
|
|
||||||
|
|
||||||
if (ucBlock.reportedState == ReplicaState.FINALIZED &&
|
if (ucBlock.reportedState == ReplicaState.FINALIZED &&
|
||||||
(block.findStorageInfo(storageInfo) < 0)) {
|
(block.findStorageInfo(storageInfo) < 0)) {
|
||||||
@ -2766,8 +2745,7 @@ private Block addStoredBlock(final BlockInfo block,
|
|||||||
assert block != null && namesystem.hasWriteLock();
|
assert block != null && namesystem.hasWriteLock();
|
||||||
BlockInfo storedBlock;
|
BlockInfo storedBlock;
|
||||||
DatanodeDescriptor node = storageInfo.getDatanodeDescriptor();
|
DatanodeDescriptor node = storageInfo.getDatanodeDescriptor();
|
||||||
if (block instanceof BlockInfoContiguousUnderConstruction ||
|
if (!block.isComplete()) {
|
||||||
block instanceof BlockInfoStripedUnderConstruction) {
|
|
||||||
//refresh our copy in case the block got completed in another thread
|
//refresh our copy in case the block got completed in another thread
|
||||||
storedBlock = getStoredBlock(block);
|
storedBlock = getStoredBlock(block);
|
||||||
} else {
|
} else {
|
||||||
@ -4221,7 +4199,7 @@ public static LocatedBlock newLocatedBlock(ExtendedBlock eb, BlockInfo info,
|
|||||||
final LocatedBlock lb;
|
final LocatedBlock lb;
|
||||||
if (info.isStriped()) {
|
if (info.isStriped()) {
|
||||||
lb = newLocatedStripedBlock(eb, locs,
|
lb = newLocatedStripedBlock(eb, locs,
|
||||||
((BlockInfoStripedUnderConstruction)info).getBlockIndices(),
|
info.getUnderConstructionFeature().getBlockIndices(),
|
||||||
offset, false);
|
offset, false);
|
||||||
} else {
|
} else {
|
||||||
lb = newLocatedBlock(eb, locs, offset, false);
|
lb = newLocatedBlock(eb, locs, offset, false);
|
||||||
|
@ -21,19 +21,14 @@
|
|||||||
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState;
|
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState;
|
||||||
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
|
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
|
||||||
import org.apache.hadoop.hdfs.server.namenode.NameNode;
|
import org.apache.hadoop.hdfs.server.namenode.NameNode;
|
||||||
import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
|
|
||||||
|
|
||||||
import java.io.IOException;
|
|
||||||
|
|
||||||
import static org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState.COMPLETE;
|
import static org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState.COMPLETE;
|
||||||
import static org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState.UNDER_CONSTRUCTION;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Represents a striped block that is currently being constructed.
|
* Represents the under construction feature of a Block.
|
||||||
* This is usually the last block of a file opened for write or append.
|
* This is usually the last block of a file opened for write or append.
|
||||||
*/
|
*/
|
||||||
public class BlockInfoStripedUnderConstruction extends BlockInfoStriped
|
public class BlockUnderConstructionFeature {
|
||||||
implements BlockInfoUnderConstruction{
|
|
||||||
private BlockUCState blockUCState;
|
private BlockUCState blockUCState;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -55,41 +50,30 @@ public class BlockInfoStripedUnderConstruction extends BlockInfoStriped
|
|||||||
private long blockRecoveryId = 0;
|
private long blockRecoveryId = 0;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Constructor with null storage targets.
|
* The block source to use in the event of copy-on-write truncate.
|
||||||
*/
|
*/
|
||||||
public BlockInfoStripedUnderConstruction(Block blk, ErasureCodingPolicy ecPolicy) {
|
private Block truncateBlock;
|
||||||
this(blk, ecPolicy, UNDER_CONSTRUCTION, null);
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
public BlockUnderConstructionFeature(Block blk,
|
||||||
* Create a striped block that is currently being constructed.
|
BlockUCState state, DatanodeStorageInfo[] targets, boolean isStriped) {
|
||||||
*/
|
|
||||||
public BlockInfoStripedUnderConstruction(Block blk, ErasureCodingPolicy ecPolicy,
|
|
||||||
BlockUCState state, DatanodeStorageInfo[] targets) {
|
|
||||||
super(blk, ecPolicy);
|
|
||||||
assert getBlockUCState() != COMPLETE :
|
assert getBlockUCState() != COMPLETE :
|
||||||
"BlockInfoStripedUnderConstruction cannot be in COMPLETE state";
|
"BlockUnderConstructionFeature cannot be in COMPLETE state";
|
||||||
this.blockUCState = state;
|
this.blockUCState = state;
|
||||||
setExpectedLocations(targets);
|
setExpectedLocations(blk, targets, isStriped);
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public BlockInfoStriped convertToCompleteBlock() throws IOException {
|
|
||||||
assert getBlockUCState() != COMPLETE :
|
|
||||||
"Trying to convert a COMPLETE block";
|
|
||||||
return new BlockInfoStriped(this);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/** Set expected locations */
|
/** Set expected locations */
|
||||||
@Override
|
public void setExpectedLocations(Block block, DatanodeStorageInfo[] targets,
|
||||||
public void setExpectedLocations(DatanodeStorageInfo[] targets) {
|
boolean isStriped) {
|
||||||
int numLocations = targets == null ? 0 : targets.length;
|
int numLocations = targets == null ? 0 : targets.length;
|
||||||
this.replicas = new ReplicaUnderConstruction[numLocations];
|
this.replicas = new ReplicaUnderConstruction[numLocations];
|
||||||
for(int i = 0; i < numLocations; i++) {
|
for(int i = 0; i < numLocations; i++) {
|
||||||
// when creating a new block we simply sequentially assign block index to
|
// when creating a new striped block we simply sequentially assign block
|
||||||
// each storage
|
// index to each storage
|
||||||
Block blk = new Block(this.getBlockId() + i, 0, this.getGenerationStamp());
|
Block replicaBlock = isStriped ?
|
||||||
replicas[i] = new ReplicaUnderConstruction(blk, targets[i],
|
new Block(block.getBlockId() + i, 0, block.getGenerationStamp()) :
|
||||||
|
block;
|
||||||
|
replicas[i] = new ReplicaUnderConstruction(replicaBlock, targets[i],
|
||||||
ReplicaState.RBW);
|
ReplicaState.RBW);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -98,7 +82,6 @@ public void setExpectedLocations(DatanodeStorageInfo[] targets) {
|
|||||||
* Create array of expected replica locations
|
* Create array of expected replica locations
|
||||||
* (as has been assigned by chooseTargets()).
|
* (as has been assigned by chooseTargets()).
|
||||||
*/
|
*/
|
||||||
@Override
|
|
||||||
public DatanodeStorageInfo[] getExpectedStorageLocations() {
|
public DatanodeStorageInfo[] getExpectedStorageLocations() {
|
||||||
int numLocations = getNumExpectedLocations();
|
int numLocations = getNumExpectedLocations();
|
||||||
DatanodeStorageInfo[] storages = new DatanodeStorageInfo[numLocations];
|
DatanodeStorageInfo[] storages = new DatanodeStorageInfo[numLocations];
|
||||||
@ -108,7 +91,10 @@ public DatanodeStorageInfo[] getExpectedStorageLocations() {
|
|||||||
return storages;
|
return storages;
|
||||||
}
|
}
|
||||||
|
|
||||||
/** @return the index array indicating the block index in each storage */
|
/**
|
||||||
|
* @return the index array indicating the block index in each storage. Used
|
||||||
|
* only by striped blocks.
|
||||||
|
*/
|
||||||
public int[] getBlockIndices() {
|
public int[] getBlockIndices() {
|
||||||
int numLocations = getNumExpectedLocations();
|
int numLocations = getNumExpectedLocations();
|
||||||
int[] indices = new int[numLocations];
|
int[] indices = new int[numLocations];
|
||||||
@ -118,7 +104,6 @@ public int[] getBlockIndices() {
|
|||||||
return indices;
|
return indices;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
|
||||||
public int getNumExpectedLocations() {
|
public int getNumExpectedLocations() {
|
||||||
return replicas == null ? 0 : replicas.length;
|
return replicas == null ? 0 : replicas.length;
|
||||||
}
|
}
|
||||||
@ -127,7 +112,6 @@ public int getNumExpectedLocations() {
|
|||||||
* Return the state of the block under construction.
|
* Return the state of the block under construction.
|
||||||
* @see BlockUCState
|
* @see BlockUCState
|
||||||
*/
|
*/
|
||||||
@Override // BlockInfo
|
|
||||||
public BlockUCState getBlockUCState() {
|
public BlockUCState getBlockUCState() {
|
||||||
return blockUCState;
|
return blockUCState;
|
||||||
}
|
}
|
||||||
@ -136,58 +120,51 @@ void setBlockUCState(BlockUCState s) {
|
|||||||
blockUCState = s;
|
blockUCState = s;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
|
||||||
public long getBlockRecoveryId() {
|
public long getBlockRecoveryId() {
|
||||||
return blockRecoveryId;
|
return blockRecoveryId;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
/** Get recover block */
|
||||||
public Block getTruncateBlock() {
|
public Block getTruncateBlock() {
|
||||||
return null;
|
return truncateBlock;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
public void setTruncateBlock(Block recoveryBlock) {
|
||||||
public Block toBlock(){
|
this.truncateBlock = recoveryBlock;
|
||||||
return this;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
/**
|
||||||
public void setGenerationStampAndVerifyReplicas(long genStamp) {
|
* Set {@link #blockUCState} to {@link BlockUCState#COMMITTED}.
|
||||||
// Set the generation stamp for the block.
|
*/
|
||||||
setGenerationStamp(genStamp);
|
void commit() {
|
||||||
if (replicas == null)
|
blockUCState = BlockUCState.COMMITTED;
|
||||||
return;
|
}
|
||||||
|
|
||||||
// Remove the replicas with wrong gen stamp.
|
void removeStaleReplicas(BlockInfo block) {
|
||||||
// The replica list is unchanged.
|
final long genStamp = block.getGenerationStamp();
|
||||||
for (ReplicaUnderConstruction r : replicas) {
|
if (replicas != null) {
|
||||||
if (genStamp != r.getGenerationStamp()) {
|
// Remove replicas with wrong gen stamp. The replica list is unchanged.
|
||||||
r.getExpectedStorageLocation().removeBlock(this);
|
for (ReplicaUnderConstruction r : replicas) {
|
||||||
NameNode.blockStateChangeLog.info("BLOCK* Removing stale replica "
|
if (genStamp != r.getGenerationStamp()) {
|
||||||
+ "from location: {}", r.getExpectedStorageLocation());
|
r.getExpectedStorageLocation().removeBlock(block);
|
||||||
|
NameNode.blockStateChangeLog.debug("BLOCK* Removing stale replica "
|
||||||
|
+ "from location: {}", r.getExpectedStorageLocation());
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
/**
|
||||||
public void commitBlock(Block block) throws IOException {
|
* Initialize lease recovery for this block.
|
||||||
if (getBlockId() != block.getBlockId()) {
|
* Find the first alive data-node starting from the previous primary and
|
||||||
throw new IOException("Trying to commit inconsistent block: id = "
|
* make it primary.
|
||||||
+ block.getBlockId() + ", expected id = " + getBlockId());
|
*/
|
||||||
}
|
public void initializeBlockRecovery(BlockInfo blockInfo, long recoveryId) {
|
||||||
blockUCState = BlockUCState.COMMITTED;
|
|
||||||
this.set(getBlockId(), block.getNumBytes(), block.getGenerationStamp());
|
|
||||||
// Sort out invalid replicas.
|
|
||||||
setGenerationStampAndVerifyReplicas(block.getGenerationStamp());
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void initializeBlockRecovery(long recoveryId) {
|
|
||||||
setBlockUCState(BlockUCState.UNDER_RECOVERY);
|
setBlockUCState(BlockUCState.UNDER_RECOVERY);
|
||||||
blockRecoveryId = recoveryId;
|
blockRecoveryId = recoveryId;
|
||||||
if (replicas == null || replicas.length == 0) {
|
if (replicas == null || replicas.length == 0) {
|
||||||
NameNode.blockStateChangeLog.warn("BLOCK*" +
|
NameNode.blockStateChangeLog.warn("BLOCK*" +
|
||||||
" BlockInfoStripedUnderConstruction.initLeaseRecovery:" +
|
" BlockUnderConstructionFeature.initLeaseRecovery:" +
|
||||||
" No blocks found, lease removed.");
|
" No blocks found, lease removed.");
|
||||||
// sets primary node index and return.
|
// sets primary node index and return.
|
||||||
primaryNodeIndex = -1;
|
primaryNodeIndex = -1;
|
||||||
@ -226,15 +203,15 @@ public void initializeBlockRecovery(long recoveryId) {
|
|||||||
}
|
}
|
||||||
if (primary != null) {
|
if (primary != null) {
|
||||||
primary.getExpectedStorageLocation().getDatanodeDescriptor()
|
primary.getExpectedStorageLocation().getDatanodeDescriptor()
|
||||||
.addBlockToBeRecovered(this);
|
.addBlockToBeRecovered(blockInfo);
|
||||||
primary.setChosenAsPrimary(true);
|
primary.setChosenAsPrimary(true);
|
||||||
NameNode.blockStateChangeLog.info(
|
NameNode.blockStateChangeLog.info(
|
||||||
"BLOCK* {} recovery started, primary={}", this, primary);
|
"BLOCK* {} recovery started, primary={}", this, primary);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
/** Add the reported replica if it is not already in the replica list. */
|
||||||
public void addReplicaIfNotPresent(DatanodeStorageInfo storage,
|
void addReplicaIfNotPresent(DatanodeStorageInfo storage,
|
||||||
Block reportedBlock, ReplicaState rState) {
|
Block reportedBlock, ReplicaState rState) {
|
||||||
if (replicas == null) {
|
if (replicas == null) {
|
||||||
replicas = new ReplicaUnderConstruction[1];
|
replicas = new ReplicaUnderConstruction[1];
|
||||||
@ -269,20 +246,15 @@ public void addReplicaIfNotPresent(DatanodeStorageInfo storage,
|
|||||||
@Override
|
@Override
|
||||||
public String toString() {
|
public String toString() {
|
||||||
final StringBuilder b = new StringBuilder(100);
|
final StringBuilder b = new StringBuilder(100);
|
||||||
appendStringTo(b);
|
appendUCParts(b);
|
||||||
return b.toString();
|
return b.toString();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
|
||||||
public void appendStringTo(StringBuilder sb) {
|
|
||||||
super.appendStringTo(sb);
|
|
||||||
appendUCParts(sb);
|
|
||||||
}
|
|
||||||
|
|
||||||
private void appendUCParts(StringBuilder sb) {
|
private void appendUCParts(StringBuilder sb) {
|
||||||
sb.append("{UCState=").append(blockUCState).
|
sb.append("{UCState=").append(blockUCState)
|
||||||
append(", primaryNodeIndex=").append(primaryNodeIndex).
|
.append(", truncateBlock=").append(truncateBlock)
|
||||||
append(", replicas=[");
|
.append(", primaryNodeIndex=").append(primaryNodeIndex)
|
||||||
|
.append(", replicas=[");
|
||||||
if (replicas != null) {
|
if (replicas != null) {
|
||||||
int i = 0;
|
int i = 0;
|
||||||
for (ReplicaUnderConstruction r : replicas) {
|
for (ReplicaUnderConstruction r : replicas) {
|
@ -227,7 +227,7 @@ public CachedBlocksList getPendingUncached() {
|
|||||||
private final BlockQueue<BlockECRecoveryInfo> erasurecodeBlocks =
|
private final BlockQueue<BlockECRecoveryInfo> erasurecodeBlocks =
|
||||||
new BlockQueue<>();
|
new BlockQueue<>();
|
||||||
/** A queue of blocks to be recovered by this datanode */
|
/** A queue of blocks to be recovered by this datanode */
|
||||||
private final BlockQueue<BlockInfoUnderConstruction> recoverBlocks =
|
private final BlockQueue<BlockInfo> recoverBlocks =
|
||||||
new BlockQueue<>();
|
new BlockQueue<>();
|
||||||
/** A set of blocks to be invalidated by this datanode */
|
/** A set of blocks to be invalidated by this datanode */
|
||||||
private final LightWeightHashSet<Block> invalidateBlocks =
|
private final LightWeightHashSet<Block> invalidateBlocks =
|
||||||
@ -624,7 +624,7 @@ void addBlockToBeErasureCoded(ExtendedBlock block,
|
|||||||
/**
|
/**
|
||||||
* Store block recovery work.
|
* Store block recovery work.
|
||||||
*/
|
*/
|
||||||
void addBlockToBeRecovered(BlockInfoUnderConstruction block) {
|
void addBlockToBeRecovered(BlockInfo block) {
|
||||||
if(recoverBlocks.contains(block)) {
|
if(recoverBlocks.contains(block)) {
|
||||||
// this prevents adding the same block twice to the recovery queue
|
// this prevents adding the same block twice to the recovery queue
|
||||||
BlockManager.LOG.info(block + " is already in the recovery queue");
|
BlockManager.LOG.info(block + " is already in the recovery queue");
|
||||||
@ -678,11 +678,11 @@ public List<BlockECRecoveryInfo> getErasureCodeCommand(int maxTransfers) {
|
|||||||
return erasurecodeBlocks.poll(maxTransfers);
|
return erasurecodeBlocks.poll(maxTransfers);
|
||||||
}
|
}
|
||||||
|
|
||||||
public BlockInfoUnderConstruction[] getLeaseRecoveryCommand(int maxTransfers) {
|
public BlockInfo[] getLeaseRecoveryCommand(int maxTransfers) {
|
||||||
List<BlockInfoUnderConstruction> blocks = recoverBlocks.poll(maxTransfers);
|
List<BlockInfo> blocks = recoverBlocks.poll(maxTransfers);
|
||||||
if(blocks == null)
|
if(blocks == null)
|
||||||
return null;
|
return null;
|
||||||
return blocks.toArray(new BlockInfoUnderConstruction[blocks.size()]);
|
return blocks.toArray(new BlockInfo[blocks.size()]);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -1381,13 +1381,15 @@ public DatanodeCommand[] handleHeartbeat(DatanodeRegistration nodeReg,
|
|||||||
}
|
}
|
||||||
|
|
||||||
//check lease recovery
|
//check lease recovery
|
||||||
BlockInfoUnderConstruction[] blocks = nodeinfo
|
BlockInfo[] blocks = nodeinfo.getLeaseRecoveryCommand(Integer.MAX_VALUE);
|
||||||
.getLeaseRecoveryCommand(Integer.MAX_VALUE);
|
|
||||||
if (blocks != null) {
|
if (blocks != null) {
|
||||||
BlockRecoveryCommand brCommand = new BlockRecoveryCommand(
|
BlockRecoveryCommand brCommand = new BlockRecoveryCommand(
|
||||||
blocks.length);
|
blocks.length);
|
||||||
for (BlockInfoUnderConstruction b : blocks) {
|
for (BlockInfo b : blocks) {
|
||||||
final DatanodeStorageInfo[] storages = b.getExpectedStorageLocations();
|
final BlockUnderConstructionFeature uc =
|
||||||
|
b.getUnderConstructionFeature();
|
||||||
|
assert uc != null;
|
||||||
|
final DatanodeStorageInfo[] storages = uc.getExpectedStorageLocations();
|
||||||
// Skip stale nodes during recovery - not heart beated for some time (30s by default).
|
// Skip stale nodes during recovery - not heart beated for some time (30s by default).
|
||||||
final List<DatanodeStorageInfo> recoveryLocations =
|
final List<DatanodeStorageInfo> recoveryLocations =
|
||||||
new ArrayList<>(storages.length);
|
new ArrayList<>(storages.length);
|
||||||
@ -1398,12 +1400,12 @@ public DatanodeCommand[] handleHeartbeat(DatanodeRegistration nodeReg,
|
|||||||
}
|
}
|
||||||
// If we are performing a truncate recovery than set recovery fields
|
// If we are performing a truncate recovery than set recovery fields
|
||||||
// to old block.
|
// to old block.
|
||||||
boolean truncateRecovery = b.getTruncateBlock() != null;
|
boolean truncateRecovery = uc.getTruncateBlock() != null;
|
||||||
boolean copyOnTruncateRecovery = truncateRecovery &&
|
boolean copyOnTruncateRecovery = truncateRecovery &&
|
||||||
b.getTruncateBlock().getBlockId() != b.toBlock().getBlockId();
|
uc.getTruncateBlock().getBlockId() != b.getBlockId();
|
||||||
ExtendedBlock primaryBlock = (copyOnTruncateRecovery) ?
|
ExtendedBlock primaryBlock = (copyOnTruncateRecovery) ?
|
||||||
new ExtendedBlock(blockPoolId, b.getTruncateBlock()) :
|
new ExtendedBlock(blockPoolId, uc.getTruncateBlock()) :
|
||||||
new ExtendedBlock(blockPoolId, b.toBlock());
|
new ExtendedBlock(blockPoolId, b);
|
||||||
// If we only get 1 replica after eliminating stale nodes, then choose all
|
// If we only get 1 replica after eliminating stale nodes, then choose all
|
||||||
// replicas for recovery and let the primary data node handle failures.
|
// replicas for recovery and let the primary data node handle failures.
|
||||||
DatanodeInfo[] recoveryInfos;
|
DatanodeInfo[] recoveryInfos;
|
||||||
@ -1420,13 +1422,13 @@ public DatanodeCommand[] handleHeartbeat(DatanodeRegistration nodeReg,
|
|||||||
recoveryInfos = DatanodeStorageInfo.toDatanodeInfos(storages);
|
recoveryInfos = DatanodeStorageInfo.toDatanodeInfos(storages);
|
||||||
}
|
}
|
||||||
if(truncateRecovery) {
|
if(truncateRecovery) {
|
||||||
Block recoveryBlock = (copyOnTruncateRecovery) ? b.toBlock() :
|
Block recoveryBlock = (copyOnTruncateRecovery) ? b :
|
||||||
b.getTruncateBlock();
|
uc.getTruncateBlock();
|
||||||
brCommand.add(new RecoveringBlock(primaryBlock, recoveryInfos,
|
brCommand.add(new RecoveringBlock(primaryBlock, recoveryInfos,
|
||||||
recoveryBlock));
|
recoveryBlock));
|
||||||
} else {
|
} else {
|
||||||
brCommand.add(new RecoveringBlock(primaryBlock, recoveryInfos,
|
brCommand.add(new RecoveringBlock(primaryBlock, recoveryInfos,
|
||||||
b.getBlockRecoveryId()));
|
uc.getBlockRecoveryId()));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return new DatanodeCommand[] { brCommand };
|
return new DatanodeCommand[] { brCommand };
|
||||||
|
@ -28,8 +28,9 @@
|
|||||||
import org.apache.hadoop.hdfs.protocol.QuotaExceededException;
|
import org.apache.hadoop.hdfs.protocol.QuotaExceededException;
|
||||||
import org.apache.hadoop.hdfs.protocol.SnapshotAccessControlException;
|
import org.apache.hadoop.hdfs.protocol.SnapshotAccessControlException;
|
||||||
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
|
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
|
||||||
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoContiguousUnderConstruction;
|
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoContiguous;
|
||||||
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
|
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
|
||||||
|
import org.apache.hadoop.hdfs.server.blockmanagement.BlockUnderConstructionFeature;
|
||||||
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState;
|
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState;
|
||||||
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem.RecoverLeaseOp;
|
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem.RecoverLeaseOp;
|
||||||
import org.apache.hadoop.hdfs.server.namenode.INode.BlocksMapUpdateInfo;
|
import org.apache.hadoop.hdfs.server.namenode.INode.BlocksMapUpdateInfo;
|
||||||
@ -102,7 +103,7 @@ static TruncateResult truncate(final FSNamesystem fsn, final String srcArg,
|
|||||||
final BlockInfo last = file.getLastBlock();
|
final BlockInfo last = file.getLastBlock();
|
||||||
if (last != null && last.getBlockUCState()
|
if (last != null && last.getBlockUCState()
|
||||||
== BlockUCState.UNDER_RECOVERY) {
|
== BlockUCState.UNDER_RECOVERY) {
|
||||||
final Block truncatedBlock = ((BlockInfoContiguousUnderConstruction) last)
|
final Block truncatedBlock = last.getUnderConstructionFeature()
|
||||||
.getTruncateBlock();
|
.getTruncateBlock();
|
||||||
if (truncatedBlock != null) {
|
if (truncatedBlock != null) {
|
||||||
final long truncateLength = file.computeFileSize(false, false)
|
final long truncateLength = file.computeFileSize(false, false)
|
||||||
@ -231,43 +232,42 @@ static Block prepareFileForTruncate(FSNamesystem fsn, INodesInPath iip,
|
|||||||
oldBlock)));
|
oldBlock)));
|
||||||
}
|
}
|
||||||
|
|
||||||
BlockInfoContiguousUnderConstruction truncatedBlockUC;
|
final BlockInfo truncatedBlockUC;
|
||||||
BlockManager blockManager = fsn.getFSDirectory().getBlockManager();
|
BlockManager blockManager = fsn.getFSDirectory().getBlockManager();
|
||||||
if (shouldCopyOnTruncate) {
|
if (shouldCopyOnTruncate) {
|
||||||
// Add new truncateBlock into blocksMap and
|
// Add new truncateBlock into blocksMap and
|
||||||
// use oldBlock as a source for copy-on-truncate recovery
|
// use oldBlock as a source for copy-on-truncate recovery
|
||||||
truncatedBlockUC = new BlockInfoContiguousUnderConstruction(newBlock,
|
truncatedBlockUC = new BlockInfoContiguous(newBlock,
|
||||||
file.getPreferredBlockReplication());
|
file.getPreferredBlockReplication());
|
||||||
|
truncatedBlockUC.convertToBlockUnderConstruction(
|
||||||
|
BlockUCState.UNDER_CONSTRUCTION, blockManager.getStorages(oldBlock));
|
||||||
truncatedBlockUC.setNumBytes(oldBlock.getNumBytes() - lastBlockDelta);
|
truncatedBlockUC.setNumBytes(oldBlock.getNumBytes() - lastBlockDelta);
|
||||||
truncatedBlockUC.setTruncateBlock(oldBlock);
|
truncatedBlockUC.getUnderConstructionFeature().setTruncateBlock(oldBlock);
|
||||||
file.convertLastBlockToUC(truncatedBlockUC,
|
file.setLastBlock(truncatedBlockUC);
|
||||||
blockManager.getStorages(oldBlock));
|
|
||||||
blockManager.addBlockCollection(truncatedBlockUC, file);
|
blockManager.addBlockCollection(truncatedBlockUC, file);
|
||||||
|
|
||||||
NameNode.stateChangeLog.debug(
|
NameNode.stateChangeLog.debug(
|
||||||
"BLOCK* prepareFileForTruncate: Scheduling copy-on-truncate to new"
|
"BLOCK* prepareFileForTruncate: Scheduling copy-on-truncate to new"
|
||||||
+ " size {} new block {} old block {}",
|
+ " size {} new block {} old block {}",
|
||||||
truncatedBlockUC.getNumBytes(), newBlock,
|
truncatedBlockUC.getNumBytes(), newBlock, oldBlock);
|
||||||
truncatedBlockUC.getTruncateBlock());
|
|
||||||
} else {
|
} else {
|
||||||
// Use new generation stamp for in-place truncate recovery
|
// Use new generation stamp for in-place truncate recovery
|
||||||
blockManager.convertLastBlockToUnderConstruction(file, lastBlockDelta);
|
blockManager.convertLastBlockToUnderConstruction(file, lastBlockDelta);
|
||||||
oldBlock = file.getLastBlock();
|
oldBlock = file.getLastBlock();
|
||||||
assert !oldBlock.isComplete() : "oldBlock should be under construction";
|
assert !oldBlock.isComplete() : "oldBlock should be under construction";
|
||||||
truncatedBlockUC = (BlockInfoContiguousUnderConstruction) oldBlock;
|
BlockUnderConstructionFeature uc = oldBlock.getUnderConstructionFeature();
|
||||||
truncatedBlockUC.setTruncateBlock(new Block(oldBlock));
|
uc.setTruncateBlock(new Block(oldBlock));
|
||||||
truncatedBlockUC.getTruncateBlock().setNumBytes(
|
uc.getTruncateBlock().setNumBytes(oldBlock.getNumBytes() - lastBlockDelta);
|
||||||
oldBlock.getNumBytes() - lastBlockDelta);
|
uc.getTruncateBlock().setGenerationStamp(newBlock.getGenerationStamp());
|
||||||
truncatedBlockUC.getTruncateBlock().setGenerationStamp(
|
truncatedBlockUC = oldBlock;
|
||||||
newBlock.getGenerationStamp());
|
|
||||||
|
|
||||||
NameNode.stateChangeLog.debug(
|
NameNode.stateChangeLog.debug("BLOCK* prepareFileForTruncate: " +
|
||||||
"BLOCK* prepareFileForTruncate: {} Scheduling in-place block "
|
"{} Scheduling in-place block truncate to new size {}",
|
||||||
+ "truncate to new size {}", truncatedBlockUC.getTruncateBlock()
|
uc, uc.getTruncateBlock().getNumBytes());
|
||||||
.getNumBytes(), truncatedBlockUC);
|
|
||||||
}
|
}
|
||||||
if (shouldRecoverNow) {
|
if (shouldRecoverNow) {
|
||||||
truncatedBlockUC.initializeBlockRecovery(newBlock.getGenerationStamp());
|
truncatedBlockUC.getUnderConstructionFeature().initializeBlockRecovery(
|
||||||
|
truncatedBlockUC, newBlock.getGenerationStamp());
|
||||||
}
|
}
|
||||||
|
|
||||||
return newBlock;
|
return newBlock;
|
||||||
|
@ -45,10 +45,10 @@
|
|||||||
import org.apache.hadoop.hdfs.protocol.QuotaExceededException;
|
import org.apache.hadoop.hdfs.protocol.QuotaExceededException;
|
||||||
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
|
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
|
||||||
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
|
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
|
||||||
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoContiguousUnderConstruction;
|
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoContiguous;
|
||||||
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoStripedUnderConstruction;
|
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoStriped;
|
||||||
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoUnderConstruction;
|
|
||||||
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
|
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
|
||||||
|
import org.apache.hadoop.hdfs.server.blockmanagement.BlockUnderConstructionFeature;
|
||||||
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStorageInfo;
|
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStorageInfo;
|
||||||
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
|
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
|
||||||
import org.apache.hadoop.hdfs.server.namenode.snapshot.Snapshot;
|
import org.apache.hadoop.hdfs.server.namenode.snapshot.Snapshot;
|
||||||
@ -77,7 +77,7 @@ static boolean unprotectedRemoveBlock(
|
|||||||
Block block) throws IOException {
|
Block block) throws IOException {
|
||||||
// modify file-> block and blocksMap
|
// modify file-> block and blocksMap
|
||||||
// fileNode should be under construction
|
// fileNode should be under construction
|
||||||
BlockInfoUnderConstruction uc = fileNode.removeLastBlock(block);
|
BlockInfo uc = fileNode.removeLastBlock(block);
|
||||||
if (uc == null) {
|
if (uc == null) {
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
@ -214,8 +214,8 @@ static ValidateAddBlockResult validateAddBlock(
|
|||||||
|
|
||||||
static LocatedBlock makeLocatedBlock(FSNamesystem fsn, BlockInfo blk,
|
static LocatedBlock makeLocatedBlock(FSNamesystem fsn, BlockInfo blk,
|
||||||
DatanodeStorageInfo[] locs, long offset) throws IOException {
|
DatanodeStorageInfo[] locs, long offset) throws IOException {
|
||||||
LocatedBlock lBlk = BlockManager.newLocatedBlock(fsn.getExtendedBlock(blk),
|
LocatedBlock lBlk = BlockManager.newLocatedBlock(
|
||||||
blk, locs, offset);
|
fsn.getExtendedBlock(new Block(blk)), blk, locs, offset);
|
||||||
fsn.getBlockManager().setBlockToken(lBlk,
|
fsn.getBlockManager().setBlockToken(lBlk,
|
||||||
BlockTokenIdentifier.AccessMode.WRITE);
|
BlockTokenIdentifier.AccessMode.WRITE);
|
||||||
return lBlk;
|
return lBlk;
|
||||||
@ -247,8 +247,8 @@ static LocatedBlock storeAllocatedBlock(FSNamesystem fsn, String src,
|
|||||||
} else {
|
} else {
|
||||||
// add new chosen targets to already allocated block and return
|
// add new chosen targets to already allocated block and return
|
||||||
BlockInfo lastBlockInFile = pendingFile.getLastBlock();
|
BlockInfo lastBlockInFile = pendingFile.getLastBlock();
|
||||||
((BlockInfoContiguousUnderConstruction) lastBlockInFile)
|
lastBlockInFile.getUnderConstructionFeature().setExpectedLocations(
|
||||||
.setExpectedLocations(targets);
|
lastBlockInFile, targets, pendingFile.isStriped());
|
||||||
offset = pendingFile.computeFileSize();
|
offset = pendingFile.computeFileSize();
|
||||||
return makeLocatedBlock(fsn, lastBlockInFile, targets, offset);
|
return makeLocatedBlock(fsn, lastBlockInFile, targets, offset);
|
||||||
}
|
}
|
||||||
@ -542,7 +542,8 @@ private static BlockInfo addBlock(FSDirectory fsd, String path,
|
|||||||
// check quota limits and updated space consumed
|
// check quota limits and updated space consumed
|
||||||
fsd.updateCount(inodesInPath, 0, fileINode.getPreferredBlockSize(),
|
fsd.updateCount(inodesInPath, 0, fileINode.getPreferredBlockSize(),
|
||||||
numLocations, true);
|
numLocations, true);
|
||||||
blockInfo = new BlockInfoStripedUnderConstruction(block, ecPolicy,
|
blockInfo = new BlockInfoStriped(block, ecPolicy);
|
||||||
|
blockInfo.convertToBlockUnderConstruction(
|
||||||
HdfsServerConstants.BlockUCState.UNDER_CONSTRUCTION, targets);
|
HdfsServerConstants.BlockUCState.UNDER_CONSTRUCTION, targets);
|
||||||
} else {
|
} else {
|
||||||
// check quota limits and updated space consumed
|
// check quota limits and updated space consumed
|
||||||
@ -550,9 +551,9 @@ private static BlockInfo addBlock(FSDirectory fsd, String path,
|
|||||||
fileINode.getPreferredBlockReplication(), true);
|
fileINode.getPreferredBlockReplication(), true);
|
||||||
|
|
||||||
short numLocations = fileINode.getFileReplication();
|
short numLocations = fileINode.getFileReplication();
|
||||||
blockInfo = new BlockInfoContiguousUnderConstruction(block,
|
blockInfo = new BlockInfoContiguous(block, numLocations);
|
||||||
numLocations, HdfsServerConstants.BlockUCState.UNDER_CONSTRUCTION,
|
blockInfo.convertToBlockUnderConstruction(
|
||||||
targets);
|
HdfsServerConstants.BlockUCState.UNDER_CONSTRUCTION, targets);
|
||||||
}
|
}
|
||||||
fsd.getBlockManager().addBlockCollection(blockInfo, fileINode);
|
fsd.getBlockManager().addBlockCollection(blockInfo, fileINode);
|
||||||
fileINode.addBlock(blockInfo);
|
fileINode.addBlock(blockInfo);
|
||||||
@ -692,10 +693,10 @@ private static FileState analyzeFileState(
|
|||||||
"allocation of a new block in " + src + ". Returning previously" +
|
"allocation of a new block in " + src + ". Returning previously" +
|
||||||
" allocated block " + lastBlockInFile);
|
" allocated block " + lastBlockInFile);
|
||||||
long offset = file.computeFileSize();
|
long offset = file.computeFileSize();
|
||||||
BlockInfoUnderConstruction lastBlockUC =
|
BlockUnderConstructionFeature uc =
|
||||||
(BlockInfoUnderConstruction) lastBlockInFile;
|
lastBlockInFile.getUnderConstructionFeature();
|
||||||
onRetryBlock[0] = makeLocatedBlock(fsn, lastBlockInFile,
|
onRetryBlock[0] = makeLocatedBlock(fsn, lastBlockInFile,
|
||||||
lastBlockUC.getExpectedStorageLocations(), offset);
|
uc.getExpectedStorageLocations(), offset);
|
||||||
return new FileState(file, src, iip);
|
return new FileState(file, src, iip);
|
||||||
} else {
|
} else {
|
||||||
// Case 3
|
// Case 3
|
||||||
|
@ -42,15 +42,14 @@
|
|||||||
import org.apache.hadoop.hdfs.server.blockmanagement.BlockIdManager;
|
import org.apache.hadoop.hdfs.server.blockmanagement.BlockIdManager;
|
||||||
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
|
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
|
||||||
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoStriped;
|
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoStriped;
|
||||||
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoStripedUnderConstruction;
|
|
||||||
import org.apache.hadoop.hdfs.protocol.Block;
|
import org.apache.hadoop.hdfs.protocol.Block;
|
||||||
import org.apache.hadoop.hdfs.protocol.CacheDirectiveInfo;
|
import org.apache.hadoop.hdfs.protocol.CacheDirectiveInfo;
|
||||||
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
|
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
|
||||||
import org.apache.hadoop.hdfs.protocol.LastBlockWithStatus;
|
import org.apache.hadoop.hdfs.protocol.LastBlockWithStatus;
|
||||||
import org.apache.hadoop.hdfs.protocol.LayoutVersion;
|
import org.apache.hadoop.hdfs.protocol.LayoutVersion;
|
||||||
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoContiguous;
|
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoContiguous;
|
||||||
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoContiguousUnderConstruction;
|
|
||||||
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
|
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
|
||||||
|
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState;
|
||||||
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.RollingUpgradeStartupOption;
|
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.RollingUpgradeStartupOption;
|
||||||
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.StartupOption;
|
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.StartupOption;
|
||||||
import org.apache.hadoop.hdfs.server.common.Storage;
|
import org.apache.hadoop.hdfs.server.common.Storage;
|
||||||
@ -991,12 +990,14 @@ private void addNewBlock(AddBlockOp op, INodeFile file,
|
|||||||
final BlockInfo newBlockInfo;
|
final BlockInfo newBlockInfo;
|
||||||
boolean isStriped = ecZone != null;
|
boolean isStriped = ecZone != null;
|
||||||
if (isStriped) {
|
if (isStriped) {
|
||||||
newBlockInfo = new BlockInfoStripedUnderConstruction(newBlock,
|
newBlockInfo = new BlockInfoStriped(newBlock,
|
||||||
ecZone.getErasureCodingPolicy());
|
ecZone.getErasureCodingPolicy());
|
||||||
} else {
|
} else {
|
||||||
newBlockInfo = new BlockInfoContiguousUnderConstruction(newBlock,
|
newBlockInfo = new BlockInfoContiguous(newBlock,
|
||||||
file.getPreferredBlockReplication());
|
file.getPreferredBlockReplication());
|
||||||
}
|
}
|
||||||
|
newBlockInfo.convertToBlockUnderConstruction(
|
||||||
|
BlockUCState.UNDER_CONSTRUCTION, null);
|
||||||
fsNamesys.getBlockManager().addBlockCollectionWithCheck(newBlockInfo, file);
|
fsNamesys.getBlockManager().addBlockCollectionWithCheck(newBlockInfo, file);
|
||||||
file.addBlock(newBlockInfo);
|
file.addBlock(newBlockInfo);
|
||||||
fsNamesys.getBlockManager().processQueuedMessagesForBlock(newBlock);
|
fsNamesys.getBlockManager().processQueuedMessagesForBlock(newBlock);
|
||||||
@ -1077,12 +1078,14 @@ private void updateBlocks(FSDirectory fsDir, BlockListUpdatingOp op,
|
|||||||
// what about an old-version fsync() where fsync isn't called
|
// what about an old-version fsync() where fsync isn't called
|
||||||
// until several blocks in?
|
// until several blocks in?
|
||||||
if (isStriped) {
|
if (isStriped) {
|
||||||
newBI = new BlockInfoStripedUnderConstruction(newBlock,
|
newBI = new BlockInfoStriped(newBlock,
|
||||||
ecZone.getErasureCodingPolicy());
|
ecZone.getErasureCodingPolicy());
|
||||||
} else {
|
} else {
|
||||||
newBI = new BlockInfoContiguousUnderConstruction(newBlock,
|
newBI = new BlockInfoContiguous(newBlock,
|
||||||
file.getPreferredBlockReplication());
|
file.getPreferredBlockReplication());
|
||||||
}
|
}
|
||||||
|
newBI.convertToBlockUnderConstruction(BlockUCState.UNDER_CONSTRUCTION,
|
||||||
|
null);
|
||||||
} else {
|
} else {
|
||||||
// OP_CLOSE should add finalized blocks. This code path
|
// OP_CLOSE should add finalized blocks. This code path
|
||||||
// is only executed when loading edits written by prior
|
// is only executed when loading edits written by prior
|
||||||
|
@ -55,7 +55,6 @@
|
|||||||
import org.apache.hadoop.hdfs.protocol.LayoutVersion.Feature;
|
import org.apache.hadoop.hdfs.protocol.LayoutVersion.Feature;
|
||||||
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
|
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
|
||||||
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoContiguous;
|
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoContiguous;
|
||||||
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoContiguousUnderConstruction;
|
|
||||||
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
|
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
|
||||||
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
|
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
|
||||||
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.StartupOption;
|
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.StartupOption;
|
||||||
@ -756,7 +755,7 @@ INode loadINode(final byte[] localName, boolean isSnapshotINode,
|
|||||||
// file
|
// file
|
||||||
|
|
||||||
// read blocks
|
// read blocks
|
||||||
Block[] blocks = new BlockInfoContiguous[numBlocks];
|
BlockInfo[] blocks = new BlockInfoContiguous[numBlocks];
|
||||||
for (int j = 0; j < numBlocks; j++) {
|
for (int j = 0; j < numBlocks; j++) {
|
||||||
blocks[j] = new BlockInfoContiguous(replication);
|
blocks[j] = new BlockInfoContiguous(replication);
|
||||||
blocks[j].readFields(in);
|
blocks[j].readFields(in);
|
||||||
@ -778,9 +777,9 @@ INode loadINode(final byte[] localName, boolean isSnapshotINode,
|
|||||||
clientMachine = FSImageSerialization.readString(in);
|
clientMachine = FSImageSerialization.readString(in);
|
||||||
// convert the last block to BlockUC
|
// convert the last block to BlockUC
|
||||||
if (blocks.length > 0) {
|
if (blocks.length > 0) {
|
||||||
Block lastBlk = blocks[blocks.length - 1];
|
BlockInfo lastBlk = blocks[blocks.length - 1];
|
||||||
blocks[blocks.length - 1] =
|
lastBlk.convertToBlockUnderConstruction(
|
||||||
new BlockInfoContiguousUnderConstruction(lastBlk, replication);
|
HdfsServerConstants.BlockUCState.UNDER_CONSTRUCTION, null);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -45,10 +45,9 @@
|
|||||||
import org.apache.hadoop.hdfs.protocolPB.PBHelper;
|
import org.apache.hadoop.hdfs.protocolPB.PBHelper;
|
||||||
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
|
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
|
||||||
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoContiguous;
|
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoContiguous;
|
||||||
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoContiguousUnderConstruction;
|
|
||||||
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoStriped;
|
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoStriped;
|
||||||
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoStripedUnderConstruction;
|
|
||||||
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
|
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
|
||||||
|
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
|
||||||
import org.apache.hadoop.hdfs.server.namenode.FSImageFormatProtobuf.LoaderContext;
|
import org.apache.hadoop.hdfs.server.namenode.FSImageFormatProtobuf.LoaderContext;
|
||||||
import org.apache.hadoop.hdfs.server.namenode.FSImageFormatProtobuf.SaverContext;
|
import org.apache.hadoop.hdfs.server.namenode.FSImageFormatProtobuf.SaverContext;
|
||||||
import org.apache.hadoop.hdfs.server.namenode.FsImageProto.FileSummary;
|
import org.apache.hadoop.hdfs.server.namenode.FsImageProto.FileSummary;
|
||||||
@ -378,11 +377,13 @@ private INodeFile loadINodeFile(INodeSection.INode n) {
|
|||||||
final BlockInfo ucBlk;
|
final BlockInfo ucBlk;
|
||||||
if (isStriped) {
|
if (isStriped) {
|
||||||
BlockInfoStriped striped = (BlockInfoStriped) lastBlk;
|
BlockInfoStriped striped = (BlockInfoStriped) lastBlk;
|
||||||
ucBlk = new BlockInfoStripedUnderConstruction(striped, ecPolicy);
|
ucBlk = new BlockInfoStriped(striped, ecPolicy);
|
||||||
} else {
|
} else {
|
||||||
ucBlk = new BlockInfoContiguousUnderConstruction(lastBlk,
|
ucBlk = new BlockInfoContiguous(lastBlk,
|
||||||
replication);
|
replication);
|
||||||
}
|
}
|
||||||
|
ucBlk.convertToBlockUnderConstruction(
|
||||||
|
HdfsServerConstants.BlockUCState.UNDER_CONSTRUCTION, null);
|
||||||
file.setBlock(file.numBlocks() - 1, ucBlk);
|
file.setBlock(file.numBlocks() - 1, ucBlk);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -34,7 +34,6 @@
|
|||||||
import org.apache.hadoop.hdfs.protocol.CachePoolInfo;
|
import org.apache.hadoop.hdfs.protocol.CachePoolInfo;
|
||||||
import org.apache.hadoop.hdfs.protocol.LayoutVersion;
|
import org.apache.hadoop.hdfs.protocol.LayoutVersion;
|
||||||
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoContiguous;
|
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoContiguous;
|
||||||
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoContiguousUnderConstruction;
|
|
||||||
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState;
|
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState;
|
||||||
import org.apache.hadoop.hdfs.server.namenode.snapshot.SnapshotFSImageFormat;
|
import org.apache.hadoop.hdfs.server.namenode.snapshot.SnapshotFSImageFormat;
|
||||||
import org.apache.hadoop.hdfs.server.namenode.snapshot.SnapshotFSImageFormat.ReferenceMap;
|
import org.apache.hadoop.hdfs.server.namenode.snapshot.SnapshotFSImageFormat.ReferenceMap;
|
||||||
@ -138,8 +137,9 @@ static INodeFile readINodeUnderConstruction(
|
|||||||
// last block is UNDER_CONSTRUCTION
|
// last block is UNDER_CONSTRUCTION
|
||||||
if(numBlocks > 0) {
|
if(numBlocks > 0) {
|
||||||
blk.readFields(in);
|
blk.readFields(in);
|
||||||
blocksContiguous[i] = new BlockInfoContiguousUnderConstruction(
|
blocksContiguous[i] = new BlockInfoContiguous(blk, blockReplication);
|
||||||
blk, blockReplication, BlockUCState.UNDER_CONSTRUCTION, null);
|
blocksContiguous[i].convertToBlockUnderConstruction(
|
||||||
|
BlockUCState.UNDER_CONSTRUCTION, null);
|
||||||
}
|
}
|
||||||
|
|
||||||
PermissionStatus perm = PermissionStatus.read(in);
|
PermissionStatus perm = PermissionStatus.read(in);
|
||||||
|
@ -142,7 +142,6 @@
|
|||||||
import org.apache.hadoop.fs.CacheFlag;
|
import org.apache.hadoop.fs.CacheFlag;
|
||||||
import org.apache.hadoop.fs.ContentSummary;
|
import org.apache.hadoop.fs.ContentSummary;
|
||||||
import org.apache.hadoop.fs.CreateFlag;
|
import org.apache.hadoop.fs.CreateFlag;
|
||||||
import org.apache.hadoop.fs.FileAlreadyExistsException;
|
|
||||||
import org.apache.hadoop.fs.FileStatus;
|
import org.apache.hadoop.fs.FileStatus;
|
||||||
import org.apache.hadoop.fs.FileSystem;
|
import org.apache.hadoop.fs.FileSystem;
|
||||||
import org.apache.hadoop.fs.FsServerDefaults;
|
import org.apache.hadoop.fs.FsServerDefaults;
|
||||||
@ -204,10 +203,9 @@
|
|||||||
import org.apache.hadoop.hdfs.server.blockmanagement.BlockIdManager;
|
import org.apache.hadoop.hdfs.server.blockmanagement.BlockIdManager;
|
||||||
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
|
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
|
||||||
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoContiguous;
|
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoContiguous;
|
||||||
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoContiguousUnderConstruction;
|
|
||||||
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoStriped;
|
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoStriped;
|
||||||
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoUnderConstruction;
|
|
||||||
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
|
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
|
||||||
|
import org.apache.hadoop.hdfs.server.blockmanagement.BlockUnderConstructionFeature;
|
||||||
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
|
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
|
||||||
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeManager;
|
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeManager;
|
||||||
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStatistics;
|
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStatistics;
|
||||||
@ -3124,28 +3122,25 @@ boolean internalReleaseLease(Lease lease, String src, INodesInPath iip,
|
|||||||
throw new AlreadyBeingCreatedException(message);
|
throw new AlreadyBeingCreatedException(message);
|
||||||
case UNDER_CONSTRUCTION:
|
case UNDER_CONSTRUCTION:
|
||||||
case UNDER_RECOVERY:
|
case UNDER_RECOVERY:
|
||||||
// TODO support truncate of striped blocks
|
final BlockUnderConstructionFeature uc = lastBlock.getUnderConstructionFeature();
|
||||||
final BlockInfoUnderConstruction uc =
|
|
||||||
(BlockInfoUnderConstruction)lastBlock;
|
|
||||||
// determine if last block was intended to be truncated
|
// determine if last block was intended to be truncated
|
||||||
Block recoveryBlock = uc.getTruncateBlock();
|
Block recoveryBlock = uc.getTruncateBlock();
|
||||||
boolean truncateRecovery = recoveryBlock != null;
|
boolean truncateRecovery = recoveryBlock != null;
|
||||||
boolean copyOnTruncate = truncateRecovery &&
|
boolean copyOnTruncate = truncateRecovery &&
|
||||||
recoveryBlock.getBlockId() != uc.toBlock().getBlockId();
|
recoveryBlock.getBlockId() != lastBlock.getBlockId();
|
||||||
assert !copyOnTruncate ||
|
assert !copyOnTruncate ||
|
||||||
recoveryBlock.getBlockId() < uc.toBlock().getBlockId() &&
|
recoveryBlock.getBlockId() < lastBlock.getBlockId() &&
|
||||||
recoveryBlock.getGenerationStamp() < uc.toBlock().
|
recoveryBlock.getGenerationStamp() < lastBlock.getGenerationStamp() &&
|
||||||
getGenerationStamp() &&
|
recoveryBlock.getNumBytes() > lastBlock.getNumBytes() :
|
||||||
recoveryBlock.getNumBytes() > uc.toBlock().getNumBytes() :
|
|
||||||
"wrong recoveryBlock";
|
"wrong recoveryBlock";
|
||||||
|
|
||||||
// setup the last block locations from the blockManager if not known
|
// setup the last block locations from the blockManager if not known
|
||||||
if (uc.getNumExpectedLocations() == 0) {
|
if (uc.getNumExpectedLocations() == 0) {
|
||||||
uc.setExpectedLocations(blockManager.getStorages(lastBlock));
|
uc.setExpectedLocations(lastBlock, blockManager.getStorages(lastBlock),
|
||||||
|
lastBlock.isStriped());
|
||||||
}
|
}
|
||||||
|
|
||||||
if (uc.getNumExpectedLocations() == 0 &&
|
if (uc.getNumExpectedLocations() == 0 && lastBlock.getNumBytes() == 0) {
|
||||||
uc.toBlock().getNumBytes() == 0) {
|
|
||||||
// There is no datanode reported to this block.
|
// There is no datanode reported to this block.
|
||||||
// may be client have crashed before writing data to pipeline.
|
// may be client have crashed before writing data to pipeline.
|
||||||
// This blocks doesn't need any recovery.
|
// This blocks doesn't need any recovery.
|
||||||
@ -3159,14 +3154,14 @@ boolean internalReleaseLease(Lease lease, String src, INodesInPath iip,
|
|||||||
}
|
}
|
||||||
// start recovery of the last block for this file
|
// start recovery of the last block for this file
|
||||||
long blockRecoveryId =
|
long blockRecoveryId =
|
||||||
nextGenerationStamp(blockIdManager.isLegacyBlock(uc.toBlock()));
|
nextGenerationStamp(blockIdManager.isLegacyBlock(lastBlock));
|
||||||
lease = reassignLease(lease, src, recoveryLeaseHolder, pendingFile);
|
lease = reassignLease(lease, src, recoveryLeaseHolder, pendingFile);
|
||||||
if(copyOnTruncate) {
|
if(copyOnTruncate) {
|
||||||
uc.toBlock().setGenerationStamp(blockRecoveryId);
|
lastBlock.setGenerationStamp(blockRecoveryId);
|
||||||
} else if(truncateRecovery) {
|
} else if(truncateRecovery) {
|
||||||
recoveryBlock.setGenerationStamp(blockRecoveryId);
|
recoveryBlock.setGenerationStamp(blockRecoveryId);
|
||||||
}
|
}
|
||||||
uc.initializeBlockRecovery(blockRecoveryId);
|
uc.initializeBlockRecovery(lastBlock, blockRecoveryId);
|
||||||
leaseManager.renewLease(lease);
|
leaseManager.renewLease(lease);
|
||||||
// Cannot close file right now, since the last block requires recovery.
|
// Cannot close file right now, since the last block requires recovery.
|
||||||
// This may potentially cause infinite loop in lease recovery
|
// This may potentially cause infinite loop in lease recovery
|
||||||
@ -3371,8 +3366,8 @@ void commitBlockSynchronization(ExtendedBlock oldBlock,
|
|||||||
}
|
}
|
||||||
|
|
||||||
truncatedBlock = iFile.getLastBlock();
|
truncatedBlock = iFile.getLastBlock();
|
||||||
final BlockInfoUnderConstruction uc = (BlockInfoUnderConstruction)truncatedBlock;
|
final long recoveryId = truncatedBlock.getUnderConstructionFeature()
|
||||||
final long recoveryId = uc.getBlockRecoveryId();
|
.getBlockRecoveryId();
|
||||||
copyTruncate = truncatedBlock.getBlockId() != storedBlock.getBlockId();
|
copyTruncate = truncatedBlock.getBlockId() != storedBlock.getBlockId();
|
||||||
if(recoveryId != newgenerationstamp) {
|
if(recoveryId != newgenerationstamp) {
|
||||||
throw new IOException("The recovery id " + newgenerationstamp
|
throw new IOException("The recovery id " + newgenerationstamp
|
||||||
@ -5424,7 +5419,7 @@ private void updatePipelineInternal(String clientName, ExtendedBlock oldBlock,
|
|||||||
// check the vadility of the block and lease holder name
|
// check the vadility of the block and lease holder name
|
||||||
final INodeFile pendingFile = checkUCBlock(oldBlock, clientName);
|
final INodeFile pendingFile = checkUCBlock(oldBlock, clientName);
|
||||||
final BlockInfo lastBlock = pendingFile.getLastBlock();
|
final BlockInfo lastBlock = pendingFile.getLastBlock();
|
||||||
final BlockInfoUnderConstruction blockinfo = (BlockInfoUnderConstruction)lastBlock;
|
assert !lastBlock.isComplete();
|
||||||
|
|
||||||
// check new GS & length: this is not expected
|
// check new GS & length: this is not expected
|
||||||
if (newBlock.getGenerationStamp() <= lastBlock.getGenerationStamp()) {
|
if (newBlock.getGenerationStamp() <= lastBlock.getGenerationStamp()) {
|
||||||
@ -5444,12 +5439,13 @@ private void updatePipelineInternal(String clientName, ExtendedBlock oldBlock,
|
|||||||
|
|
||||||
// Update old block with the new generation stamp and new length
|
// Update old block with the new generation stamp and new length
|
||||||
lastBlock.setNumBytes(newBlock.getNumBytes());
|
lastBlock.setNumBytes(newBlock.getNumBytes());
|
||||||
blockinfo.setGenerationStampAndVerifyReplicas(newBlock.getGenerationStamp());
|
lastBlock.setGenerationStampAndVerifyReplicas(newBlock.getGenerationStamp());
|
||||||
|
|
||||||
// find the DatanodeDescriptor objects
|
// find the DatanodeDescriptor objects
|
||||||
final DatanodeStorageInfo[] storages = blockManager.getDatanodeManager()
|
final DatanodeStorageInfo[] storages = blockManager.getDatanodeManager()
|
||||||
.getDatanodeStorageInfos(newNodes, newStorageIDs);
|
.getDatanodeStorageInfos(newNodes, newStorageIDs);
|
||||||
blockinfo.setExpectedLocations(storages);
|
lastBlock.getUnderConstructionFeature().setExpectedLocations(lastBlock,
|
||||||
|
storages, lastBlock.isStriped());
|
||||||
|
|
||||||
String src = pendingFile.getFullPathName();
|
String src = pendingFile.getFullPathName();
|
||||||
FSDirWriteFileOp.persistBlocks(dir, src, pendingFile, logRetryCache);
|
FSDirWriteFileOp.persistBlocks(dir, src, pendingFile, logRetryCache);
|
||||||
|
@ -18,7 +18,6 @@
|
|||||||
package org.apache.hadoop.hdfs.server.namenode;
|
package org.apache.hadoop.hdfs.server.namenode;
|
||||||
|
|
||||||
import static org.apache.hadoop.hdfs.protocol.HdfsConstants.BLOCK_STORAGE_POLICY_ID_UNSPECIFIED;
|
import static org.apache.hadoop.hdfs.protocol.HdfsConstants.BLOCK_STORAGE_POLICY_ID_UNSPECIFIED;
|
||||||
import static org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState.UNDER_CONSTRUCTION;
|
|
||||||
import static org.apache.hadoop.hdfs.server.namenode.snapshot.Snapshot.CURRENT_STATE_ID;
|
import static org.apache.hadoop.hdfs.server.namenode.snapshot.Snapshot.CURRENT_STATE_ID;
|
||||||
import static org.apache.hadoop.hdfs.server.namenode.snapshot.Snapshot.NO_SNAPSHOT_ID;
|
import static org.apache.hadoop.hdfs.server.namenode.snapshot.Snapshot.NO_SNAPSHOT_ID;
|
||||||
|
|
||||||
@ -39,12 +38,10 @@
|
|||||||
import org.apache.hadoop.hdfs.protocol.QuotaExceededException;
|
import org.apache.hadoop.hdfs.protocol.QuotaExceededException;
|
||||||
import org.apache.hadoop.hdfs.server.blockmanagement.BlockCollection;
|
import org.apache.hadoop.hdfs.server.blockmanagement.BlockCollection;
|
||||||
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
|
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
|
||||||
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoContiguous;
|
|
||||||
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoStriped;
|
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoStriped;
|
||||||
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoUnderConstruction;
|
|
||||||
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoContiguousUnderConstruction;
|
|
||||||
import org.apache.hadoop.hdfs.server.blockmanagement.BlockStoragePolicySuite;
|
import org.apache.hadoop.hdfs.server.blockmanagement.BlockStoragePolicySuite;
|
||||||
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStorageInfo;
|
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStorageInfo;
|
||||||
|
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState;
|
||||||
import org.apache.hadoop.hdfs.server.namenode.snapshot.FileDiff;
|
import org.apache.hadoop.hdfs.server.namenode.snapshot.FileDiff;
|
||||||
import org.apache.hadoop.hdfs.server.namenode.snapshot.FileDiffList;
|
import org.apache.hadoop.hdfs.server.namenode.snapshot.FileDiffList;
|
||||||
import org.apache.hadoop.hdfs.server.namenode.snapshot.FileWithSnapshotFeature;
|
import org.apache.hadoop.hdfs.server.namenode.snapshot.FileWithSnapshotFeature;
|
||||||
@ -86,7 +83,7 @@ public static INodeFile valueOf(INode inode, String path, boolean acceptNull)
|
|||||||
* [4-bit storagePolicyID][1-bit isStriped]
|
* [4-bit storagePolicyID][1-bit isStriped]
|
||||||
* [11-bit replication][48-bit preferredBlockSize]
|
* [11-bit replication][48-bit preferredBlockSize]
|
||||||
*/
|
*/
|
||||||
static enum HeaderFormat {
|
enum HeaderFormat {
|
||||||
PREFERRED_BLOCK_SIZE(null, 48, 1),
|
PREFERRED_BLOCK_SIZE(null, 48, 1),
|
||||||
REPLICATION(PREFERRED_BLOCK_SIZE.BITS, 11, 0),
|
REPLICATION(PREFERRED_BLOCK_SIZE.BITS, 11, 0),
|
||||||
IS_STRIPED(REPLICATION.BITS, 1, 0),
|
IS_STRIPED(REPLICATION.BITS, 1, 0),
|
||||||
@ -264,25 +261,20 @@ public void convertLastBlockToUC(BlockInfo lastBlock,
|
|||||||
if (numBlocks() == 0) {
|
if (numBlocks() == 0) {
|
||||||
throw new IOException("Failed to set last block: File is empty.");
|
throw new IOException("Failed to set last block: File is empty.");
|
||||||
}
|
}
|
||||||
|
lastBlock.convertToBlockUnderConstruction(BlockUCState.UNDER_CONSTRUCTION,
|
||||||
|
locations);
|
||||||
|
}
|
||||||
|
|
||||||
final BlockInfo ucBlock;
|
void setLastBlock(BlockInfo blk) {
|
||||||
if (isStriped()) {
|
blk.setBlockCollection(this);
|
||||||
Preconditions.checkState(lastBlock.isStriped());
|
setBlock(numBlocks() - 1, blk);
|
||||||
ucBlock = ((BlockInfoStriped) lastBlock)
|
|
||||||
.convertToBlockUnderConstruction(UNDER_CONSTRUCTION, locations);
|
|
||||||
} else {
|
|
||||||
Preconditions.checkState(!lastBlock.isStriped());
|
|
||||||
ucBlock = ((BlockInfoContiguous) lastBlock)
|
|
||||||
.convertToBlockUnderConstruction(UNDER_CONSTRUCTION, locations);
|
|
||||||
}
|
|
||||||
setBlock(numBlocks() - 1, ucBlock);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Remove a block from the block list. This block should be
|
* Remove a block from the block list. This block should be
|
||||||
* the last one on the list.
|
* the last one on the list.
|
||||||
*/
|
*/
|
||||||
BlockInfoUnderConstruction removeLastBlock(Block oldblock) {
|
BlockInfo removeLastBlock(Block oldblock) {
|
||||||
Preconditions.checkState(isUnderConstruction(),
|
Preconditions.checkState(isUnderConstruction(),
|
||||||
"file is no longer under construction");
|
"file is no longer under construction");
|
||||||
if (blocks == null || blocks.length == 0) {
|
if (blocks == null || blocks.length == 0) {
|
||||||
@ -293,13 +285,12 @@ BlockInfoUnderConstruction removeLastBlock(Block oldblock) {
|
|||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
|
|
||||||
BlockInfoUnderConstruction uc =
|
BlockInfo ucBlock = blocks[size_1];
|
||||||
(BlockInfoUnderConstruction)blocks[size_1];
|
|
||||||
//copy to a new list
|
//copy to a new list
|
||||||
BlockInfo[] newlist = new BlockInfo[size_1];
|
BlockInfo[] newlist = new BlockInfo[size_1];
|
||||||
System.arraycopy(blocks, 0, newlist, 0, size_1);
|
System.arraycopy(blocks, 0, newlist, 0, size_1);
|
||||||
setBlocks(newlist);
|
setBlocks(newlist);
|
||||||
return uc;
|
return ucBlock;
|
||||||
}
|
}
|
||||||
|
|
||||||
/* End of Under-Construction Feature */
|
/* End of Under-Construction Feature */
|
||||||
@ -758,7 +749,7 @@ public final long computeFileSize(boolean includesLastUcBlock,
|
|||||||
//check if the last block is BlockInfoUnderConstruction
|
//check if the last block is BlockInfoUnderConstruction
|
||||||
BlockInfo lastBlk = blocks[last];
|
BlockInfo lastBlk = blocks[last];
|
||||||
long size = lastBlk.getNumBytes();
|
long size = lastBlk.getNumBytes();
|
||||||
if (lastBlk instanceof BlockInfoUnderConstruction) {
|
if (!lastBlk.isComplete()) {
|
||||||
if (!includesLastUcBlock) {
|
if (!includesLastUcBlock) {
|
||||||
size = 0;
|
size = 0;
|
||||||
} else if (usePreferredBlockSize4LastUcBlock) {
|
} else if (usePreferredBlockSize4LastUcBlock) {
|
||||||
|
@ -23,7 +23,6 @@
|
|||||||
import org.apache.hadoop.hdfs.protocol.Block;
|
import org.apache.hadoop.hdfs.protocol.Block;
|
||||||
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
|
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
|
||||||
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoContiguous;
|
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoContiguous;
|
||||||
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoContiguousUnderConstruction;
|
|
||||||
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
|
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
|
||||||
import org.apache.hadoop.hdfs.server.namenode.INode;
|
import org.apache.hadoop.hdfs.server.namenode.INode;
|
||||||
import org.apache.hadoop.hdfs.server.namenode.INode.BlocksMapUpdateInfo;
|
import org.apache.hadoop.hdfs.server.namenode.INode.BlocksMapUpdateInfo;
|
||||||
@ -136,7 +135,7 @@ void combineAndCollectSnapshotBlocks(
|
|||||||
Block dontRemoveBlock = null;
|
Block dontRemoveBlock = null;
|
||||||
if (lastBlock != null && lastBlock.getBlockUCState().equals(
|
if (lastBlock != null && lastBlock.getBlockUCState().equals(
|
||||||
HdfsServerConstants.BlockUCState.UNDER_RECOVERY)) {
|
HdfsServerConstants.BlockUCState.UNDER_RECOVERY)) {
|
||||||
dontRemoveBlock = ((BlockInfoContiguousUnderConstruction) lastBlock)
|
dontRemoveBlock = lastBlock.getUnderConstructionFeature()
|
||||||
.getTruncateBlock();
|
.getTruncateBlock();
|
||||||
}
|
}
|
||||||
// Collect the remaining blocks of the file, ignoring truncate block
|
// Collect the remaining blocks of the file, ignoring truncate block
|
||||||
|
@ -117,7 +117,6 @@
|
|||||||
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
|
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
|
||||||
import org.apache.hadoop.hdfs.security.token.block.ExportedBlockKeys;
|
import org.apache.hadoop.hdfs.security.token.block.ExportedBlockKeys;
|
||||||
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
|
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
|
||||||
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoContiguousUnderConstruction;
|
|
||||||
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
|
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
|
||||||
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManagerTestUtil;
|
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManagerTestUtil;
|
||||||
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
|
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
|
||||||
@ -1650,13 +1649,11 @@ public static DatanodeDescriptor getExpectedPrimaryNode(NameNode nn,
|
|||||||
BlockManager bm0 = nn.getNamesystem().getBlockManager();
|
BlockManager bm0 = nn.getNamesystem().getBlockManager();
|
||||||
BlockInfo storedBlock = bm0.getStoredBlock(blk.getLocalBlock());
|
BlockInfo storedBlock = bm0.getStoredBlock(blk.getLocalBlock());
|
||||||
assertTrue("Block " + blk + " should be under construction, " +
|
assertTrue("Block " + blk + " should be under construction, " +
|
||||||
"got: " + storedBlock,
|
"got: " + storedBlock, !storedBlock.isComplete());
|
||||||
storedBlock instanceof BlockInfoContiguousUnderConstruction);
|
|
||||||
BlockInfoContiguousUnderConstruction ucBlock =
|
|
||||||
(BlockInfoContiguousUnderConstruction)storedBlock;
|
|
||||||
// We expect that the replica with the most recent heart beat will be
|
// We expect that the replica with the most recent heart beat will be
|
||||||
// the one to be in charge of the synchronization / recovery protocol.
|
// the one to be in charge of the synchronization / recovery protocol.
|
||||||
final DatanodeStorageInfo[] storages = ucBlock.getExpectedStorageLocations();
|
final DatanodeStorageInfo[] storages = storedBlock
|
||||||
|
.getUnderConstructionFeature().getExpectedStorageLocations();
|
||||||
DatanodeStorageInfo expectedPrimary = storages[0];
|
DatanodeStorageInfo expectedPrimary = storages[0];
|
||||||
long mostRecentLastUpdate = expectedPrimary.getDatanodeDescriptor()
|
long mostRecentLastUpdate = expectedPrimary.getDatanodeDescriptor()
|
||||||
.getLastUpdateMonotonic();
|
.getLastUpdateMonotonic();
|
||||||
|
@ -39,25 +39,24 @@ public void testInitializeBlockRecovery() throws Exception {
|
|||||||
DatanodeDescriptor dd3 = s3.getDatanodeDescriptor();
|
DatanodeDescriptor dd3 = s3.getDatanodeDescriptor();
|
||||||
|
|
||||||
dd1.isAlive = dd2.isAlive = dd3.isAlive = true;
|
dd1.isAlive = dd2.isAlive = dd3.isAlive = true;
|
||||||
BlockInfoContiguousUnderConstruction blockInfo = new BlockInfoContiguousUnderConstruction(
|
BlockInfoContiguous blockInfo = new BlockInfoContiguous(
|
||||||
new Block(0, 0, GenerationStamp.LAST_RESERVED_STAMP),
|
new Block(0, 0, GenerationStamp.LAST_RESERVED_STAMP), (short) 3);
|
||||||
(short) 3,
|
blockInfo.convertToBlockUnderConstruction(BlockUCState.UNDER_CONSTRUCTION,
|
||||||
BlockUCState.UNDER_CONSTRUCTION,
|
|
||||||
new DatanodeStorageInfo[] {s1, s2, s3});
|
new DatanodeStorageInfo[] {s1, s2, s3});
|
||||||
|
|
||||||
// Recovery attempt #1.
|
// Recovery attempt #1.
|
||||||
DFSTestUtil.resetLastUpdatesWithOffset(dd1, -3 * 1000);
|
DFSTestUtil.resetLastUpdatesWithOffset(dd1, -3 * 1000);
|
||||||
DFSTestUtil.resetLastUpdatesWithOffset(dd2, -1 * 1000);
|
DFSTestUtil.resetLastUpdatesWithOffset(dd2, -1 * 1000);
|
||||||
DFSTestUtil.resetLastUpdatesWithOffset(dd3, -2 * 1000);
|
DFSTestUtil.resetLastUpdatesWithOffset(dd3, -2 * 1000);
|
||||||
blockInfo.initializeBlockRecovery(1);
|
blockInfo.getUnderConstructionFeature().initializeBlockRecovery(blockInfo, 1);
|
||||||
BlockInfoUnderConstruction[] blockInfoRecovery = dd2.getLeaseRecoveryCommand(1);
|
BlockInfo[] blockInfoRecovery = dd2.getLeaseRecoveryCommand(1);
|
||||||
assertEquals(blockInfoRecovery[0], blockInfo);
|
assertEquals(blockInfoRecovery[0], blockInfo);
|
||||||
|
|
||||||
// Recovery attempt #2.
|
// Recovery attempt #2.
|
||||||
DFSTestUtil.resetLastUpdatesWithOffset(dd1, -2 * 1000);
|
DFSTestUtil.resetLastUpdatesWithOffset(dd1, -2 * 1000);
|
||||||
DFSTestUtil.resetLastUpdatesWithOffset(dd2, -1 * 1000);
|
DFSTestUtil.resetLastUpdatesWithOffset(dd2, -1 * 1000);
|
||||||
DFSTestUtil.resetLastUpdatesWithOffset(dd3, -3 * 1000);
|
DFSTestUtil.resetLastUpdatesWithOffset(dd3, -3 * 1000);
|
||||||
blockInfo.initializeBlockRecovery(2);
|
blockInfo.getUnderConstructionFeature().initializeBlockRecovery(blockInfo, 2);
|
||||||
blockInfoRecovery = dd1.getLeaseRecoveryCommand(1);
|
blockInfoRecovery = dd1.getLeaseRecoveryCommand(1);
|
||||||
assertEquals(blockInfoRecovery[0], blockInfo);
|
assertEquals(blockInfoRecovery[0], blockInfo);
|
||||||
|
|
||||||
@ -65,7 +64,7 @@ public void testInitializeBlockRecovery() throws Exception {
|
|||||||
DFSTestUtil.resetLastUpdatesWithOffset(dd1, -2 * 1000);
|
DFSTestUtil.resetLastUpdatesWithOffset(dd1, -2 * 1000);
|
||||||
DFSTestUtil.resetLastUpdatesWithOffset(dd2, -1 * 1000);
|
DFSTestUtil.resetLastUpdatesWithOffset(dd2, -1 * 1000);
|
||||||
DFSTestUtil.resetLastUpdatesWithOffset(dd3, -3 * 1000);
|
DFSTestUtil.resetLastUpdatesWithOffset(dd3, -3 * 1000);
|
||||||
blockInfo.initializeBlockRecovery(3);
|
blockInfo.getUnderConstructionFeature().initializeBlockRecovery(blockInfo, 3);
|
||||||
blockInfoRecovery = dd3.getLeaseRecoveryCommand(1);
|
blockInfoRecovery = dd3.getLeaseRecoveryCommand(1);
|
||||||
assertEquals(blockInfoRecovery[0], blockInfo);
|
assertEquals(blockInfoRecovery[0], blockInfo);
|
||||||
|
|
||||||
@ -74,7 +73,7 @@ public void testInitializeBlockRecovery() throws Exception {
|
|||||||
DFSTestUtil.resetLastUpdatesWithOffset(dd1, -2 * 1000);
|
DFSTestUtil.resetLastUpdatesWithOffset(dd1, -2 * 1000);
|
||||||
DFSTestUtil.resetLastUpdatesWithOffset(dd2, -1 * 1000);
|
DFSTestUtil.resetLastUpdatesWithOffset(dd2, -1 * 1000);
|
||||||
DFSTestUtil.resetLastUpdatesWithOffset(dd3, 0);
|
DFSTestUtil.resetLastUpdatesWithOffset(dd3, 0);
|
||||||
blockInfo.initializeBlockRecovery(3);
|
blockInfo.getUnderConstructionFeature().initializeBlockRecovery(blockInfo, 3);
|
||||||
blockInfoRecovery = dd3.getLeaseRecoveryCommand(1);
|
blockInfoRecovery = dd3.getLeaseRecoveryCommand(1);
|
||||||
assertEquals(blockInfoRecovery[0], blockInfo);
|
assertEquals(blockInfoRecovery[0], blockInfo);
|
||||||
}
|
}
|
||||||
|
@ -17,6 +17,7 @@
|
|||||||
*/
|
*/
|
||||||
package org.apache.hadoop.hdfs.server.blockmanagement;
|
package org.apache.hadoop.hdfs.server.blockmanagement;
|
||||||
|
|
||||||
|
import static org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState.UNDER_CONSTRUCTION;
|
||||||
import static org.junit.Assert.assertEquals;
|
import static org.junit.Assert.assertEquals;
|
||||||
import static org.junit.Assert.assertFalse;
|
import static org.junit.Assert.assertFalse;
|
||||||
import static org.junit.Assert.assertNotNull;
|
import static org.junit.Assert.assertNotNull;
|
||||||
@ -727,8 +728,8 @@ public void testSafeModeIBRBeforeFirstFullBR() throws Exception {
|
|||||||
// verify the storage info is correct
|
// verify the storage info is correct
|
||||||
assertTrue(bm.getStoredBlock(new Block(receivedBlockId)).findStorageInfo
|
assertTrue(bm.getStoredBlock(new Block(receivedBlockId)).findStorageInfo
|
||||||
(ds) >= 0);
|
(ds) >= 0);
|
||||||
assertTrue(((BlockInfoContiguousUnderConstruction) bm.
|
assertTrue(bm.getStoredBlock(new Block(receivingBlockId))
|
||||||
getStoredBlock(new Block(receivingBlockId))).getNumExpectedLocations() > 0);
|
.getUnderConstructionFeature().getNumExpectedLocations() > 0);
|
||||||
assertTrue(bm.getStoredBlock(new Block(receivingReceivedBlockId))
|
assertTrue(bm.getStoredBlock(new Block(receivingReceivedBlockId))
|
||||||
.findStorageInfo(ds) >= 0);
|
.findStorageInfo(ds) >= 0);
|
||||||
assertNull(bm.getStoredBlock(new Block(ReceivedDeletedBlockId)));
|
assertNull(bm.getStoredBlock(new Block(ReceivedDeletedBlockId)));
|
||||||
@ -748,8 +749,8 @@ private BlockInfo addBlockToBM(long blkId) {
|
|||||||
|
|
||||||
private BlockInfo addUcBlockToBM(long blkId) {
|
private BlockInfo addUcBlockToBM(long blkId) {
|
||||||
Block block = new Block(blkId);
|
Block block = new Block(blkId);
|
||||||
BlockInfoContiguousUnderConstruction blockInfo =
|
BlockInfo blockInfo = new BlockInfoContiguous(block, (short) 3);
|
||||||
new BlockInfoContiguousUnderConstruction(block, (short) 3);
|
blockInfo.convertToBlockUnderConstruction(UNDER_CONSTRUCTION, null);
|
||||||
BlockCollection bc = Mockito.mock(BlockCollection.class);
|
BlockCollection bc = Mockito.mock(BlockCollection.class);
|
||||||
Mockito.doReturn((short) 3).when(bc).getPreferredBlockReplication();
|
Mockito.doReturn((short) 3).when(bc).getPreferredBlockReplication();
|
||||||
bm.blocksMap.addBlockCollection(blockInfo, bc);
|
bm.blocksMap.addBlockCollection(blockInfo, bc);
|
||||||
|
@ -172,9 +172,10 @@ public void testHeartbeatBlockRecovery() throws Exception {
|
|||||||
dd1.getStorageInfos()[0],
|
dd1.getStorageInfos()[0],
|
||||||
dd2.getStorageInfos()[0],
|
dd2.getStorageInfos()[0],
|
||||||
dd3.getStorageInfos()[0]};
|
dd3.getStorageInfos()[0]};
|
||||||
BlockInfoContiguousUnderConstruction blockInfo = new BlockInfoContiguousUnderConstruction(
|
BlockInfo blockInfo = new BlockInfoContiguous(
|
||||||
new Block(0, 0, GenerationStamp.LAST_RESERVED_STAMP), (short) 3,
|
new Block(0, 0, GenerationStamp.LAST_RESERVED_STAMP), (short) 3);
|
||||||
BlockUCState.UNDER_RECOVERY, storages);
|
blockInfo.convertToBlockUnderConstruction(BlockUCState.UNDER_RECOVERY,
|
||||||
|
storages);
|
||||||
dd1.addBlockToBeRecovered(blockInfo);
|
dd1.addBlockToBeRecovered(blockInfo);
|
||||||
DatanodeCommand[] cmds =
|
DatanodeCommand[] cmds =
|
||||||
NameNodeAdapter.sendHeartBeat(nodeReg1, dd1, namesystem).getCommands();
|
NameNodeAdapter.sendHeartBeat(nodeReg1, dd1, namesystem).getCommands();
|
||||||
@ -194,9 +195,10 @@ public void testHeartbeatBlockRecovery() throws Exception {
|
|||||||
// More than the default stale interval of 30 seconds.
|
// More than the default stale interval of 30 seconds.
|
||||||
DFSTestUtil.resetLastUpdatesWithOffset(dd2, -40 * 1000);
|
DFSTestUtil.resetLastUpdatesWithOffset(dd2, -40 * 1000);
|
||||||
DFSTestUtil.resetLastUpdatesWithOffset(dd3, 0);
|
DFSTestUtil.resetLastUpdatesWithOffset(dd3, 0);
|
||||||
blockInfo = new BlockInfoContiguousUnderConstruction(
|
blockInfo = new BlockInfoContiguous(
|
||||||
new Block(0, 0, GenerationStamp.LAST_RESERVED_STAMP), (short) 3,
|
new Block(0, 0, GenerationStamp.LAST_RESERVED_STAMP), (short) 3);
|
||||||
BlockUCState.UNDER_RECOVERY, storages);
|
blockInfo.convertToBlockUnderConstruction(BlockUCState.UNDER_RECOVERY,
|
||||||
|
storages);
|
||||||
dd1.addBlockToBeRecovered(blockInfo);
|
dd1.addBlockToBeRecovered(blockInfo);
|
||||||
cmds = NameNodeAdapter.sendHeartBeat(nodeReg1, dd1, namesystem).getCommands();
|
cmds = NameNodeAdapter.sendHeartBeat(nodeReg1, dd1, namesystem).getCommands();
|
||||||
assertEquals(1, cmds.length);
|
assertEquals(1, cmds.length);
|
||||||
@ -215,9 +217,10 @@ public void testHeartbeatBlockRecovery() throws Exception {
|
|||||||
// More than the default stale interval of 30 seconds.
|
// More than the default stale interval of 30 seconds.
|
||||||
DFSTestUtil.resetLastUpdatesWithOffset(dd2, - 40 * 1000);
|
DFSTestUtil.resetLastUpdatesWithOffset(dd2, - 40 * 1000);
|
||||||
DFSTestUtil.resetLastUpdatesWithOffset(dd3, - 80 * 1000);
|
DFSTestUtil.resetLastUpdatesWithOffset(dd3, - 80 * 1000);
|
||||||
blockInfo = new BlockInfoContiguousUnderConstruction(
|
blockInfo = new BlockInfoContiguous(
|
||||||
new Block(0, 0, GenerationStamp.LAST_RESERVED_STAMP), (short) 3,
|
new Block(0, 0, GenerationStamp.LAST_RESERVED_STAMP), (short) 3);
|
||||||
BlockUCState.UNDER_RECOVERY, storages);
|
blockInfo.convertToBlockUnderConstruction(BlockUCState.UNDER_RECOVERY,
|
||||||
|
storages);
|
||||||
dd1.addBlockToBeRecovered(blockInfo);
|
dd1.addBlockToBeRecovered(blockInfo);
|
||||||
cmds = NameNodeAdapter.sendHeartBeat(nodeReg1, dd1, namesystem).getCommands();
|
cmds = NameNodeAdapter.sendHeartBeat(nodeReg1, dd1, namesystem).getCommands();
|
||||||
assertEquals(1, cmds.length);
|
assertEquals(1, cmds.length);
|
||||||
|
@ -1182,7 +1182,8 @@ public void testAddStoredBlockDoesNotCauseSkippedReplication()
|
|||||||
// block under construction, the BlockManager will realize the expected
|
// block under construction, the BlockManager will realize the expected
|
||||||
// replication has been achieved and remove it from the under-replicated
|
// replication has been achieved and remove it from the under-replicated
|
||||||
// queue.
|
// queue.
|
||||||
BlockInfoContiguousUnderConstruction info = new BlockInfoContiguousUnderConstruction(block1, (short) 1);
|
BlockInfoContiguous info = new BlockInfoContiguous(block1, (short) 1);
|
||||||
|
info.convertToBlockUnderConstruction(BlockUCState.UNDER_CONSTRUCTION, null);
|
||||||
BlockCollection bc = mock(BlockCollection.class);
|
BlockCollection bc = mock(BlockCollection.class);
|
||||||
when(bc.getPreferredBlockReplication()).thenReturn((short)1);
|
when(bc.getPreferredBlockReplication()).thenReturn((short)1);
|
||||||
bm.addBlockCollection(info, bc);
|
bm.addBlockCollection(info, bc);
|
||||||
@ -1247,9 +1248,8 @@ public void testAddStoredBlockDoesNotCauseSkippedReplication()
|
|||||||
|
|
||||||
DatanodeStorageInfo[] storageAry = {new DatanodeStorageInfo(
|
DatanodeStorageInfo[] storageAry = {new DatanodeStorageInfo(
|
||||||
dataNodes[0], new DatanodeStorage("s1"))};
|
dataNodes[0], new DatanodeStorage("s1"))};
|
||||||
final BlockInfoContiguousUnderConstruction ucBlock =
|
info.convertToBlockUnderConstruction(BlockUCState.UNDER_CONSTRUCTION,
|
||||||
info.convertToBlockUnderConstruction(BlockUCState.UNDER_CONSTRUCTION,
|
storageAry);
|
||||||
storageAry);
|
|
||||||
DatanodeStorageInfo storage = mock(DatanodeStorageInfo.class);
|
DatanodeStorageInfo storage = mock(DatanodeStorageInfo.class);
|
||||||
DatanodeDescriptor dn = mock(DatanodeDescriptor.class);
|
DatanodeDescriptor dn = mock(DatanodeDescriptor.class);
|
||||||
when(dn.isDecommissioned()).thenReturn(true);
|
when(dn.isDecommissioned()).thenReturn(true);
|
||||||
@ -1258,10 +1258,10 @@ public void testAddStoredBlockDoesNotCauseSkippedReplication()
|
|||||||
when(storage.removeBlock(any(BlockInfo.class))).thenReturn(true);
|
when(storage.removeBlock(any(BlockInfo.class))).thenReturn(true);
|
||||||
when(storage.addBlock(any(BlockInfo.class))).thenReturn
|
when(storage.addBlock(any(BlockInfo.class))).thenReturn
|
||||||
(DatanodeStorageInfo.AddBlockResult.ADDED);
|
(DatanodeStorageInfo.AddBlockResult.ADDED);
|
||||||
ucBlock.addStorage(storage, ucBlock);
|
info.addStorage(storage, info);
|
||||||
|
|
||||||
BlockInfo lastBlk = mbc.getLastBlock();
|
BlockInfo lastBlk = mbc.getLastBlock();
|
||||||
when(mbc.getLastBlock()).thenReturn(lastBlk, ucBlock);
|
when(mbc.getLastBlock()).thenReturn(lastBlk, info);
|
||||||
|
|
||||||
bm.convertLastBlockToUnderConstruction(mbc, 0L);
|
bm.convertLastBlockToUnderConstruction(mbc, 0L);
|
||||||
|
|
||||||
|
@ -34,7 +34,6 @@
|
|||||||
import org.apache.hadoop.hdfs.protocol.LocatedStripedBlock;
|
import org.apache.hadoop.hdfs.protocol.LocatedStripedBlock;
|
||||||
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
|
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
|
||||||
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoStriped;
|
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoStriped;
|
||||||
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoStripedUnderConstruction;
|
|
||||||
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
|
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
|
||||||
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManagerTestUtil;
|
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManagerTestUtil;
|
||||||
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStorageInfo;
|
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStorageInfo;
|
||||||
@ -170,13 +169,13 @@ private void checkStripedBlockUC(BlockInfoStriped block,
|
|||||||
Assert.assertEquals(0,
|
Assert.assertEquals(0,
|
||||||
block.getBlockId() & HdfsServerConstants.BLOCK_GROUP_INDEX_MASK);
|
block.getBlockId() & HdfsServerConstants.BLOCK_GROUP_INDEX_MASK);
|
||||||
|
|
||||||
final BlockInfoStripedUnderConstruction blockUC =
|
|
||||||
(BlockInfoStripedUnderConstruction) block;
|
|
||||||
Assert.assertEquals(HdfsServerConstants.BlockUCState.UNDER_CONSTRUCTION,
|
Assert.assertEquals(HdfsServerConstants.BlockUCState.UNDER_CONSTRUCTION,
|
||||||
blockUC.getBlockUCState());
|
block.getBlockUCState());
|
||||||
if (checkReplica) {
|
if (checkReplica) {
|
||||||
Assert.assertEquals(GROUP_SIZE, blockUC.getNumExpectedLocations());
|
Assert.assertEquals(GROUP_SIZE,
|
||||||
DatanodeStorageInfo[] storages = blockUC.getExpectedStorageLocations();
|
block.getUnderConstructionFeature().getNumExpectedLocations());
|
||||||
|
DatanodeStorageInfo[] storages = block.getUnderConstructionFeature()
|
||||||
|
.getExpectedStorageLocations();
|
||||||
for (DataNode dn : cluster.getDataNodes()) {
|
for (DataNode dn : cluster.getDataNodes()) {
|
||||||
Assert.assertTrue(includeDataNode(dn.getDatanodeId(), storages));
|
Assert.assertTrue(includeDataNode(dn.getDatanodeId(), storages));
|
||||||
}
|
}
|
||||||
@ -205,11 +204,10 @@ public void testGetLocatedStripedBlocks() throws Exception {
|
|||||||
|
|
||||||
FSDirectory fsdir = cluster.getNamesystem().getFSDirectory();
|
FSDirectory fsdir = cluster.getNamesystem().getFSDirectory();
|
||||||
INodeFile fileNode = fsdir.getINode4Write(file.toString()).asFile();
|
INodeFile fileNode = fsdir.getINode4Write(file.toString()).asFile();
|
||||||
BlockInfoStripedUnderConstruction lastBlk =
|
BlockInfoStriped lastBlk = (BlockInfoStriped) fileNode.getLastBlock();
|
||||||
(BlockInfoStripedUnderConstruction) fileNode.getLastBlock();
|
DatanodeInfo[] expectedDNs = DatanodeStorageInfo.toDatanodeInfos(
|
||||||
DatanodeInfo[] expectedDNs = DatanodeStorageInfo
|
lastBlk.getUnderConstructionFeature().getExpectedStorageLocations());
|
||||||
.toDatanodeInfos(lastBlk.getExpectedStorageLocations());
|
int[] indices = lastBlk.getUnderConstructionFeature().getBlockIndices();
|
||||||
int[] indices = lastBlk.getBlockIndices();
|
|
||||||
|
|
||||||
LocatedBlocks blks = dfs.getClient().getLocatedBlocks(file.toString(), 0L);
|
LocatedBlocks blks = dfs.getClient().getLocatedBlocks(file.toString(), 0L);
|
||||||
Assert.assertEquals(1, blks.locatedBlockCount());
|
Assert.assertEquals(1, blks.locatedBlockCount());
|
||||||
@ -246,11 +244,10 @@ public void testAddUCReplica() throws Exception {
|
|||||||
cluster.getNamesystem().getAdditionalBlock(file.toString(),
|
cluster.getNamesystem().getAdditionalBlock(file.toString(),
|
||||||
fileNode.getId(), dfs.getClient().getClientName(), null, null, null);
|
fileNode.getId(), dfs.getClient().getClientName(), null, null, null);
|
||||||
BlockInfo lastBlock = fileNode.getLastBlock();
|
BlockInfo lastBlock = fileNode.getLastBlock();
|
||||||
BlockInfoStripedUnderConstruction ucBlock =
|
|
||||||
(BlockInfoStripedUnderConstruction) lastBlock;
|
|
||||||
|
|
||||||
DatanodeStorageInfo[] locs = ucBlock.getExpectedStorageLocations();
|
DatanodeStorageInfo[] locs = lastBlock.getUnderConstructionFeature()
|
||||||
int[] indices = ucBlock.getBlockIndices();
|
.getExpectedStorageLocations();
|
||||||
|
int[] indices = lastBlock.getUnderConstructionFeature().getBlockIndices();
|
||||||
Assert.assertEquals(GROUP_SIZE, locs.length);
|
Assert.assertEquals(GROUP_SIZE, locs.length);
|
||||||
Assert.assertEquals(GROUP_SIZE, indices.length);
|
Assert.assertEquals(GROUP_SIZE, indices.length);
|
||||||
|
|
||||||
@ -272,8 +269,8 @@ public void testAddUCReplica() throws Exception {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// make sure lastBlock is correct and the storages have been updated
|
// make sure lastBlock is correct and the storages have been updated
|
||||||
locs = ucBlock.getExpectedStorageLocations();
|
locs = lastBlock.getUnderConstructionFeature().getExpectedStorageLocations();
|
||||||
indices = ucBlock.getBlockIndices();
|
indices = lastBlock.getUnderConstructionFeature().getBlockIndices();
|
||||||
Assert.assertEquals(GROUP_SIZE, locs.length);
|
Assert.assertEquals(GROUP_SIZE, locs.length);
|
||||||
Assert.assertEquals(GROUP_SIZE, indices.length);
|
Assert.assertEquals(GROUP_SIZE, indices.length);
|
||||||
for (DatanodeStorageInfo newstorage : locs) {
|
for (DatanodeStorageInfo newstorage : locs) {
|
||||||
@ -307,10 +304,9 @@ public void testAddUCReplica() throws Exception {
|
|||||||
bpId, reports, null);
|
bpId, reports, null);
|
||||||
}
|
}
|
||||||
|
|
||||||
BlockInfoStripedUnderConstruction ucBlock =
|
DatanodeStorageInfo[] locs = lastBlock.getUnderConstructionFeature()
|
||||||
(BlockInfoStripedUnderConstruction) lastBlock;
|
.getExpectedStorageLocations();
|
||||||
DatanodeStorageInfo[] locs = ucBlock.getExpectedStorageLocations();
|
int[] indices = lastBlock.getUnderConstructionFeature().getBlockIndices();
|
||||||
int[] indices = ucBlock.getBlockIndices();
|
|
||||||
Assert.assertEquals(GROUP_SIZE, locs.length);
|
Assert.assertEquals(GROUP_SIZE, locs.length);
|
||||||
Assert.assertEquals(GROUP_SIZE, indices.length);
|
Assert.assertEquals(GROUP_SIZE, indices.length);
|
||||||
for (i = 0; i < GROUP_SIZE; i++) {
|
for (i = 0; i < GROUP_SIZE; i++) {
|
||||||
|
@ -18,6 +18,7 @@
|
|||||||
package org.apache.hadoop.hdfs.server.namenode;
|
package org.apache.hadoop.hdfs.server.namenode;
|
||||||
|
|
||||||
import static org.junit.Assert.assertEquals;
|
import static org.junit.Assert.assertEquals;
|
||||||
|
import static org.junit.Assert.assertFalse;
|
||||||
import static org.junit.Assert.assertTrue;
|
import static org.junit.Assert.assertTrue;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
@ -36,7 +37,7 @@
|
|||||||
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
|
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
|
||||||
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
|
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
|
||||||
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
|
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
|
||||||
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoContiguousUnderConstruction;
|
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
|
||||||
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState;
|
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState;
|
||||||
import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
|
import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
|
||||||
import org.junit.AfterClass;
|
import org.junit.AfterClass;
|
||||||
@ -156,6 +157,7 @@ public void testBlockCreation() throws IOException {
|
|||||||
@Test
|
@Test
|
||||||
public void testGetBlockLocations() throws IOException {
|
public void testGetBlockLocations() throws IOException {
|
||||||
final NamenodeProtocols namenode = cluster.getNameNodeRpc();
|
final NamenodeProtocols namenode = cluster.getNameNodeRpc();
|
||||||
|
final BlockManager blockManager = cluster.getNamesystem().getBlockManager();
|
||||||
final Path p = new Path(BASE_DIR, "file2.dat");
|
final Path p = new Path(BASE_DIR, "file2.dat");
|
||||||
final String src = p.toString();
|
final String src = p.toString();
|
||||||
final FSDataOutputStream out = TestFileCreation.createFile(hdfs, p, 3);
|
final FSDataOutputStream out = TestFileCreation.createFile(hdfs, p, 3);
|
||||||
@ -170,7 +172,7 @@ public void testGetBlockLocations() throws IOException {
|
|||||||
final List<LocatedBlock> blocks = lb.getLocatedBlocks();
|
final List<LocatedBlock> blocks = lb.getLocatedBlocks();
|
||||||
assertEquals(i, blocks.size());
|
assertEquals(i, blocks.size());
|
||||||
final Block b = blocks.get(blocks.size() - 1).getBlock().getLocalBlock();
|
final Block b = blocks.get(blocks.size() - 1).getBlock().getLocalBlock();
|
||||||
assertTrue(b instanceof BlockInfoContiguousUnderConstruction);
|
assertFalse(blockManager.getStoredBlock(b).isComplete());
|
||||||
|
|
||||||
if (++i < NUM_BLOCKS) {
|
if (++i < NUM_BLOCKS) {
|
||||||
// write one more block
|
// write one more block
|
||||||
|
@ -24,7 +24,6 @@
|
|||||||
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
|
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
|
||||||
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
|
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
|
||||||
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoContiguous;
|
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoContiguous;
|
||||||
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoContiguousUnderConstruction;
|
|
||||||
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStorageInfo;
|
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStorageInfo;
|
||||||
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
|
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
@ -68,11 +67,13 @@ private FSNamesystem makeNameSystemSpy(Block block, INodeFile file)
|
|||||||
namesystem.dir.getINodeMap().put(file);
|
namesystem.dir.getINodeMap().put(file);
|
||||||
|
|
||||||
FSNamesystem namesystemSpy = spy(namesystem);
|
FSNamesystem namesystemSpy = spy(namesystem);
|
||||||
BlockInfoContiguousUnderConstruction blockInfo = new BlockInfoContiguousUnderConstruction(
|
BlockInfo blockInfo = new BlockInfoContiguous(block, (short) 1);
|
||||||
block, (short) 1, HdfsServerConstants.BlockUCState.UNDER_CONSTRUCTION, targets);
|
blockInfo.convertToBlockUnderConstruction(
|
||||||
|
HdfsServerConstants.BlockUCState.UNDER_CONSTRUCTION, targets);
|
||||||
blockInfo.setBlockCollection(file);
|
blockInfo.setBlockCollection(file);
|
||||||
blockInfo.setGenerationStamp(genStamp);
|
blockInfo.setGenerationStamp(genStamp);
|
||||||
blockInfo.initializeBlockRecovery(genStamp);
|
blockInfo.getUnderConstructionFeature().initializeBlockRecovery(blockInfo,
|
||||||
|
genStamp);
|
||||||
doReturn(blockInfo).when(file).removeLastBlock(any(Block.class));
|
doReturn(blockInfo).when(file).removeLastBlock(any(Block.class));
|
||||||
doReturn(true).when(file).isUnderConstruction();
|
doReturn(true).when(file).isUnderConstruction();
|
||||||
doReturn(new BlockInfoContiguous[1]).when(file).getBlocks();
|
doReturn(new BlockInfoContiguous[1]).when(file).getBlocks();
|
||||||
|
@ -55,7 +55,6 @@
|
|||||||
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
|
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
|
||||||
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
|
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
|
||||||
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoContiguous;
|
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoContiguous;
|
||||||
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoContiguousUnderConstruction;
|
|
||||||
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
|
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
|
||||||
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.StartupOption;
|
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.StartupOption;
|
||||||
import org.apache.hadoop.security.UserGroupInformation;
|
import org.apache.hadoop.security.UserGroupInformation;
|
||||||
@ -1017,7 +1016,7 @@ public void testTruncateRecovery() throws IOException {
|
|||||||
is(fsn.getBlockIdManager().getGenerationStampV2()));
|
is(fsn.getBlockIdManager().getGenerationStampV2()));
|
||||||
assertThat(file.getLastBlock().getBlockUCState(),
|
assertThat(file.getLastBlock().getBlockUCState(),
|
||||||
is(HdfsServerConstants.BlockUCState.UNDER_RECOVERY));
|
is(HdfsServerConstants.BlockUCState.UNDER_RECOVERY));
|
||||||
long blockRecoveryId = ((BlockInfoContiguousUnderConstruction) file.getLastBlock())
|
long blockRecoveryId = file.getLastBlock().getUnderConstructionFeature()
|
||||||
.getBlockRecoveryId();
|
.getBlockRecoveryId();
|
||||||
assertThat(blockRecoveryId, is(initialGenStamp + 1));
|
assertThat(blockRecoveryId, is(initialGenStamp + 1));
|
||||||
fsn.getEditLog().logTruncate(
|
fsn.getEditLog().logTruncate(
|
||||||
@ -1051,7 +1050,7 @@ public void testTruncateRecovery() throws IOException {
|
|||||||
is(fsn.getBlockIdManager().getGenerationStampV2()));
|
is(fsn.getBlockIdManager().getGenerationStampV2()));
|
||||||
assertThat(file.getLastBlock().getBlockUCState(),
|
assertThat(file.getLastBlock().getBlockUCState(),
|
||||||
is(HdfsServerConstants.BlockUCState.UNDER_RECOVERY));
|
is(HdfsServerConstants.BlockUCState.UNDER_RECOVERY));
|
||||||
long blockRecoveryId = ((BlockInfoContiguousUnderConstruction) file.getLastBlock())
|
long blockRecoveryId = file.getLastBlock().getUnderConstructionFeature()
|
||||||
.getBlockRecoveryId();
|
.getBlockRecoveryId();
|
||||||
assertThat(blockRecoveryId, is(initialGenStamp + 1));
|
assertThat(blockRecoveryId, is(initialGenStamp + 1));
|
||||||
fsn.getEditLog().logTruncate(
|
fsn.getEditLog().logTruncate(
|
||||||
|
@ -39,7 +39,6 @@
|
|||||||
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
|
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
|
||||||
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
|
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
|
||||||
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoStriped;
|
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoStriped;
|
||||||
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoStripedUnderConstruction;
|
|
||||||
import org.apache.hadoop.hdfs.server.blockmanagement.BlockStoragePolicySuite;
|
import org.apache.hadoop.hdfs.server.blockmanagement.BlockStoragePolicySuite;
|
||||||
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
|
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
|
||||||
|
|
||||||
@ -157,8 +156,9 @@ public void testBlockStripedUCFileSize()
|
|||||||
throws IOException, InterruptedException {
|
throws IOException, InterruptedException {
|
||||||
INodeFile inf = createStripedINodeFile();
|
INodeFile inf = createStripedINodeFile();
|
||||||
Block blk = new Block(1);
|
Block blk = new Block(1);
|
||||||
BlockInfoStripedUnderConstruction bInfoUCStriped
|
BlockInfoStriped bInfoUCStriped = new BlockInfoStriped(blk, testECPolicy);
|
||||||
= new BlockInfoStripedUnderConstruction(blk, testECPolicy);
|
bInfoUCStriped.convertToBlockUnderConstruction(
|
||||||
|
HdfsServerConstants.BlockUCState.UNDER_CONSTRUCTION, null);
|
||||||
bInfoUCStriped.setNumBytes(100);
|
bInfoUCStriped.setNumBytes(100);
|
||||||
inf.addBlock(bInfoUCStriped);
|
inf.addBlock(bInfoUCStriped);
|
||||||
assertEquals(100, inf.computeFileSize());
|
assertEquals(100, inf.computeFileSize());
|
||||||
@ -191,8 +191,9 @@ public void testBlockStripedUCComputeQuotaUsage()
|
|||||||
throws IOException, InterruptedException {
|
throws IOException, InterruptedException {
|
||||||
INodeFile inf = createStripedINodeFile();
|
INodeFile inf = createStripedINodeFile();
|
||||||
Block blk = new Block(1);
|
Block blk = new Block(1);
|
||||||
BlockInfoStripedUnderConstruction bInfoUCStriped
|
BlockInfoStriped bInfoUCStriped = new BlockInfoStriped(blk, testECPolicy);
|
||||||
= new BlockInfoStripedUnderConstruction(blk, testECPolicy);
|
bInfoUCStriped.convertToBlockUnderConstruction(
|
||||||
|
HdfsServerConstants.BlockUCState.UNDER_CONSTRUCTION, null);
|
||||||
bInfoUCStriped.setNumBytes(100);
|
bInfoUCStriped.setNumBytes(100);
|
||||||
inf.addBlock(bInfoUCStriped);
|
inf.addBlock(bInfoUCStriped);
|
||||||
|
|
||||||
|
@ -72,7 +72,7 @@
|
|||||||
import org.apache.hadoop.hdfs.protocol.LastBlockWithStatus;
|
import org.apache.hadoop.hdfs.protocol.LastBlockWithStatus;
|
||||||
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
|
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
|
||||||
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
|
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
|
||||||
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoContiguousUnderConstruction;
|
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
|
||||||
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
|
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
|
||||||
import org.apache.hadoop.hdfs.server.namenode.INodeFile;
|
import org.apache.hadoop.hdfs.server.namenode.INodeFile;
|
||||||
import org.apache.hadoop.hdfs.server.namenode.snapshot.SnapshotTestHelper;
|
import org.apache.hadoop.hdfs.server.namenode.snapshot.SnapshotTestHelper;
|
||||||
@ -752,12 +752,13 @@ void invoke() throws Exception {
|
|||||||
boolean checkNamenodeBeforeReturn() throws Exception {
|
boolean checkNamenodeBeforeReturn() throws Exception {
|
||||||
INodeFile fileNode = cluster.getNamesystem(0).getFSDirectory()
|
INodeFile fileNode = cluster.getNamesystem(0).getFSDirectory()
|
||||||
.getINode4Write(file).asFile();
|
.getINode4Write(file).asFile();
|
||||||
BlockInfoContiguousUnderConstruction blkUC =
|
BlockInfo blkUC = (fileNode.getBlocks())[1];
|
||||||
(BlockInfoContiguousUnderConstruction) (fileNode.getBlocks())[1];
|
int datanodeNum = blkUC.getUnderConstructionFeature()
|
||||||
int datanodeNum = blkUC.getExpectedStorageLocations().length;
|
.getExpectedStorageLocations().length;
|
||||||
for (int i = 0; i < CHECKTIMES && datanodeNum != 2; i++) {
|
for (int i = 0; i < CHECKTIMES && datanodeNum != 2; i++) {
|
||||||
Thread.sleep(1000);
|
Thread.sleep(1000);
|
||||||
datanodeNum = blkUC.getExpectedStorageLocations().length;
|
datanodeNum = blkUC.getUnderConstructionFeature()
|
||||||
|
.getExpectedStorageLocations().length;
|
||||||
}
|
}
|
||||||
return datanodeNum == 2;
|
return datanodeNum == 2;
|
||||||
}
|
}
|
||||||
|
@ -43,10 +43,9 @@
|
|||||||
import org.apache.hadoop.hdfs.DistributedFileSystem;
|
import org.apache.hadoop.hdfs.DistributedFileSystem;
|
||||||
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
||||||
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
|
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
|
||||||
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoContiguous;
|
|
||||||
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoContiguousUnderConstruction;
|
|
||||||
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
|
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
|
||||||
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
|
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
|
||||||
|
import org.apache.hadoop.hdfs.server.blockmanagement.BlockUnderConstructionFeature;
|
||||||
import org.apache.hadoop.hdfs.server.datanode.BlockPoolSliceStorage;
|
import org.apache.hadoop.hdfs.server.datanode.BlockPoolSliceStorage;
|
||||||
import org.apache.hadoop.hdfs.server.datanode.BlockScanner;
|
import org.apache.hadoop.hdfs.server.datanode.BlockScanner;
|
||||||
import org.apache.hadoop.hdfs.server.datanode.DataNode;
|
import org.apache.hadoop.hdfs.server.datanode.DataNode;
|
||||||
@ -178,8 +177,7 @@ public static void checkSnapshotCreation(DistributedFileSystem hdfs,
|
|||||||
* Specific information for different types of INode:
|
* Specific information for different types of INode:
|
||||||
* {@link INodeDirectory}:childrenSize
|
* {@link INodeDirectory}:childrenSize
|
||||||
* {@link INodeFile}: fileSize, block list. Check {@link BlockInfo#toString()}
|
* {@link INodeFile}: fileSize, block list. Check {@link BlockInfo#toString()}
|
||||||
* and {@link BlockInfoContiguousUnderConstruction#toString()} for detailed information.
|
* and {@link BlockUnderConstructionFeature#toString()} for detailed information.
|
||||||
* {@link FileWithSnapshot}: next link
|
|
||||||
* </pre>
|
* </pre>
|
||||||
* @see INode#dumpTreeRecursively()
|
* @see INode#dumpTreeRecursively()
|
||||||
*/
|
*/
|
||||||
|
Loading…
Reference in New Issue
Block a user