HDFS-12740. SCM should support a RPC to share the cluster Id with KSM and DataNodes. Contributed by Shashikant Banerjee.
This commit is contained in:
parent
c8d8270f72
commit
f9c11d952c
@ -0,0 +1,81 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.scm;
|
||||
|
||||
/**
|
||||
* ScmInfo wraps the result returned from SCM#getScmInfo which
|
||||
* contains clusterId and the SCM Id.
|
||||
*/
|
||||
public final class ScmInfo {
|
||||
private String clusterId;
|
||||
private String scmId;
|
||||
|
||||
/**
|
||||
* Builder for ScmInfo.
|
||||
*/
|
||||
public static class Builder {
|
||||
private String clusterId;
|
||||
private String scmId;
|
||||
|
||||
/**
|
||||
* sets the cluster id.
|
||||
* @param cid clusterId to be set
|
||||
* @return Builder for ScmInfo
|
||||
*/
|
||||
public Builder setClusterId(String cid) {
|
||||
this.clusterId = cid;
|
||||
return this;
|
||||
}
|
||||
|
||||
/**
|
||||
* sets the scmId.
|
||||
* @param id scmId
|
||||
* @return Builder for scmInfo
|
||||
*/
|
||||
public Builder setScmId(String id) {
|
||||
this.scmId = id;
|
||||
return this;
|
||||
}
|
||||
|
||||
public ScmInfo build() {
|
||||
return new ScmInfo(clusterId, scmId);
|
||||
}
|
||||
}
|
||||
|
||||
private ScmInfo(String clusterId, String scmId) {
|
||||
this.clusterId = clusterId;
|
||||
this.scmId = scmId;
|
||||
}
|
||||
|
||||
/**
|
||||
* Gets the clusterId from the Version file.
|
||||
* @return ClusterId
|
||||
*/
|
||||
public String getClusterId() {
|
||||
return clusterId;
|
||||
}
|
||||
|
||||
/**
|
||||
* Gets the SCM Id from the Version file.
|
||||
* @return SCM Id
|
||||
*/
|
||||
public String getScmId() {
|
||||
return scmId;
|
||||
}
|
||||
}
|
@ -26,6 +26,7 @@ import org.apache.hadoop.ozone.common.BlockGroup;
|
||||
import org.apache.hadoop.scm.container.common.helpers.AllocatedBlock;
|
||||
import org.apache.hadoop.ozone.protocol.proto.OzoneProtos.ReplicationType;
|
||||
import org.apache.hadoop.ozone.protocol.proto.OzoneProtos.ReplicationFactor;
|
||||
import org.apache.hadoop.scm.ScmInfo;
|
||||
|
||||
/**
|
||||
* ScmBlockLocationProtocol is used by an HDFS node to find the set of nodes
|
||||
@ -63,4 +64,9 @@ public interface ScmBlockLocationProtocol {
|
||||
*/
|
||||
List<DeleteBlockGroupResult>
|
||||
deleteKeyBlocks(List<BlockGroup> keyBlocksInfoList) throws IOException;
|
||||
|
||||
/**
|
||||
* Gets the Clusterid and SCM Id from SCM.
|
||||
*/
|
||||
ScmInfo getScmInfo() throws IOException;
|
||||
}
|
||||
|
@ -33,8 +33,11 @@ import org.apache.hadoop.ozone.protocol.proto.ScmBlockLocationProtocolProtos.Del
|
||||
import org.apache.hadoop.ozone.protocol.proto.ScmBlockLocationProtocolProtos.GetScmBlockLocationsRequestProto;
|
||||
import org.apache.hadoop.ozone.protocol.proto.ScmBlockLocationProtocolProtos.GetScmBlockLocationsResponseProto;
|
||||
import org.apache.hadoop.ozone.protocol.proto.ScmBlockLocationProtocolProtos.ScmLocatedBlockProto;
|
||||
import org.apache.hadoop.ozone.protocol.proto.ScmBlockLocationProtocolProtos.GetScmInfoRequestProto;
|
||||
import org.apache.hadoop.ozone.protocol.proto.ScmBlockLocationProtocolProtos.GetScmInfoRespsonseProto;
|
||||
import org.apache.hadoop.ozone.protocol.proto.ScmBlockLocationProtocolProtos.KeyBlocks;
|
||||
import org.apache.hadoop.scm.container.common.helpers.AllocatedBlock;
|
||||
import org.apache.hadoop.scm.ScmInfo;
|
||||
import org.apache.hadoop.scm.container.common.helpers.Pipeline;
|
||||
import org.apache.hadoop.scm.protocol.ScmBlockLocationProtocol;
|
||||
import org.apache.hadoop.ozone.protocol.proto.OzoneProtos;
|
||||
@ -175,6 +178,27 @@ public final class ScmBlockLocationProtocolClientSideTranslatorPB
|
||||
return results;
|
||||
}
|
||||
|
||||
/**
|
||||
* Gets the cluster Id and Scm Id from SCM.
|
||||
* @return ScmInfo
|
||||
* @throws IOException
|
||||
*/
|
||||
@Override
|
||||
public ScmInfo getScmInfo() throws IOException {
|
||||
GetScmInfoRequestProto request =
|
||||
GetScmInfoRequestProto.getDefaultInstance();
|
||||
GetScmInfoRespsonseProto resp;
|
||||
try {
|
||||
resp = rpcProxy.getScmInfo(NULL_RPC_CONTROLLER, request);
|
||||
} catch (ServiceException e) {
|
||||
throw ProtobufHelper.getRemoteException(e);
|
||||
}
|
||||
ScmInfo.Builder builder = new ScmInfo.Builder()
|
||||
.setClusterId(resp.getClusterId())
|
||||
.setScmId(resp.getScmId());
|
||||
return builder.build();
|
||||
}
|
||||
|
||||
@Override
|
||||
public Object getUnderlyingProxyObject() {
|
||||
return rpcProxy;
|
||||
|
@ -131,6 +131,20 @@ message AllocateScmBlockResponseProto {
|
||||
optional string errorMessage = 5;
|
||||
}
|
||||
|
||||
/**
|
||||
* Request for cluster Id and SCM Id from SCM.
|
||||
*/
|
||||
message GetScmInfoRequestProto {
|
||||
}
|
||||
|
||||
/**
|
||||
* Response from SCM for cluster Id and SCM ID.
|
||||
*/
|
||||
message GetScmInfoRespsonseProto {
|
||||
required string clusterId = 1;
|
||||
required string scmId = 2;
|
||||
}
|
||||
|
||||
/**
|
||||
* Protocol used from KeySpaceManager to StorageContainerManager.
|
||||
* See request and response messages for details of the RPC calls.
|
||||
@ -156,4 +170,10 @@ service ScmBlockLocationProtocolService {
|
||||
*/
|
||||
rpc deleteScmKeyBlocks(DeleteScmKeyBlocksRequestProto)
|
||||
returns (DeleteScmKeyBlocksResponseProto);
|
||||
|
||||
/**
|
||||
* Gets the scmInfo from SCM.
|
||||
*/
|
||||
rpc getScmInfo(GetScmInfoRequestProto)
|
||||
returns (GetScmInfoRespsonseProto);
|
||||
}
|
||||
|
@ -17,6 +17,7 @@
|
||||
|
||||
package org.apache.hadoop.ozone.ksm;
|
||||
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
import com.google.protobuf.BlockingService;
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.hdfs.DFSUtil;
|
||||
@ -42,6 +43,7 @@ import org.apache.hadoop.ozone.protocol.proto
|
||||
.KeySpaceManagerProtocolProtos.OzoneAclInfo;
|
||||
import org.apache.hadoop.ozone.protocolPB
|
||||
.KeySpaceManagerProtocolServerSideTranslatorPB;
|
||||
import org.apache.hadoop.scm.ScmInfo;
|
||||
import org.apache.hadoop.scm.protocol.ScmBlockLocationProtocol;
|
||||
import org.apache.hadoop.scm.protocolPB.ScmBlockLocationProtocolClientSideTranslatorPB;
|
||||
import org.apache.hadoop.scm.protocolPB.ScmBlockLocationProtocolPB;
|
||||
@ -137,6 +139,10 @@ public class KeySpaceManager extends ServiceRuntimeInfoImpl
|
||||
return scmBlockLocationClient;
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
public ScmInfo getScmInfo(OzoneConfiguration conf) throws IOException {
|
||||
return getScmBlockClient(conf).getScmInfo();
|
||||
}
|
||||
/**
|
||||
* Starts an RPC server, if configured.
|
||||
*
|
||||
|
@ -25,6 +25,7 @@ import org.apache.hadoop.ozone.common.BlockGroup;
|
||||
import org.apache.hadoop.ozone.protocol.proto.ScmBlockLocationProtocolProtos.DeleteKeyBlocksResultProto;
|
||||
import org.apache.hadoop.scm.container.common.helpers.AllocatedBlock;
|
||||
import org.apache.hadoop.ozone.common.DeleteBlockGroupResult;
|
||||
import org.apache.hadoop.scm.ScmInfo;
|
||||
import org.apache.hadoop.scm.protocol.ScmBlockLocationProtocol;
|
||||
import org.apache.hadoop.scm.protocol.StorageContainerLocationProtocol;
|
||||
import org.apache.hadoop.scm.protocolPB.ScmBlockLocationProtocolPB;
|
||||
@ -43,6 +44,10 @@ import org.apache.hadoop.ozone.protocol.proto
|
||||
.ScmBlockLocationProtocolProtos.GetScmBlockLocationsResponseProto;
|
||||
import org.apache.hadoop.ozone.protocol.proto
|
||||
.ScmBlockLocationProtocolProtos.ScmLocatedBlockProto;
|
||||
import org.apache.hadoop.ozone.protocol.proto
|
||||
.ScmBlockLocationProtocolProtos.GetScmInfoRequestProto;
|
||||
import org.apache.hadoop.ozone.protocol.proto
|
||||
.ScmBlockLocationProtocolProtos.GetScmInfoRespsonseProto;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.List;
|
||||
@ -148,4 +153,19 @@ public final class ScmBlockLocationProtocolServerSideTranslatorPB
|
||||
}
|
||||
return resp.build();
|
||||
}
|
||||
|
||||
@Override
|
||||
public GetScmInfoRespsonseProto getScmInfo(RpcController controller,
|
||||
GetScmInfoRequestProto req) throws ServiceException {
|
||||
ScmInfo scmInfo;
|
||||
try {
|
||||
scmInfo = impl.getScmInfo();
|
||||
} catch (IOException ex) {
|
||||
throw new ServiceException(ex);
|
||||
}
|
||||
return GetScmInfoRespsonseProto.newBuilder()
|
||||
.setClusterId(scmInfo.getClusterId())
|
||||
.setScmId(scmInfo.getScmId())
|
||||
.build();
|
||||
}
|
||||
}
|
||||
|
@ -44,11 +44,11 @@ public class SCMStorage extends Storage {
|
||||
super(NodeType.SCM, OzoneUtils.getScmMetadirPath(conf), STORAGE_DIR);
|
||||
}
|
||||
|
||||
public void setScmUuid(String scmUuid) throws IOException {
|
||||
public void setScmId(String scmId) throws IOException {
|
||||
if (getState() == StorageState.INITIALIZED) {
|
||||
throw new IOException("SCM is already initialized.");
|
||||
} else {
|
||||
getStorageInfo().setProperty(SCM_ID, scmUuid);
|
||||
getStorageInfo().setProperty(SCM_ID, scmId);
|
||||
}
|
||||
}
|
||||
|
||||
@ -56,18 +56,18 @@ public class SCMStorage extends Storage {
|
||||
* Retrieves the SCM ID from the version file.
|
||||
* @return SCM_ID
|
||||
*/
|
||||
public String getscmUuid() {
|
||||
public String getScmId() {
|
||||
return getStorageInfo().getProperty(SCM_ID);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Properties getNodeProperties() {
|
||||
String scmUuid = getscmUuid();
|
||||
if (scmUuid == null) {
|
||||
scmUuid = UUID.randomUUID().toString();
|
||||
String scmId = getScmId();
|
||||
if (scmId == null) {
|
||||
scmId = UUID.randomUUID().toString();
|
||||
}
|
||||
Properties scmProperties = new Properties();
|
||||
scmProperties.setProperty(SCM_ID, scmUuid);
|
||||
scmProperties.setProperty(SCM_ID, scmId);
|
||||
return scmProperties;
|
||||
}
|
||||
|
||||
|
@ -77,6 +77,7 @@ import org.apache.hadoop.ozone.scm.container.Mapping;
|
||||
import org.apache.hadoop.ozone.scm.container.placement.metrics.ContainerStat;
|
||||
import org.apache.hadoop.ozone.scm.container.placement.metrics.SCMMetrics;
|
||||
import org.apache.hadoop.ozone.scm.exceptions.SCMException;
|
||||
import org.apache.hadoop.scm.ScmInfo;
|
||||
import org.apache.hadoop.ozone.scm.node.NodeManager;
|
||||
import org.apache.hadoop.ozone.scm.node.SCMNodeManager;
|
||||
import org.apache.hadoop.scm.container.common.helpers.AllocatedBlock;
|
||||
@ -1102,7 +1103,16 @@ public class StorageContainerManager extends ServiceRuntimeInfoImpl
|
||||
return scmBlockManager.allocateBlock(size, type, factor);
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Get the clusterId and SCM Id from the version file in SCM.
|
||||
*/
|
||||
@Override
|
||||
public ScmInfo getScmInfo() throws IOException {
|
||||
ScmInfo.Builder builder = new ScmInfo.Builder()
|
||||
.setClusterId(scmStorage.getClusterID())
|
||||
.setScmId(scmStorage.getScmId());
|
||||
return builder.build();
|
||||
}
|
||||
/**
|
||||
* Delete blocks for a set of object keys.
|
||||
*
|
||||
|
@ -335,6 +335,8 @@ public final class MiniOzoneClassicCluster extends MiniDFSCluster
|
||||
private Optional<Integer> hbSeconds = Optional.empty();
|
||||
private Optional<Integer> hbProcessorInterval = Optional.empty();
|
||||
private Optional<String> scmMetadataDir = Optional.empty();
|
||||
private Optional<String> clusterId = Optional.empty();
|
||||
private Optional<String> scmId = Optional.empty();
|
||||
private Boolean ozoneEnabled = true;
|
||||
private Boolean waitForChillModeFinish = true;
|
||||
private Boolean randomContainerPort = true;
|
||||
@ -423,6 +425,16 @@ public final class MiniOzoneClassicCluster extends MiniDFSCluster
|
||||
return this;
|
||||
}
|
||||
|
||||
public Builder setClusterId(String cId) {
|
||||
clusterId = Optional.of(cId);
|
||||
return this;
|
||||
}
|
||||
|
||||
public Builder setScmId(String sId) {
|
||||
scmId = Optional.of(sId);
|
||||
return this;
|
||||
}
|
||||
|
||||
public String getPath() {
|
||||
return path;
|
||||
}
|
||||
@ -439,6 +451,7 @@ public final class MiniOzoneClassicCluster extends MiniDFSCluster
|
||||
configureTrace();
|
||||
configureSCMheartbeat();
|
||||
configScmMetadata();
|
||||
initializeScm();
|
||||
|
||||
conf.set(ScmConfigKeys.OZONE_SCM_CLIENT_ADDRESS_KEY, "127.0.0.1:0");
|
||||
conf.set(ScmConfigKeys.OZONE_SCM_BLOCK_CLIENT_ADDRESS_KEY, "127.0.0.1:0");
|
||||
@ -456,8 +469,6 @@ public final class MiniOzoneClassicCluster extends MiniDFSCluster
|
||||
conf.setBoolean(OzoneConfigKeys.DFS_CONTAINER_IPC_RANDOM_PORT,
|
||||
randomContainerPort);
|
||||
|
||||
SCMStorage scmStorage = new SCMStorage(conf);
|
||||
scmStorage.initialize();
|
||||
StorageContainerManager scm = StorageContainerManager.createSCM(
|
||||
null, conf);
|
||||
scm.start();
|
||||
@ -513,6 +524,13 @@ public final class MiniOzoneClassicCluster extends MiniDFSCluster
|
||||
scmPath.toString() + "/datanode.id");
|
||||
}
|
||||
|
||||
private void initializeScm() throws IOException {
|
||||
SCMStorage scmStore = new SCMStorage(conf);
|
||||
scmStore.setClusterId(clusterId.orElse(runID.toString()));
|
||||
scmStore.setScmId(scmId.orElse(UUID.randomUUID().toString()));
|
||||
scmStore.initialize();
|
||||
}
|
||||
|
||||
private void configureHandler() {
|
||||
conf.setBoolean(OzoneConfigKeys.OZONE_ENABLED, this.ozoneEnabled);
|
||||
if (!ozoneHandlerType.isPresent()) {
|
||||
|
@ -37,6 +37,7 @@ import org.apache.hadoop.ozone.scm.block.SCMBlockDeletingService;
|
||||
import org.apache.hadoop.ozone.scm.node.NodeManager;
|
||||
import org.apache.hadoop.scm.XceiverClientManager;
|
||||
import org.apache.hadoop.scm.container.common.helpers.Pipeline;
|
||||
import org.apache.hadoop.scm.ScmInfo;
|
||||
import org.junit.Rule;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Test;
|
||||
@ -396,4 +397,26 @@ public class TestStorageContainerManager {
|
||||
Assert.assertEquals(OzoneConsts.NodeType.SCM, scmStore.getNodeType());
|
||||
Assert.assertNotEquals("testClusterId", scmStore.getClusterID());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testScmInfo() throws Exception {
|
||||
OzoneConfiguration conf = new OzoneConfiguration();
|
||||
final String path =
|
||||
GenericTestUtils.getTempPath(UUID.randomUUID().toString());
|
||||
Path scmPath = Paths.get(path, "scm-meta");
|
||||
conf.set(OzoneConfigKeys.OZONE_METADATA_DIRS, scmPath.toString());
|
||||
conf.setBoolean(OzoneConfigKeys.OZONE_ENABLED, true);
|
||||
SCMStorage scmStore = new SCMStorage(conf);
|
||||
String clusterId = UUID.randomUUID().toString();
|
||||
String scmId = UUID.randomUUID().toString();
|
||||
scmStore.setClusterId(clusterId);
|
||||
scmStore.setScmId(scmId);
|
||||
// writes the version file properties
|
||||
scmStore.initialize();
|
||||
StorageContainerManager scm = StorageContainerManager.createSCM(null, conf);
|
||||
//Reads the SCM Info from SCM instance
|
||||
ScmInfo scmInfo = scm.getScmInfo();
|
||||
Assert.assertEquals(clusterId, scmInfo.getClusterId());
|
||||
Assert.assertEquals(scmId, scmInfo.getScmId());
|
||||
}
|
||||
}
|
||||
|
@ -38,6 +38,7 @@ import org.apache.hadoop.ozone.web.response.BucketInfo;
|
||||
import org.apache.hadoop.ozone.web.response.KeyInfo;
|
||||
import org.apache.hadoop.ozone.web.response.VolumeInfo;
|
||||
import org.apache.hadoop.ozone.web.utils.OzoneUtils;
|
||||
import org.apache.hadoop.scm.ScmInfo;
|
||||
import org.apache.hadoop.test.GenericTestUtils;
|
||||
import org.apache.hadoop.ozone.protocol.proto
|
||||
.KeySpaceManagerProtocolProtos.Status;
|
||||
@ -66,6 +67,7 @@ import java.util.Map;
|
||||
import java.util.Random;
|
||||
import java.util.Set;
|
||||
import java.util.List;
|
||||
import java.util.UUID;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import static org.apache.hadoop.ozone.OzoneConsts.DELETING_KEY_PREFIX;
|
||||
@ -78,6 +80,8 @@ public class TestKeySpaceManager {
|
||||
private static UserArgs userArgs;
|
||||
private static KSMMetrics ksmMetrics;
|
||||
private static OzoneConfiguration conf;
|
||||
private static String clusterId;
|
||||
private static String scmId;
|
||||
|
||||
@Rule
|
||||
public ExpectedException exception = ExpectedException.none();
|
||||
@ -93,10 +97,15 @@ public class TestKeySpaceManager {
|
||||
@BeforeClass
|
||||
public static void init() throws Exception {
|
||||
conf = new OzoneConfiguration();
|
||||
clusterId = UUID.randomUUID().toString();
|
||||
scmId = UUID.randomUUID().toString();
|
||||
conf.set(OzoneConfigKeys.OZONE_HANDLER_TYPE_KEY,
|
||||
OzoneConsts.OZONE_HANDLER_DISTRIBUTED);
|
||||
cluster = new MiniOzoneClassicCluster.Builder(conf)
|
||||
.setHandlerType(OzoneConsts.OZONE_HANDLER_DISTRIBUTED).build();
|
||||
.setHandlerType(OzoneConsts.OZONE_HANDLER_DISTRIBUTED)
|
||||
.setClusterId(clusterId)
|
||||
.setScmId(scmId)
|
||||
.build();
|
||||
storageHandler = new ObjectStoreHandler(conf).getStorageHandler();
|
||||
userArgs = new UserArgs(null, OzoneUtils.getRequestID(),
|
||||
null, null, null, null);
|
||||
@ -1033,4 +1042,15 @@ public class TestKeySpaceManager {
|
||||
}
|
||||
Assert.assertEquals(dataString, DFSUtil.bytes2String(data1));
|
||||
}
|
||||
|
||||
/**
|
||||
* Tests the RPC call for getting scmId and clusterId from SCM.
|
||||
* @throws IOException
|
||||
*/
|
||||
@Test
|
||||
public void testGetScmInfo() throws IOException {
|
||||
ScmInfo info = cluster.getKeySpaceManager().getScmInfo(conf);
|
||||
Assert.assertEquals(clusterId, info.getClusterId());
|
||||
Assert.assertEquals(scmId, info.getScmId());
|
||||
}
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user