HDFS-8090. Erasure Coding: Add RPC to client-namenode to list all ECSchemas loaded in Namenode. (Contributed by Vinayakumar B)
This commit is contained in:
parent
4970f2a2ab
commit
f53e402635
@ -123,12 +123,12 @@ private void initWith(String codecName, int numDataUnits, int numParityUnits,
|
||||
|
||||
this.chunkSize = DEFAULT_CHUNK_SIZE;
|
||||
try {
|
||||
if (options.containsKey(CHUNK_SIZE_KEY)) {
|
||||
if (this.options.containsKey(CHUNK_SIZE_KEY)) {
|
||||
this.chunkSize = Integer.parseInt(options.get(CHUNK_SIZE_KEY));
|
||||
}
|
||||
} catch (NumberFormatException e) {
|
||||
throw new IllegalArgumentException("Option value " +
|
||||
options.get(CHUNK_SIZE_KEY) + " for " + CHUNK_SIZE_KEY +
|
||||
this.options.get(CHUNK_SIZE_KEY) + " for " + CHUNK_SIZE_KEY +
|
||||
" is found. It should be an integer");
|
||||
}
|
||||
|
||||
|
@ -58,4 +58,7 @@
|
||||
|
||||
HDFS-8104. Make hard-coded values consistent with the system default schema first before remove them. (Kai Zheng)
|
||||
|
||||
HDFS-7889. Subclass DFSOutputStream to support writing striping layout files. (Li Bo via Kai Zheng)
|
||||
HDFS-7889. Subclass DFSOutputStream to support writing striping layout files. (Li Bo via Kai Zheng)
|
||||
|
||||
HDFS-8090. Erasure Coding: Add RPC to client-namenode to list all
|
||||
ECSchemas loaded in Namenode. (vinayakumarb)
|
@ -165,6 +165,7 @@
|
||||
import org.apache.hadoop.io.IOUtils;
|
||||
import org.apache.hadoop.io.MD5Hash;
|
||||
import org.apache.hadoop.io.Text;
|
||||
import org.apache.hadoop.io.erasurecode.ECSchema;
|
||||
import org.apache.hadoop.io.retry.LossyRetryInvocationHandler;
|
||||
import org.apache.hadoop.ipc.RPC;
|
||||
import org.apache.hadoop.ipc.RemoteException;
|
||||
@ -3149,6 +3150,16 @@ public ECInfo getErasureCodingInfo(String src) throws IOException {
|
||||
}
|
||||
}
|
||||
|
||||
public ECSchema[] getECSchemas() throws IOException {
|
||||
checkOpen();
|
||||
TraceScope scope = Trace.startSpan("getECSchemas", traceSampler);
|
||||
try {
|
||||
return namenode.getECSchemas();
|
||||
} finally {
|
||||
scope.close();
|
||||
}
|
||||
}
|
||||
|
||||
public DFSInotifyEventInputStream getInotifyEventStream() throws IOException {
|
||||
checkOpen();
|
||||
return new DFSInotifyEventInputStream(traceSampler, namenode);
|
||||
|
@ -54,6 +54,7 @@
|
||||
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorageReport;
|
||||
import org.apache.hadoop.io.EnumSetWritable;
|
||||
import org.apache.hadoop.io.Text;
|
||||
import org.apache.hadoop.io.erasurecode.ECSchema;
|
||||
import org.apache.hadoop.io.retry.AtMostOnce;
|
||||
import org.apache.hadoop.io.retry.Idempotent;
|
||||
import org.apache.hadoop.security.AccessControlException;
|
||||
@ -1474,4 +1475,13 @@ public List<XAttr> listXAttrs(String src)
|
||||
*/
|
||||
@Idempotent
|
||||
public ECInfo getErasureCodingInfo(String src) throws IOException;
|
||||
|
||||
/**
|
||||
* Gets list of ECSchemas loaded in Namenode
|
||||
*
|
||||
* @return Returns the list of ECSchemas loaded at Namenode
|
||||
* @throws IOException
|
||||
*/
|
||||
@Idempotent
|
||||
public ECSchema[] getECSchemas() throws IOException;
|
||||
}
|
||||
|
@ -107,6 +107,8 @@
|
||||
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetDatanodeReportResponseProto;
|
||||
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetDatanodeStorageReportRequestProto;
|
||||
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetDatanodeStorageReportResponseProto;
|
||||
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetECSchemasRequestProto;
|
||||
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetECSchemasResponseProto;
|
||||
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetEditsFromTxidRequestProto;
|
||||
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetEditsFromTxidResponseProto;
|
||||
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetErasureCodingInfoRequestProto;
|
||||
@ -218,6 +220,7 @@
|
||||
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
|
||||
import org.apache.hadoop.io.EnumSetWritable;
|
||||
import org.apache.hadoop.io.Text;
|
||||
import org.apache.hadoop.io.erasurecode.ECSchema;
|
||||
import org.apache.hadoop.security.proto.SecurityProtos.CancelDelegationTokenRequestProto;
|
||||
import org.apache.hadoop.security.proto.SecurityProtos.CancelDelegationTokenResponseProto;
|
||||
import org.apache.hadoop.security.proto.SecurityProtos.GetDelegationTokenRequestProto;
|
||||
@ -1530,4 +1533,20 @@ public GetErasureCodingInfoResponseProto getErasureCodingInfo(RpcController cont
|
||||
throw new ServiceException(e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public GetECSchemasResponseProto getECSchemas(RpcController controller,
|
||||
GetECSchemasRequestProto request) throws ServiceException {
|
||||
try {
|
||||
ECSchema[] ecSchemas = server.getECSchemas();
|
||||
GetECSchemasResponseProto.Builder resBuilder = GetECSchemasResponseProto
|
||||
.newBuilder();
|
||||
for (ECSchema ecSchema : ecSchemas) {
|
||||
resBuilder.addSchemas(PBHelper.convertECSchema(ecSchema));
|
||||
}
|
||||
return resBuilder.build();
|
||||
} catch (IOException e) {
|
||||
throw new ServiceException(e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -107,6 +107,8 @@
|
||||
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetDataEncryptionKeyResponseProto;
|
||||
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetDatanodeReportRequestProto;
|
||||
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetDatanodeStorageReportRequestProto;
|
||||
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetECSchemasRequestProto;
|
||||
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetECSchemasResponseProto;
|
||||
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetEditsFromTxidRequestProto;
|
||||
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetErasureCodingInfoRequestProto;
|
||||
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetErasureCodingInfoResponseProto;
|
||||
@ -165,10 +167,11 @@
|
||||
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.SetStoragePolicyRequestProto;
|
||||
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.CreateErasureCodingZoneRequestProto;
|
||||
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.CreateErasureCodingZoneResponseProto;
|
||||
import org.apache.hadoop.hdfs.protocol.proto.EncryptionZonesProtos;
|
||||
import org.apache.hadoop.hdfs.protocol.proto.*;
|
||||
import org.apache.hadoop.hdfs.protocol.proto.EncryptionZonesProtos.CreateEncryptionZoneRequestProto;
|
||||
import org.apache.hadoop.hdfs.protocol.proto.EncryptionZonesProtos.GetEZForPathRequestProto;
|
||||
import org.apache.hadoop.hdfs.protocol.proto.EncryptionZonesProtos.ListEncryptionZonesRequestProto;
|
||||
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.ECSchemaProto;
|
||||
import org.apache.hadoop.hdfs.protocol.proto.XAttrProtos.GetXAttrsRequestProto;
|
||||
import org.apache.hadoop.hdfs.protocol.proto.XAttrProtos.ListXAttrsRequestProto;
|
||||
import org.apache.hadoop.hdfs.protocol.proto.XAttrProtos.RemoveXAttrRequestProto;
|
||||
@ -180,6 +183,7 @@
|
||||
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorageReport;
|
||||
import org.apache.hadoop.io.EnumSetWritable;
|
||||
import org.apache.hadoop.io.Text;
|
||||
import org.apache.hadoop.io.erasurecode.ECSchema;
|
||||
import org.apache.hadoop.ipc.ProtobufHelper;
|
||||
import org.apache.hadoop.ipc.ProtocolMetaInterface;
|
||||
import org.apache.hadoop.ipc.ProtocolTranslator;
|
||||
@ -237,6 +241,10 @@ public class ClientNamenodeProtocolTranslatorPB implements
|
||||
VOID_GET_STORAGE_POLICIES_REQUEST =
|
||||
GetStoragePoliciesRequestProto.newBuilder().build();
|
||||
|
||||
private final static GetECSchemasRequestProto
|
||||
VOID_GET_ECSCHEMAS_REQUEST = GetECSchemasRequestProto
|
||||
.newBuilder().build();
|
||||
|
||||
public ClientNamenodeProtocolTranslatorPB(ClientNamenodeProtocolPB proxy) {
|
||||
rpcProxy = proxy;
|
||||
}
|
||||
@ -1550,4 +1558,20 @@ public ECInfo getErasureCodingInfo(String src) throws IOException {
|
||||
throw ProtobufHelper.getRemoteException(e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public ECSchema[] getECSchemas() throws IOException {
|
||||
try {
|
||||
GetECSchemasResponseProto response = rpcProxy.getECSchemas(null,
|
||||
VOID_GET_ECSCHEMAS_REQUEST);
|
||||
ECSchema[] schemas = new ECSchema[response.getSchemasCount()];
|
||||
int i = 0;
|
||||
for (ECSchemaProto schemaProto : response.getSchemasList()) {
|
||||
schemas[i++] = PBHelper.convertECSchema(schemaProto);
|
||||
}
|
||||
return schemas;
|
||||
} catch (ServiceException e) {
|
||||
throw ProtobufHelper.getRemoteException(e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -3121,8 +3121,6 @@ public static ECSchema convertECSchema(ECSchemaProto schema) {
|
||||
for (ECSchemaOptionEntryProto option : optionsList) {
|
||||
options.put(option.getKey(), option.getValue());
|
||||
}
|
||||
// include chunksize in options.
|
||||
options.put(ECSchema.CHUNK_SIZE_KEY, String.valueOf(schema.getChunkSize()));
|
||||
return new ECSchema(schema.getSchemaName(), schema.getCodecName(),
|
||||
schema.getDataUnits(), schema.getParityUnits(), options);
|
||||
}
|
||||
@ -3132,8 +3130,7 @@ public static ECSchemaProto convertECSchema(ECSchema schema) {
|
||||
.setSchemaName(schema.getSchemaName())
|
||||
.setCodecName(schema.getCodecName())
|
||||
.setDataUnits(schema.getNumDataUnits())
|
||||
.setParityUnits(schema.getNumParityUnits())
|
||||
.setChunkSize(schema.getChunkSize());
|
||||
.setParityUnits(schema.getNumParityUnits());
|
||||
Set<Entry<String, String>> entrySet = schema.getOptions().entrySet();
|
||||
for (Entry<String, String> entry : entrySet) {
|
||||
builder.addOptions(ECSchemaOptionEntryProto.newBuilder()
|
||||
|
@ -7585,6 +7585,23 @@ ECInfo getErasureCodingInfo(String src) throws AccessControlException,
|
||||
return null;
|
||||
}
|
||||
|
||||
/**
|
||||
* Get available ECSchemas
|
||||
*/
|
||||
ECSchema[] getECSchemas() throws IOException {
|
||||
checkOperation(OperationCategory.READ);
|
||||
waitForLoadingFSImage();
|
||||
readLock();
|
||||
try {
|
||||
checkOperation(OperationCategory.READ);
|
||||
// TODO HDFS-7866 Need to return all schemas maintained by Namenode
|
||||
ECSchema defaultSchema = ECSchemaManager.getSystemDefaultSchema();
|
||||
return new ECSchema[] { defaultSchema };
|
||||
} finally {
|
||||
readUnlock();
|
||||
}
|
||||
}
|
||||
|
||||
void setXAttr(String src, XAttr xAttr, EnumSet<XAttrSetFlag> flag,
|
||||
boolean logRetryCache)
|
||||
throws IOException {
|
||||
|
@ -143,6 +143,7 @@
|
||||
import org.apache.hadoop.hdfs.server.protocol.VolumeFailureSummary;
|
||||
import org.apache.hadoop.io.EnumSetWritable;
|
||||
import org.apache.hadoop.io.Text;
|
||||
import org.apache.hadoop.io.erasurecode.ECSchema;
|
||||
import org.apache.hadoop.ipc.ProtobufRpcEngine;
|
||||
import org.apache.hadoop.ipc.RPC;
|
||||
import org.apache.hadoop.ipc.RetryCache;
|
||||
@ -2031,9 +2032,15 @@ public void removeSpanReceiver(long id) throws IOException {
|
||||
nn.spanReceiverHost.removeSpanReceiver(id);
|
||||
}
|
||||
|
||||
@Override // ClientNameNodeProtocol
|
||||
@Override // ClientProtocol
|
||||
public ECInfo getErasureCodingInfo(String src) throws IOException {
|
||||
checkNNStartup();
|
||||
return namesystem.getErasureCodingInfo(src);
|
||||
}
|
||||
|
||||
@Override // ClientProtocol
|
||||
public ECSchema[] getECSchemas() throws IOException {
|
||||
checkNNStartup();
|
||||
return namesystem.getECSchemas();
|
||||
}
|
||||
}
|
||||
|
@ -729,6 +729,13 @@ message GetErasureCodingInfoResponseProto {
|
||||
optional ECInfoProto ECInfo = 1;
|
||||
}
|
||||
|
||||
message GetECSchemasRequestProto { // void request
|
||||
}
|
||||
|
||||
message GetECSchemasResponseProto {
|
||||
repeated ECSchemaProto schemas = 1;
|
||||
}
|
||||
|
||||
service ClientNamenodeProtocol {
|
||||
rpc getBlockLocations(GetBlockLocationsRequestProto)
|
||||
returns(GetBlockLocationsResponseProto);
|
||||
@ -879,4 +886,6 @@ service ClientNamenodeProtocol {
|
||||
returns(GetEditsFromTxidResponseProto);
|
||||
rpc getErasureCodingInfo(GetErasureCodingInfoRequestProto)
|
||||
returns(GetErasureCodingInfoResponseProto);
|
||||
rpc getECSchemas(GetECSchemasRequestProto)
|
||||
returns(GetECSchemasResponseProto);
|
||||
}
|
||||
|
@ -637,8 +637,7 @@ message ECSchemaProto {
|
||||
required string codecName = 2;
|
||||
required uint32 dataUnits = 3;
|
||||
required uint32 parityUnits = 4;
|
||||
required uint32 chunkSize = 5;
|
||||
repeated ECSchemaOptionEntryProto options = 6;
|
||||
repeated ECSchemaOptionEntryProto options = 5;
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -0,0 +1,57 @@
|
||||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hdfs;
|
||||
|
||||
import static org.junit.Assert.*;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hdfs.server.namenode.ECSchemaManager;
|
||||
import org.apache.hadoop.io.erasurecode.ECSchema;
|
||||
import org.junit.After;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
|
||||
public class TestECSchemas {
|
||||
private MiniDFSCluster cluster;
|
||||
|
||||
@Before
|
||||
public void before() throws IOException {
|
||||
cluster = new MiniDFSCluster.Builder(new Configuration()).numDataNodes(0)
|
||||
.build();
|
||||
cluster.waitActive();
|
||||
}
|
||||
|
||||
@After
|
||||
public void after() {
|
||||
if (cluster != null) {
|
||||
cluster.shutdown();
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testGetECSchemas() throws Exception {
|
||||
ECSchema[] ecSchemas = cluster.getFileSystem().getClient().getECSchemas();
|
||||
// TODO update assertion after HDFS-7866
|
||||
assertNotNull(ecSchemas);
|
||||
assertEquals("Should have only one ecSchema", 1, ecSchemas.length);
|
||||
assertEquals("Returned schemas should have only default schema",
|
||||
ECSchemaManager.getSystemDefaultSchema(), ecSchemas[0]);
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue
Block a user