HDFS-12395. Support erasure coding policy operations in namenode edit log. Contributed by Sammi Chen

This commit is contained in:
Kai Zheng 2017-09-15 09:43:39 +08:00
parent ae8f55b932
commit 08d996d3e9
15 changed files with 779 additions and 30 deletions

@ -208,10 +208,22 @@ final class FSDirErasureCodingOp {
return fsd.getAuditFileInfo(iip);
}
static ErasureCodingPolicy addErasureCodePolicy(final FSNamesystem fsn,
ErasureCodingPolicy policy) {
/**
* Add an erasure coding policy.
*
* @param fsn namespace
* @param policy the new policy to be added into system
* @param logRetryCache whether to record RPC ids in editlog for retry cache
* rebuilding
* @throws IOException
*/
static ErasureCodingPolicy addErasureCodingPolicy(final FSNamesystem fsn,
ErasureCodingPolicy policy, final boolean logRetryCache) {
Preconditions.checkNotNull(policy);
return fsn.getErasureCodingPolicyManager().addPolicy(policy);
ErasureCodingPolicy retPolicy =
fsn.getErasureCodingPolicyManager().addPolicy(policy);
fsn.getEditLog().logAddErasureCodingPolicy(policy, logRetryCache);
return retPolicy;
}
/**
@ -219,24 +231,47 @@ final class FSDirErasureCodingOp {
*
* @param fsn namespace
* @param ecPolicyName the name of the policy to be removed
* @param logRetryCache whether to record RPC ids in editlog for retry cache
* rebuilding
* @throws IOException
*/
static void removeErasureCodePolicy(final FSNamesystem fsn,
String ecPolicyName) throws IOException {
static void removeErasureCodingPolicy(final FSNamesystem fsn,
String ecPolicyName, final boolean logRetryCache) throws IOException {
Preconditions.checkNotNull(ecPolicyName);
fsn.getErasureCodingPolicyManager().removePolicy(ecPolicyName);
fsn.getEditLog().logRemoveErasureCodingPolicy(ecPolicyName, logRetryCache);
}
static void enableErasureCodePolicy(final FSNamesystem fsn,
String ecPolicyName) throws IOException {
/**
* Enable an erasure coding policy.
*
* @param fsn namespace
* @param ecPolicyName the name of the policy to be enabled
* @param logRetryCache whether to record RPC ids in editlog for retry cache
* rebuilding
* @throws IOException
*/
static void enableErasureCodingPolicy(final FSNamesystem fsn,
String ecPolicyName, final boolean logRetryCache) throws IOException {
Preconditions.checkNotNull(ecPolicyName);
fsn.getErasureCodingPolicyManager().enablePolicy(ecPolicyName);
fsn.getEditLog().logEnableErasureCodingPolicy(ecPolicyName, logRetryCache);
}
static void disableErasureCodePolicy(final FSNamesystem fsn,
String ecPolicyName) throws IOException {
/**
* Disable an erasure coding policy.
*
* @param fsn namespace
* @param ecPolicyName the name of the policy to be disabled
* @param logRetryCache whether to record RPC ids in editlog for retry cache
* rebuilding
* @throws IOException
*/
static void disableErasureCodingPolicy(final FSNamesystem fsn,
String ecPolicyName, final boolean logRetryCache) throws IOException {
Preconditions.checkNotNull(ecPolicyName);
fsn.getErasureCodingPolicyManager().disablePolicy(ecPolicyName);
fsn.getEditLog().logDisableErasureCodingPolicy(ecPolicyName, logRetryCache);
}
private static List<XAttr> removeErasureCodingPolicyXAttr(

@ -44,6 +44,7 @@ import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.CacheDirectiveInfo;
import org.apache.hadoop.hdfs.protocol.CachePoolInfo;
import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
@ -97,6 +98,10 @@ import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.TimesOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.TruncateOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.UpdateBlocksOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.UpdateMasterKeyOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.AddErasureCodingPolicyOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.EnableErasureCodingPolicyOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.DisableErasureCodingPolicyOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.RemoveErasureCodingPolicyOp;
import org.apache.hadoop.hdfs.server.namenode.JournalSet.JournalAndStream;
import org.apache.hadoop.hdfs.server.namenode.metrics.NameNodeMetrics;
import org.apache.hadoop.hdfs.server.protocol.NamenodeRegistration;
@ -1228,6 +1233,38 @@ public class FSEditLog implements LogsPurgeable {
logEdit(op);
}
void logAddErasureCodingPolicy(ErasureCodingPolicy ecPolicy,
boolean toLogRpcIds) {
AddErasureCodingPolicyOp op =
AddErasureCodingPolicyOp.getInstance(cache.get());
op.setErasureCodingPolicy(ecPolicy);
logRpcIds(op, toLogRpcIds);
logEdit(op);
}
void logEnableErasureCodingPolicy(String ecPolicyName, boolean toLogRpcIds) {
EnableErasureCodingPolicyOp op =
EnableErasureCodingPolicyOp.getInstance(cache.get());
op.setErasureCodingPolicy(ecPolicyName);
logRpcIds(op, toLogRpcIds);
logEdit(op);
}
void logDisableErasureCodingPolicy(String ecPolicyName, boolean toLogRpcIds) {
DisableErasureCodingPolicyOp op =
DisableErasureCodingPolicyOp.getInstance(cache.get());
op.setErasureCodingPolicy(ecPolicyName);
logRpcIds(op, toLogRpcIds);
logEdit(op);
}
void logRemoveErasureCodingPolicy(String ecPolicyName, boolean toLogRpcIds) {
RemoveErasureCodingPolicyOp op =
RemoveErasureCodingPolicyOp.getInstance(cache.get());
op.setErasureCodingPolicy(ecPolicyName);
logRpcIds(op, toLogRpcIds);
logEdit(op);
}
/**
* Get all the journals this edit log is currently operating on.
*/

@ -96,6 +96,14 @@ import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.TimesOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.TruncateOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.UpdateBlocksOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.UpdateMasterKeyOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp
.AddErasureCodingPolicyOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp
.RemoveErasureCodingPolicyOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp
.EnableErasureCodingPolicyOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp
.DisableErasureCodingPolicyOp;
import org.apache.hadoop.hdfs.server.namenode.INode.BlocksMapUpdateInfo;
import org.apache.hadoop.hdfs.server.namenode.LeaseManager.Lease;
import org.apache.hadoop.hdfs.server.namenode.NNStorage.NameNodeFile;
@ -958,6 +966,41 @@ public class FSEditLogLoader {
setStoragePolicyOp.policyId);
break;
}
case OP_ADD_ERASURE_CODING_POLICY:
AddErasureCodingPolicyOp addOp = (AddErasureCodingPolicyOp) op;
fsNamesys.getErasureCodingPolicyManager().addPolicy(
addOp.getEcPolicy());
if (toAddRetryCache) {
fsNamesys.addCacheEntryWithPayload(op.rpcClientId, op.rpcCallId,
addOp.getEcPolicy());
}
break;
case OP_ENABLE_ERASURE_CODING_POLICY:
EnableErasureCodingPolicyOp enableOp = (EnableErasureCodingPolicyOp) op;
fsNamesys.getErasureCodingPolicyManager().enablePolicy(
enableOp.getEcPolicy());
if (toAddRetryCache) {
fsNamesys.addCacheEntry(op.rpcClientId, op.rpcCallId);
}
break;
case OP_DISABLE_ERASURE_CODING_POLICY:
DisableErasureCodingPolicyOp disableOp =
(DisableErasureCodingPolicyOp) op;
fsNamesys.getErasureCodingPolicyManager().disablePolicy(
disableOp.getEcPolicy());
if (toAddRetryCache) {
fsNamesys.addCacheEntry(op.rpcClientId, op.rpcCallId);
}
break;
case OP_REMOVE_ERASURE_CODING_POLICY:
RemoveErasureCodingPolicyOp removeOp = (RemoveErasureCodingPolicyOp) op;
fsNamesys.getErasureCodingPolicyManager().removePolicy(
removeOp.getEcPolicy());
if (toAddRetryCache) {
fsNamesys.addCacheEntry(op.rpcClientId, op.rpcCallId);
}
break;
default:
throw new IOException("Invalid operation read " + op.opCode);
}

@ -18,6 +18,7 @@
package org.apache.hadoop.hdfs.server.namenode;
import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_ADD;
import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_ADD_ERASURE_CODING_POLICY;
import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_APPEND;
import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_ADD_BLOCK;
import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_ADD_CACHE_DIRECTIVE;
@ -31,7 +32,9 @@ import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_CONCAT_
import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_CREATE_SNAPSHOT;
import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_DELETE;
import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_DELETE_SNAPSHOT;
import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_DISABLE_ERASURE_CODING_POLICY;
import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_DISALLOW_SNAPSHOT;
import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_ENABLE_ERASURE_CODING_POLICY;
import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_END_LOG_SEGMENT;
import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_GET_DELEGATION_TOKEN;
import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_INVALID;
@ -41,6 +44,7 @@ import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_MODIFY_
import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_REASSIGN_LEASE;
import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_REMOVE_CACHE_DIRECTIVE;
import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_REMOVE_CACHE_POOL;
import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_REMOVE_ERASURE_CODING_POLICY;
import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_REMOVE_XATTR;
import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_RENAME;
import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_RENAME_OLD;
@ -75,7 +79,9 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.EnumMap;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.zip.CheckedInputStream;
import java.util.zip.Checksum;
@ -100,6 +106,7 @@ import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.CacheDirectiveInfo;
import org.apache.hadoop.hdfs.protocol.CachePoolInfo;
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.protocol.LayoutVersion;
import org.apache.hadoop.hdfs.protocol.LayoutVersion.Feature;
@ -119,6 +126,7 @@ import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableFactories;
import org.apache.hadoop.io.WritableFactory;
import org.apache.hadoop.io.erasurecode.ECSchema;
import org.apache.hadoop.ipc.ClientId;
import org.apache.hadoop.ipc.RpcConstants;
import org.apache.hadoop.security.token.delegation.DelegationKey;
@ -4339,6 +4347,323 @@ public abstract class FSEditLogOp {
this.len = in.readLong();
}
}
/**
* Operation corresponding to add an erasure coding policy.
*/
static class AddErasureCodingPolicyOp extends FSEditLogOp {
private ErasureCodingPolicy ecPolicy;
AddErasureCodingPolicyOp() {
super(OP_ADD_ERASURE_CODING_POLICY);
}
static AddErasureCodingPolicyOp getInstance(OpInstanceCache cache) {
return (AddErasureCodingPolicyOp) cache
.get(OP_ADD_ERASURE_CODING_POLICY);
}
@Override
void resetSubFields() {
this.ecPolicy = null;
}
public ErasureCodingPolicy getEcPolicy() {
return this.ecPolicy;
}
public AddErasureCodingPolicyOp setErasureCodingPolicy(
ErasureCodingPolicy policy) {
Preconditions.checkNotNull(policy.getName());
Preconditions.checkNotNull(policy.getSchema());
Preconditions.checkArgument(policy.getCellSize() > 0);
this.ecPolicy = policy;
return this;
}
@Override
void readFields(DataInputStream in, int logVersion) throws IOException {
this.ecPolicy = FSImageSerialization.readErasureCodingPolicy(in);
readRpcIds(in, logVersion);
}
@Override
public void writeFields(DataOutputStream out) throws IOException {
Preconditions.checkNotNull(ecPolicy);
FSImageSerialization.writeErasureCodingPolicy(out, ecPolicy);
writeRpcIds(rpcClientId, rpcCallId, out);
}
@Override
protected void toXml(ContentHandler contentHandler) throws SAXException {
Preconditions.checkNotNull(ecPolicy);
XMLUtils.addSaxString(contentHandler, "CODEC", ecPolicy.getCodecName());
XMLUtils.addSaxString(contentHandler, "DATAUNITS",
Integer.toString(ecPolicy.getNumDataUnits()));
XMLUtils.addSaxString(contentHandler, "PARITYUNITS",
Integer.toString(ecPolicy.getNumParityUnits()));
XMLUtils.addSaxString(contentHandler, "CELLSIZE",
Integer.toString(ecPolicy.getCellSize()));
Map<String, String> extraOptions = ecPolicy.getSchema().getExtraOptions();
if (extraOptions == null || extraOptions.isEmpty()) {
XMLUtils.addSaxString(contentHandler, "EXTRAOPTIONS",
Integer.toString(0));
appendRpcIdsToXml(contentHandler, rpcClientId, rpcCallId);
return;
}
XMLUtils.addSaxString(contentHandler, "EXTRAOPTIONS",
Integer.toString(extraOptions.size()));
for (Map.Entry<String, String> entry : extraOptions.entrySet()) {
contentHandler.startElement("", "", "EXTRAOPTION",
new AttributesImpl());
XMLUtils.addSaxString(contentHandler, "KEY", entry.getKey());
XMLUtils.addSaxString(contentHandler, "VALUE", entry.getValue());
contentHandler.endElement("", "", "EXTRAOPTION");
}
appendRpcIdsToXml(contentHandler, rpcClientId, rpcCallId);
}
@Override
void fromXml(Stanza st) throws InvalidXmlException {
final String codecName = st.getValue("CODEC");
final int dataUnits = Integer.parseInt(st.getValue("DATAUNITS"));
final int parityUnits = Integer.parseInt(st.getValue("PARITYUNITS"));
final int cellSize = Integer.parseInt(st.getValue("CELLSIZE"));
final int extraOptionNum = Integer.parseInt(st.getValue("EXTRAOPTIONS"));
ECSchema schema;
if (extraOptionNum == 0) {
schema = new ECSchema(codecName, dataUnits, parityUnits, null);
} else {
Map<String, String> extraOptions = new HashMap<String, String>();
List<Stanza> stanzas = st.getChildren("EXTRAOPTION");
for (Stanza a: stanzas) {
extraOptions.put(a.getValue("KEY"), a.getValue("VALUE"));
}
schema = new ECSchema(codecName, dataUnits, parityUnits, extraOptions);
}
this.ecPolicy = new ErasureCodingPolicy(schema, cellSize);
readRpcIdsFromXml(st);
}
@Override
public String toString() {
StringBuilder builder = new StringBuilder();
builder.append("AddErasureCodingPolicy [");
builder.append(ecPolicy.toString());
appendRpcIdsToString(builder, rpcClientId, rpcCallId);
builder.append("]");
return builder.toString();
}
}
/**
* Operation corresponding to enable an erasure coding policy.
*/
static class EnableErasureCodingPolicyOp extends FSEditLogOp {
private String ecPolicyName;
EnableErasureCodingPolicyOp() {
super(OP_ENABLE_ERASURE_CODING_POLICY);
}
static EnableErasureCodingPolicyOp getInstance(OpInstanceCache cache) {
return (EnableErasureCodingPolicyOp) cache
.get(OP_ENABLE_ERASURE_CODING_POLICY);
}
@Override
void resetSubFields() {
this.ecPolicyName = null;
}
public String getEcPolicy() {
return this.ecPolicyName;
}
public EnableErasureCodingPolicyOp setErasureCodingPolicy(
String policyName) {
Preconditions.checkNotNull(policyName);
this.ecPolicyName = policyName;
return this;
}
@Override
void readFields(DataInputStream in, int logVersion) throws IOException {
this.ecPolicyName = FSImageSerialization.readString(in);
readRpcIds(in, logVersion);
}
@Override
public void writeFields(DataOutputStream out) throws IOException {
Preconditions.checkNotNull(ecPolicyName);
FSImageSerialization.writeString(ecPolicyName, out);
writeRpcIds(rpcClientId, rpcCallId, out);
}
@Override
protected void toXml(ContentHandler contentHandler) throws SAXException {
Preconditions.checkNotNull(ecPolicyName);
XMLUtils.addSaxString(contentHandler, "POLICYNAME", this.ecPolicyName);
appendRpcIdsToXml(contentHandler, rpcClientId, rpcCallId);
}
@Override
void fromXml(Stanza st) throws InvalidXmlException {
this.ecPolicyName = st.getValue("POLICYNAME");
readRpcIdsFromXml(st);
}
@Override
public String toString() {
StringBuilder builder = new StringBuilder();
builder.append("EnableErasureCodingPolicy [");
builder.append(ecPolicyName);
appendRpcIdsToString(builder, rpcClientId, rpcCallId);
builder.append("]");
return builder.toString();
}
}
/**
* Operation corresponding to disable an erasure coding policy.
*/
static class DisableErasureCodingPolicyOp extends FSEditLogOp {
private String ecPolicyName;
DisableErasureCodingPolicyOp() {
super(OP_DISABLE_ERASURE_CODING_POLICY);
}
static DisableErasureCodingPolicyOp getInstance(OpInstanceCache cache) {
return (DisableErasureCodingPolicyOp) cache
.get(OP_DISABLE_ERASURE_CODING_POLICY);
}
@Override
void resetSubFields() {
this.ecPolicyName = null;
}
public String getEcPolicy() {
return this.ecPolicyName;
}
public DisableErasureCodingPolicyOp setErasureCodingPolicy(
String policyName) {
Preconditions.checkNotNull(policyName);
this.ecPolicyName = policyName;
return this;
}
@Override
void readFields(DataInputStream in, int logVersion) throws IOException {
this.ecPolicyName = FSImageSerialization.readString(in);
readRpcIds(in, logVersion);
}
@Override
public void writeFields(DataOutputStream out) throws IOException {
FSImageSerialization.writeString(ecPolicyName, out);
writeRpcIds(rpcClientId, rpcCallId, out);
}
@Override
protected void toXml(ContentHandler contentHandler) throws SAXException {
XMLUtils.addSaxString(contentHandler, "POLICYNAME", this.ecPolicyName);
appendRpcIdsToXml(contentHandler, rpcClientId, rpcCallId);
}
@Override
void fromXml(Stanza st) throws InvalidXmlException {
this.ecPolicyName = st.getValue("POLICYNAME");
readRpcIdsFromXml(st);
}
@Override
public String toString() {
StringBuilder builder = new StringBuilder();
builder.append("DisableErasureCodingPolicy [");
builder.append(ecPolicyName);
appendRpcIdsToString(builder, rpcClientId, rpcCallId);
builder.append("]");
return builder.toString();
}
}
/**
* Operation corresponding to remove an erasure coding policy.
*/
static class RemoveErasureCodingPolicyOp extends FSEditLogOp {
private String ecPolicyName;
RemoveErasureCodingPolicyOp() {
super(OP_REMOVE_ERASURE_CODING_POLICY);
}
static RemoveErasureCodingPolicyOp getInstance(OpInstanceCache cache) {
return (RemoveErasureCodingPolicyOp) cache
.get(OP_REMOVE_ERASURE_CODING_POLICY);
}
@Override
void resetSubFields() {
this.ecPolicyName = null;
}
public String getEcPolicy() {
return this.ecPolicyName;
}
public RemoveErasureCodingPolicyOp setErasureCodingPolicy(
String policyName) {
Preconditions.checkNotNull(policyName);
this.ecPolicyName = policyName;
return this;
}
@Override
void readFields(DataInputStream in, int logVersion) throws IOException {
this.ecPolicyName = FSImageSerialization.readString(in);
readRpcIds(in, logVersion);
}
@Override
public void writeFields(DataOutputStream out) throws IOException {
FSImageSerialization.writeString(ecPolicyName, out);
writeRpcIds(rpcClientId, rpcCallId, out);
}
@Override
protected void toXml(ContentHandler contentHandler) throws SAXException {
XMLUtils.addSaxString(contentHandler, "POLICYNAME", this.ecPolicyName);
appendRpcIdsToXml(contentHandler, rpcClientId, rpcCallId);
}
@Override
void fromXml(Stanza st) throws InvalidXmlException {
this.ecPolicyName = st.getValue("POLICYNAME");
readRpcIdsFromXml(st);
}
@Override
public String toString() {
StringBuilder builder = new StringBuilder();
builder.append("RemoveErasureCodingPolicy [");
builder.append(ecPolicyName);
appendRpcIdsToString(builder, rpcClientId, rpcCallId);
builder.append("]");
return builder.toString();
}
}
/**
* Operation corresponding to upgrade
*/

@ -80,6 +80,11 @@ public enum FSEditLogOpCodes {
OP_TRUNCATE ((byte) 46, TruncateOp.class),
OP_APPEND ((byte) 47, AppendOp.class),
OP_SET_QUOTA_BY_STORAGETYPE ((byte) 48, SetQuotaByStorageTypeOp.class),
OP_ADD_ERASURE_CODING_POLICY ((byte) 49, AddErasureCodingPolicyOp.class),
OP_ENABLE_ERASURE_CODING_POLICY((byte) 50, EnableErasureCodingPolicyOp.class),
OP_DISABLE_ERASURE_CODING_POLICY((byte) 51,
DisableErasureCodingPolicyOp.class),
OP_REMOVE_ERASURE_CODING_POLICY((byte) 52, RemoveErasureCodingPolicyOp.class),
// Note that the current range of the valid OP code is 0~127
OP_INVALID ((byte) -1);

@ -21,6 +21,8 @@ import java.io.DataInput;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
@ -33,6 +35,7 @@ import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.CacheDirectiveInfo;
import org.apache.hadoop.hdfs.protocol.CachePoolInfo;
import org.apache.hadoop.hdfs.protocol.LayoutVersion;
import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoContiguous;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState;
import org.apache.hadoop.hdfs.server.namenode.snapshot.SnapshotFSImageFormat;
@ -46,6 +49,7 @@ import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.ShortWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableUtils;
import org.apache.hadoop.io.erasurecode.ECSchema;
import org.xml.sax.ContentHandler;
import org.xml.sax.SAXException;
@ -750,4 +754,45 @@ public class FSImageSerialization {
return info;
}
public static void writeErasureCodingPolicy(DataOutputStream out,
ErasureCodingPolicy ecPolicy) throws IOException {
writeString(ecPolicy.getSchema().getCodecName(), out);
writeInt(ecPolicy.getNumDataUnits(), out);
writeInt(ecPolicy.getNumParityUnits(), out);
writeInt(ecPolicy.getCellSize(), out);
Map<String, String> extraOptions = ecPolicy.getSchema().getExtraOptions();
if (extraOptions == null || extraOptions.isEmpty()) {
writeInt(0, out);
return;
}
writeInt(extraOptions.size(), out);
for (Map.Entry<String, String> entry : extraOptions.entrySet()) {
writeString(entry.getKey(), out);
writeString(entry.getValue(), out);
}
}
public static ErasureCodingPolicy readErasureCodingPolicy(DataInput in)
throws IOException {
String codecName = readString(in);
int numDataUnits = readInt(in);
int numParityUnits = readInt(in);
int cellSize = readInt(in);
int size = readInt(in);
Map<String, String> extraOptions = new HashMap<>(size);
if (size != 0) {
for (int i = 0; i < size; i++) {
String key = readString(in);
String value = readString(in);
extraOptions.put(key, value);
}
}
ECSchema ecSchema = new ECSchema(codecName, numDataUnits,
numParityUnits, extraOptions);
return new ErasureCodingPolicy(ecSchema, cellSize);
}
}

@ -7189,10 +7189,12 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
/**
* Add multiple erasure coding policies to the ErasureCodingPolicyManager.
* @param policies The policies to add.
* @param logRetryCache whether to record RPC ids in editlog for retry cache
* rebuilding
* @return The according result of add operation.
*/
AddECPolicyResponse[] addErasureCodingPolicies(ErasureCodingPolicy[] policies)
throws IOException {
AddECPolicyResponse[] addErasureCodingPolicies(ErasureCodingPolicy[] policies,
final boolean logRetryCache) throws IOException {
final String operationName = "addErasureCodingPolicies";
String addECPolicyName = "";
checkOperation(OperationCategory.WRITE);
@ -7201,12 +7203,12 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
writeLock();
try {
checkOperation(OperationCategory.WRITE);
checkNameNodeSafeMode("Cannot add erasure coding policy");
for (ErasureCodingPolicy policy : policies) {
try {
checkOperation(OperationCategory.WRITE);
checkNameNodeSafeMode("Cannot add erasure coding policy");
ErasureCodingPolicy newPolicy =
FSDirErasureCodingOp.addErasureCodePolicy(this, policy);
FSDirErasureCodingOp.addErasureCodingPolicy(this, policy,
logRetryCache);
addECPolicyName = newPolicy.getName();
responses.add(new AddECPolicyResponse(newPolicy));
} catch (HadoopIllegalArgumentException e) {
@ -7227,9 +7229,12 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
/**
* Remove an erasure coding policy.
* @param ecPolicyName the name of the policy to be removed
* @param logRetryCache whether to record RPC ids in editlog for retry cache
* rebuilding
* @throws IOException
*/
void removeErasureCodingPolicy(String ecPolicyName) throws IOException {
void removeErasureCodingPolicy(String ecPolicyName,
final boolean logRetryCache) throws IOException {
final String operationName = "removeErasureCodingPolicy";
checkOperation(OperationCategory.WRITE);
boolean success = false;
@ -7238,23 +7243,27 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
checkOperation(OperationCategory.WRITE);
checkNameNodeSafeMode("Cannot remove erasure coding policy "
+ ecPolicyName);
FSDirErasureCodingOp.removeErasureCodePolicy(this, ecPolicyName);
FSDirErasureCodingOp.removeErasureCodingPolicy(this, ecPolicyName,
logRetryCache);
success = true;
} finally {
writeUnlock(operationName);
if (success) {
getEditLog().logSync();
}
logAuditEvent(success, operationName, null, null, null);
logAuditEvent(success, operationName, ecPolicyName, null, null);
}
}
/**
* Enable an erasure coding policy.
* @param ecPolicyName the name of the policy to be enabled
* @param logRetryCache whether to record RPC ids in editlog for retry cache
* rebuilding
* @throws IOException
*/
void enableErasureCodingPolicy(String ecPolicyName) throws IOException {
void enableErasureCodingPolicy(String ecPolicyName,
final boolean logRetryCache) throws IOException {
final String operationName = "enableErasureCodingPolicy";
checkOperation(OperationCategory.WRITE);
boolean success = false;
@ -7264,7 +7273,8 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
checkOperation(OperationCategory.WRITE);
checkNameNodeSafeMode("Cannot enable erasure coding policy "
+ ecPolicyName);
FSDirErasureCodingOp.enableErasureCodePolicy(this, ecPolicyName);
FSDirErasureCodingOp.enableErasureCodingPolicy(this, ecPolicyName,
logRetryCache);
success = true;
} finally {
writeUnlock(operationName);
@ -7278,9 +7288,12 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
/**
* Disable an erasure coding policy.
* @param ecPolicyName the name of the policy to be disabled
* @param logRetryCache whether to record RPC ids in editlog for retry cache
* rebuilding
* @throws IOException
*/
void disableErasureCodingPolicy(String ecPolicyName) throws IOException {
void disableErasureCodingPolicy(String ecPolicyName,
final boolean logRetryCache) throws IOException {
final String operationName = "disableErasureCodingPolicy";
checkOperation(OperationCategory.WRITE);
boolean success = false;
@ -7290,7 +7303,8 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
checkOperation(OperationCategory.WRITE);
checkNameNodeSafeMode("Cannot disable erasure coding policy "
+ ecPolicyName);
FSDirErasureCodingOp.disableErasureCodePolicy(this, ecPolicyName);
FSDirErasureCodingOp.disableErasureCodingPolicy(this, ecPolicyName,
logRetryCache);
success = true;
} finally {
writeUnlock(operationName);

@ -2337,7 +2337,21 @@ public class NameNodeRpcServer implements NamenodeProtocols {
ErasureCodingPolicy[] policies) throws IOException {
checkNNStartup();
namesystem.checkSuperuserPrivilege();
return namesystem.addErasureCodingPolicies(policies);
final CacheEntryWithPayload cacheEntry =
RetryCache.waitForCompletion(retryCache, null);
if (cacheEntry != null && cacheEntry.isSuccess()) {
return (AddECPolicyResponse[]) cacheEntry.getPayload();
}
boolean success = false;
AddECPolicyResponse[] responses = new AddECPolicyResponse[0];
try {
responses =
namesystem.addErasureCodingPolicies(policies, cacheEntry != null);
success = true;
} finally {
RetryCache.setState(cacheEntry, success, responses);
}
return responses;
}
@Override
@ -2345,7 +2359,17 @@ public class NameNodeRpcServer implements NamenodeProtocols {
throws IOException {
checkNNStartup();
namesystem.checkSuperuserPrivilege();
namesystem.removeErasureCodingPolicy(ecPolicyName);
final CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
if (cacheEntry != null && cacheEntry.isSuccess()) {
return;
}
boolean success = false;
try {
namesystem.removeErasureCodingPolicy(ecPolicyName, cacheEntry != null);
success = true;
} finally {
RetryCache.setState(cacheEntry, success);
}
}
@Override // ClientProtocol
@ -2353,7 +2377,17 @@ public class NameNodeRpcServer implements NamenodeProtocols {
throws IOException {
checkNNStartup();
namesystem.checkSuperuserPrivilege();
namesystem.enableErasureCodingPolicy(ecPolicyName);
final CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
if (cacheEntry != null && cacheEntry.isSuccess()) {
return;
}
boolean success = false;
try {
namesystem.enableErasureCodingPolicy(ecPolicyName, cacheEntry != null);
success = true;
} finally {
RetryCache.setState(cacheEntry, success);
}
}
@Override // ClientProtocol
@ -2361,7 +2395,17 @@ public class NameNodeRpcServer implements NamenodeProtocols {
throws IOException {
checkNNStartup();
namesystem.checkSuperuserPrivilege();
namesystem.disableErasureCodingPolicy(ecPolicyName);
final CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
if (cacheEntry != null && cacheEntry.isSuccess()) {
return;
}
boolean success = false;
try {
namesystem.disableErasureCodingPolicy(ecPolicyName, cacheEntry != null);
success = true;
} finally {
RetryCache.setState(cacheEntry, success);
}
}
@Override // ReconfigurationProtocol

@ -108,6 +108,10 @@ The output result of this processor should be like the following output:
...some output omitted...
OP_APPEND ( 47): 1
OP_SET_QUOTA_BY_STORAGETYPE ( 48): 1
OP_ADD_ERASURE_CODING_POLICY ( 49): 0
OP_ENABLE_ERASURE_CODING_POLICY ( 50): 1
OP_DISABLE_ERASURE_CODING_POLICY ( 51): 0
OP_REMOVE_ERASURE_CODING_POLICY ( 52): 0
OP_INVALID ( -1): 0
The output is formatted as a colon separated two column table: OpCode and OpCodeCount. Each OpCode corresponding to the specific operation(s) in NameNode.

@ -108,6 +108,7 @@ import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.MiniDFSCluster.NameNodeInfo;
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
import org.apache.hadoop.hdfs.client.HdfsDataInputStream;
import org.apache.hadoop.hdfs.protocol.AddECPolicyResponse;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.CacheDirectiveInfo;
import org.apache.hadoop.hdfs.protocol.CachePoolInfo;
@ -163,6 +164,8 @@ import org.apache.hadoop.hdfs.tools.DFSAdmin;
import org.apache.hadoop.hdfs.tools.JMXGet;
import org.apache.hadoop.io.EnumSetWritable;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.erasurecode.ECSchema;
import org.apache.hadoop.io.erasurecode.ErasureCodeConstants;
import org.apache.hadoop.io.nativeio.NativeIO;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.net.unix.DomainSocket;
@ -1464,6 +1467,33 @@ public class DFSTestUtil {
new byte[]{0x37, 0x38, 0x39});
// OP_REMOVE_XATTR
filesystem.removeXAttr(pathConcatTarget, "user.a2");
// OP_ADD_ERASURE_CODING_POLICY
ErasureCodingPolicy newPolicy1 =
new ErasureCodingPolicy(ErasureCodeConstants.RS_3_2_SCHEMA, 8 * 1024);
ErasureCodingPolicy[] policyArray = new ErasureCodingPolicy[] {newPolicy1};
AddECPolicyResponse[] responses =
filesystem.addErasureCodingPolicies(policyArray);
newPolicy1 = responses[0].getPolicy();
// OP_ADD_ERASURE_CODING_POLICY - policy with extra options
Map<String, String> extraOptions = new HashMap<String, String>();
extraOptions.put("dummyKey", "dummyValue");
ECSchema schema =
new ECSchema(ErasureCodeConstants.RS_CODEC_NAME, 6, 10, extraOptions);
ErasureCodingPolicy newPolicy2 = new ErasureCodingPolicy(schema, 4 * 1024);
policyArray = new ErasureCodingPolicy[] {newPolicy2};
responses = filesystem.addErasureCodingPolicies(policyArray);
newPolicy2 = responses[0].getPolicy();
// OP_ENABLE_ERASURE_CODING_POLICY
filesystem.enableErasureCodingPolicy(newPolicy1.getName());
filesystem.enableErasureCodingPolicy(newPolicy2.getName());
// OP_DISABLE_ERASURE_CODING_POLICY
filesystem.disableErasureCodingPolicy(newPolicy1.getName());
filesystem.disableErasureCodingPolicy(newPolicy2.getName());
// OP_REMOVE_ERASURE_CODING_POLICY
filesystem.removeErasureCodingPolicy(newPolicy1.getName());
filesystem.removeErasureCodingPolicy(newPolicy2.getName());
}
public static void abortStream(DFSOutputStream out) throws IOException {

@ -72,7 +72,7 @@ public class TestDFSInotifyEventInputStream {
*/
@Test
public void testOpcodeCount() {
Assert.assertEquals(50, FSEditLogOpCodes.values().length);
Assert.assertEquals(54, FSEditLogOpCodes.values().length);
}

@ -47,8 +47,10 @@ import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.StripedFileTestUtil;
import org.apache.hadoop.hdfs.protocol.AddECPolicyResponse;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicyState;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoContiguous;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoStriped;
@ -57,6 +59,7 @@ import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogLoader.EditLogValidation;
import org.apache.hadoop.hdfs.server.namenode.NNStorage.NameNodeDirType;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.erasurecode.ECSchema;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.test.PathUtils;
import org.apache.log4j.Level;
@ -712,4 +715,84 @@ public class TestFSEditLogLoader {
}
}
}
@Test
public void testErasureCodingPolicyOperations() throws IOException {
// start a cluster
Configuration conf = new HdfsConfiguration();
final int blockSize = 16 * 1024;
conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, blockSize);
MiniDFSCluster cluster = null;
try {
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(9)
.build();
cluster.waitActive();
DistributedFileSystem fs = cluster.getFileSystem();
// 1. add new policy
ECSchema schema = new ECSchema("rs", 5, 3);
int cellSize = 2 * 1024;
ErasureCodingPolicy newPolicy =
new ErasureCodingPolicy(schema, cellSize, (byte) 0);
ErasureCodingPolicy[] policyArray = new ErasureCodingPolicy[]{newPolicy};
AddECPolicyResponse[] responses =
fs.addErasureCodingPolicies(policyArray);
assertEquals(1, responses.length);
assertTrue(responses[0].isSucceed());
newPolicy = responses[0].getPolicy();
// Restart NameNode without saving namespace
cluster.restartNameNodes();
cluster.waitActive();
// check if new policy is reapplied through edit log
ErasureCodingPolicy ecPolicy =
ErasureCodingPolicyManager.getInstance().getByID(newPolicy.getId());
assertEquals(ErasureCodingPolicyState.DISABLED, ecPolicy.getState());
// 2. enable policy
fs.enableErasureCodingPolicy(newPolicy.getName());
cluster.restartNameNodes();
cluster.waitActive();
ecPolicy =
ErasureCodingPolicyManager.getInstance().getByID(newPolicy.getId());
assertEquals(ErasureCodingPolicyState.ENABLED, ecPolicy.getState());
// create a new file, use the policy
final Path dirPath = new Path("/striped");
final Path filePath = new Path(dirPath, "file");
final int fileLength = blockSize * newPolicy.getNumDataUnits();
fs.mkdirs(dirPath);
fs.setErasureCodingPolicy(dirPath, newPolicy.getName());
final byte[] bytes = StripedFileTestUtil.generateBytes(fileLength);
DFSTestUtil.writeFile(fs, filePath, bytes);
// 3. disable policy
fs.disableErasureCodingPolicy(newPolicy.getName());
cluster.restartNameNodes();
cluster.waitActive();
ecPolicy =
ErasureCodingPolicyManager.getInstance().getByID(newPolicy.getId());
assertEquals(ErasureCodingPolicyState.DISABLED, ecPolicy.getState());
// read file
DFSTestUtil.readFileAsBytes(fs, filePath);
// 4. remove policy
fs.removeErasureCodingPolicy(newPolicy.getName());
cluster.restartNameNodes();
cluster.waitActive();
ecPolicy =
ErasureCodingPolicyManager.getInstance().getByID(newPolicy.getId());
assertEquals(ErasureCodingPolicyState.REMOVED, ecPolicy.getState());
// read file
DFSTestUtil.readFileAsBytes(fs, filePath);
cluster.shutdown();
cluster = null;
} finally {
if (cluster != null) {
cluster.shutdown();
}
}
}
}

@ -182,7 +182,7 @@ public class TestOfflineEditsViewer {
hasAllOpCodes(editsStored));
assertTrue("Reference XML edits and parsed to XML should be same",
FileUtils.contentEqualsIgnoreEOL(new File(editsStoredXml),
new File(editsStoredParsedXml), "UTF-8"));
new File(editsStoredParsedXml), "UTF-8"));
assertTrue(
"Reference edits and reparsed (bin to XML to bin) should be same",
filesEqualIgnoreTrailingZeros(editsStored, editsStoredReparsed));

@ -1179,23 +1179,107 @@
</DATA>
</RECORD>
<RECORD>
<OPCODE>OP_ROLLING_UPGRADE_START</OPCODE>
<OPCODE>OP_ADD_ERASURE_CODING_POLICY</OPCODE>
<DATA>
<TXID>89</TXID>
<CODEC>rs</CODEC>
<DATAUNITS>3</DATAUNITS>
<PARITYUNITS>2</PARITYUNITS>
<CELLSIZE>8192</CELLSIZE>
<EXTRAOPTIONS>0</EXTRAOPTIONS>
<RPC_CLIENTID>7334ec24-dd6b-4efd-807d-ed0d18625534</RPC_CLIENTID>
<RPC_CALLID>84</RPC_CALLID>
</DATA>
</RECORD>
<RECORD>
<OPCODE>OP_ADD_ERASURE_CODING_POLICY</OPCODE>
<DATA>
<TXID>90</TXID>
<CODEC>rs</CODEC>
<DATAUNITS>6</DATAUNITS>
<PARITYUNITS>10</PARITYUNITS>
<CELLSIZE>4096</CELLSIZE>
<EXTRAOPTIONS>1</EXTRAOPTIONS>
<EXTRAOPTION>
<KEY>dummyKey</KEY>
<VALUE>dummyValue</VALUE>
</EXTRAOPTION>
<RPC_CLIENTID>7334ec24-dd6b-4efd-807d-ed0d18625534</RPC_CLIENTID>
<RPC_CALLID>85</RPC_CALLID>
</DATA>
</RECORD>
<RECORD>
<OPCODE>OP_ENABLE_ERASURE_CODING_POLICY</OPCODE>
<DATA>
<TXID>91</TXID>
<POLICYNAME>RS-3-2-8k</POLICYNAME>
<RPC_CLIENTID>7334ec24-dd6b-4efd-807d-ed0d18625534</RPC_CLIENTID>
<RPC_CALLID>86</RPC_CALLID>
</DATA>
</RECORD>
<RECORD>
<OPCODE>OP_ENABLE_ERASURE_CODING_POLICY</OPCODE>
<DATA>
<TXID>92</TXID>
<POLICYNAME>RS-6-10-4k</POLICYNAME>
<RPC_CLIENTID>7334ec24-dd6b-4efd-807d-ed0d18625534</RPC_CLIENTID>
<RPC_CALLID>87</RPC_CALLID>
</DATA>
</RECORD>
<RECORD>
<OPCODE>OP_DISABLE_ERASURE_CODING_POLICY</OPCODE>
<DATA>
<TXID>93</TXID>
<POLICYNAME>RS-3-2-8k</POLICYNAME>
<RPC_CLIENTID>7334ec24-dd6b-4efd-807d-ed0d18625534</RPC_CLIENTID>
<RPC_CALLID>88</RPC_CALLID>
</DATA>
</RECORD>
<RECORD>
<OPCODE>OP_DISABLE_ERASURE_CODING_POLICY</OPCODE>
<DATA>
<TXID>94</TXID>
<POLICYNAME>RS-6-10-4k</POLICYNAME>
<RPC_CLIENTID>7334ec24-dd6b-4efd-807d-ed0d18625534</RPC_CLIENTID>
<RPC_CALLID>89</RPC_CALLID>
</DATA>
</RECORD>
<RECORD>
<OPCODE>OP_REMOVE_ERASURE_CODING_POLICY</OPCODE>
<DATA>
<TXID>95</TXID>
<POLICYNAME>RS-3-2-8k</POLICYNAME>
<RPC_CLIENTID>7334ec24-dd6b-4efd-807d-ed0d18625534</RPC_CLIENTID>
<RPC_CALLID>90</RPC_CALLID>
</DATA>
</RECORD>
<RECORD>
<OPCODE>OP_REMOVE_ERASURE_CODING_POLICY</OPCODE>
<DATA>
<TXID>96</TXID>
<POLICYNAME>RS-6-10-4k</POLICYNAME>
<RPC_CLIENTID>7334ec24-dd6b-4efd-807d-ed0d18625534</RPC_CLIENTID>
<RPC_CALLID>91</RPC_CALLID>
</DATA>
</RECORD>
<RECORD>
<OPCODE>OP_ROLLING_UPGRADE_START</OPCODE>
<DATA>
<TXID>97</TXID>
<STARTTIME>1422406383706</STARTTIME>
</DATA>
</RECORD>
<RECORD>
<OPCODE>OP_ROLLING_UPGRADE_FINALIZE</OPCODE>
<DATA>
<TXID>90</TXID>
<TXID>98</TXID>
<FINALIZETIME>1422406383706</FINALIZETIME>
</DATA>
</RECORD>
<RECORD>
<OPCODE>OP_END_LOG_SEGMENT</OPCODE>
<DATA>
<TXID>91</TXID>
<TXID>99</TXID>
</DATA>
</RECORD>
</EDITS>