From 3e1317de0213a96de9a771bada820e7a57825c97 Mon Sep 17 00:00:00 2001
From: Anu Engineer
Date: Tue, 7 Mar 2017 10:28:21 -0800
Subject: [PATCH] HDFS-11451. Ozone: Add protobuf definitions for container
 reports. Contributed by Anu Engineer.

---
 .../statemachine/SCMConnectionManager.java    |   4 +-
 .../common/statemachine/StateContext.java     |  40 ++++++-
 .../endpoint/HeartbeatEndpointTask.java       |  41 +++++--
 .../StorageContainerDatanodeProtocol.java     |   4 +-
 .../commands/SendContainerCommand.java        |  80 ++++++++++++++
 ...atanodeProtocolClientSideTranslatorPB.java |   5 +-
 ...atanodeProtocolServerSideTranslatorPB.java |   3 +-
 .../ozone/scm/StorageContainerManager.java    |   6 +-
 .../ozone/scm/container/ContainerMapping.java |  23 ++--
 .../hadoop/ozone/scm/node/CommandQueue.java   |   8 +-
 .../StorageContainerDatanodeProtocol.proto    | 102 +++++++++++++++---
 .../ozone/container/common/ScmTestMock.java   |  17 +--
 .../common/TestDatanodeStateMachine.java      |  14 ++-
 .../ozone/container/common/TestEndPoint.java  |  52 +++++----
 14 files changed, 331 insertions(+), 68 deletions(-)
 create mode 100644 hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocol/commands/SendContainerCommand.java

diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/SCMConnectionManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/SCMConnectionManager.java
index 33d361b8d7..a2384e835e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/SCMConnectionManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/SCMConnectionManager.java
@@ -73,7 +73,7 @@ public Configuration getConf() {
    *
    * @return - Return RPC timeout.
    */
-  public long getRpcTimeout() {
+  public int getRpcTimeout() {
     return rpcTimeout;
   }
 
@@ -128,7 +128,7 @@ public void addSCMServer(InetSocketAddress address) throws IOException {
     StorageContainerDatanodeProtocolPB rpcProxy = RPC.getProxy(
         StorageContainerDatanodeProtocolPB.class, version, address,
         UserGroupInformation.getCurrentUser(), conf,
-        NetUtils.getDefaultSocketFactory(conf), rpcTimeout);
+        NetUtils.getDefaultSocketFactory(conf), getRpcTimeout());
     StorageContainerDatanodeProtocolClientSideTranslatorPB rpcClient =
         new StorageContainerDatanodeProtocolClientSideTranslatorPB(rpcProxy);
 
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java
index 4fa8729257..15a241edf3 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java
@@ -19,10 +19,10 @@
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.ozone.container.common.states.datanode.InitDatanodeState;
 import org.apache.hadoop.ozone.container.common.states.DatanodeState;
-import org.apache.hadoop.ozone.container.common.states.datanode
-    .RunningDatanodeState;
+import org.apache.hadoop.ozone.container.common.states.datanode.RunningDatanodeState;
 import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
 import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMNodeReport;
+import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.ReportState;
 
 import java.util.LinkedList;
 import java.util.Queue;
@@ -34,6 +34,10 @@
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
 
+import static org.apache.hadoop.ozone.protocol.proto
+    .StorageContainerDatanodeProtocolProtos.ReportState.states
+    .noContainerReports;
+
 /**
  * Current Context of State Machine.
  */
@@ -45,6 +49,9 @@ public class StateContext {
   private final Configuration conf;
   private DatanodeStateMachine.DatanodeStates state;
   private SCMNodeReport nrState;
+  private ReportState reportState;
+  private static final ReportState DEFAULT_REPORT_STATE =
+      ReportState.newBuilder().setState(noContainerReports).setCount(0).build();
 
   /**
    * Constructs a StateContext.
@@ -174,6 +181,7 @@ public void execute(ExecutorService service, long time, TimeUnit unit)
       if (isExiting(newState)) {
         task.onExit();
       }
+      this.clearReportState();
       this.setState(newState);
     }
   }
@@ -215,4 +223,32 @@ public long getExecutionCount() {
   }
 
+  /**
+   * Gets the ReportState.
+   * @return ReportState.
+   */
+  public synchronized ReportState getContainerReportState() {
+    if (reportState == null) {
+      return DEFAULT_REPORT_STATE;
+    }
+    return reportState;
+  }
+
+  /**
+   * Sets the ReportState.
+   * @param rState - ReportState.
+   */
+  public synchronized void setContainerReportState(ReportState rState) {
+    this.reportState = rState;
+  }
+
+  /**
+   * Clears the report state after it has been communicated.
+   */
+  public synchronized void clearReportState() {
+    if (reportState != null) {
+      setContainerReportState(null);
+    }
+  }
+
 }
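Note for readers tracing the flow: nothing in this patch sets a non-default report state yet; the datanode code that detects container changes would do so through the new StateContext API. A minimal sketch, assuming a StateContext instance named context and a hypothetical changedContainers count:

    // Hypothetical trigger on the datanode side: advertise a delta container
    // report on the next heartbeat. ReportState and the StateContext
    // accessors are the ones added in this patch.
    long changedContainers = 42;  // illustrative value
    ReportState delta = ReportState.newBuilder()
        .setState(ReportState.states.deltaContainerReport)
        .setCount(changedContainers)
        .build();
    context.setContainerReportState(delta);

The heartbeat task then picks this up via getContainerReportState(), and execute() clears it once the state transition completes.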
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/HeartbeatEndpointTask.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/HeartbeatEndpointTask.java
index c0226bbf87..5aba6876c1 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/HeartbeatEndpointTask.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/HeartbeatEndpointTask.java
@@ -26,6 +26,13 @@
 import org.apache.hadoop.ozone.container.common.statemachine.StateContext;
 import org.apache.hadoop.ozone.protocol.proto
     .StorageContainerDatanodeProtocolProtos.ContainerNodeIDProto;
+import org.apache.hadoop.ozone.protocol.commands.SendContainerCommand;
+import org.apache.hadoop.ozone.protocol.proto
+    .StorageContainerDatanodeProtocolProtos;
+import org.apache.hadoop.ozone.protocol.proto
+    .StorageContainerDatanodeProtocolProtos.SCMCommandResponseProto;
+import org.apache.hadoop.ozone.protocol.proto
+    .StorageContainerDatanodeProtocolProtos.SCMHeartbeatResponseProto;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -89,12 +96,13 @@ public EndpointStateMachine.EndPointStates call() throws Exception {
       DatanodeID datanodeID = DatanodeID.getFromProtoBuf(this
           .containerNodeIDProto.getDatanodeID());
 
-      rpcEndpoint.getEndPoint().sendHeartbeat(datanodeID,
-          this.context.getNodeReport());
+      SCMHeartbeatResponseProto response = rpcEndpoint.getEndPoint()
+          .sendHeartbeat(datanodeID, this.context.getNodeReport(),
+              this.context.getContainerReportState());
+      processResponse(response);
       rpcEndpoint.zeroMissedCount();
     } catch (IOException ex) {
-      rpcEndpoint.logIfNeeded(ex
-      );
+      rpcEndpoint.logIfNeeded(ex);
     } finally {
       rpcEndpoint.unlock();
     }
@@ -109,6 +117,27 @@ public static Builder newBuilder() {
     return new Builder();
   }
 
+  /**
+   * Adds commands from the heartbeat response to the command processing
+   * queue.
+   *
+   * @param response - SCMHeartbeat response.
+   */
+  private void processResponse(SCMHeartbeatResponseProto response) {
+    for (SCMCommandResponseProto commandResponseProto : response
+        .getCommandsList()) {
+      if (commandResponseProto.getCmdType() ==
+          StorageContainerDatanodeProtocolProtos.Type.nullCmd) {
+        LOG.debug("Discarding a null command from SCM.");
+      }
+      if (commandResponseProto.getCmdType() ==
+          StorageContainerDatanodeProtocolProtos.Type.sendContainerReport) {
+        this.context.addCommand(SendContainerCommand.getFromProtobuf(
+            commandResponseProto.getSendReport()));
+      }
+    }
+  }
+
   /**
    * Builder class for HeartbeatEndpointTask.
    */
@@ -138,8 +167,8 @@ public Builder setEndpointStateMachine(EndpointStateMachine rpcEndPoint) {
     /**
      * Sets the Config.
      *
-     * @param config - config
-     * @return Builder
+     * @param config - config.
+     * @return Builder
      */
     public Builder setConfig(Configuration config) {
       this.conf = config;
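The SCM-side half of this exchange is a heartbeat response carrying a sendContainerReport command. A minimal sketch built only from the message types this patch defines; the RPC wiring around it is elided:

    // Ask the datanode for its container report in the next heartbeat reply.
    SCMCommandResponseProto askForReport = SCMCommandResponseProto.newBuilder()
        .setCmdType(Type.sendContainerReport)
        .setSendReport(SendContainerReportProto.getDefaultInstance())
        .build();
    SCMHeartbeatResponseProto response = SCMHeartbeatResponseProto.newBuilder()
        .addCommands(askForReport)
        .build();

processResponse() above turns such an entry into a SendContainerCommand on the datanode's command queue.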
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocol/StorageContainerDatanodeProtocol.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocol/StorageContainerDatanodeProtocol.java
index e93baeffc5..76f359ca75 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocol/StorageContainerDatanodeProtocol.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocol/StorageContainerDatanodeProtocol.java
@@ -18,6 +18,7 @@
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.hdfs.protocol.DatanodeID;
+import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.ReportState;
 import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMVersionRequestProto;
 import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMHeartbeatResponseProto;
 import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMVersionResponseProto;
@@ -43,11 +44,12 @@ SCMVersionResponseProto getVersion(SCMVersionRequestProto versionRequest)
    * Used by data node to send a Heartbeat.
    * @param datanodeID - Datanode ID.
    * @param nodeReport - node report state.
+   * @param reportState - container report state.
    * @return - SCMHeartbeatResponseProto
    * @throws IOException
    */
   SCMHeartbeatResponseProto sendHeartbeat(DatanodeID datanodeID,
-      SCMNodeReport nodeReport) throws IOException;
+      SCMNodeReport nodeReport, ReportState reportState) throws IOException;
 
   /**
    * Register Datanode.
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocol/commands/SendContainerCommand.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocol/commands/SendContainerCommand.java
new file mode 100644
index 0000000000..6a9fc44246
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocol/commands/SendContainerCommand.java
@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.protocol.commands;
+
+import org.apache.hadoop.ozone.protocol.proto
+    .StorageContainerDatanodeProtocolProtos.SendContainerReportProto;
+import org.apache.hadoop.ozone.protocol.proto
+    .StorageContainerDatanodeProtocolProtos.Type;
+
+/**
+ * Allows a Datanode to send in the container report.
+ */
+public class SendContainerCommand extends SCMCommand<SendContainerReportProto> {
+  /**
+   * Returns a SendContainerCommand from a SendContainerReportProto.
+   * @param unused - unused proto message.
+   * @return SendContainerCommand
+   */
+  public static SendContainerCommand getFromProtobuf(
+      final SendContainerReportProto unused) {
+    return new SendContainerCommand();
+  }
+
+  /**
+   * Returns a new builder.
+   * @return Builder
+   */
+  public static SendContainerCommand.Builder newBuilder() {
+    return new SendContainerCommand.Builder();
+  }
+
+  /**
+   * Returns the type of this command.
+   *
+   * @return Type
+   */
+  @Override
+  public Type getType() {
+    return Type.sendContainerReport;
+  }
+
+  /**
+   * Gets the protobuf message of this object.
+   *
+   * @return A protobuf message.
+   */
+  @Override
+  public byte[] getProtoBufMessage() {
+    return SendContainerReportProto.newBuilder().build().toByteArray();
+  }
+
+  /**
+   * A Builder class; this is the standard pattern used for all commands.
+   */
+  public static class Builder {
+    /**
+     * Returns a SendContainerCommand.
+     * @return SendContainerCommand
+     */
+    public SendContainerCommand build() {
+      return new SendContainerCommand();
+    }
+  }
+}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerDatanodeProtocolClientSideTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerDatanodeProtocolClientSideTranslatorPB.java
index 477fac5ade..78b46cfc63 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerDatanodeProtocolClientSideTranslatorPB.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerDatanodeProtocolClientSideTranslatorPB.java
@@ -23,6 +23,8 @@
 import org.apache.hadoop.ipc.ProtocolTranslator;
 import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.ozone.protocol.StorageContainerDatanodeProtocol;
+import org.apache.hadoop.ozone.protocol.proto
+    .StorageContainerDatanodeProtocolProtos.ReportState;
 import org.apache.hadoop.ozone.protocol.proto
     .StorageContainerDatanodeProtocolProtos.SCMHeartbeatRequestProto;
 import org.apache.hadoop.ozone.protocol.proto
@@ -121,11 +123,12 @@ public SCMVersionResponseProto getVersion(SCMVersionRequestProto
   @Override
   public SCMHeartbeatResponseProto sendHeartbeat(DatanodeID datanodeID,
-      SCMNodeReport nodeReport) throws IOException {
+      SCMNodeReport nodeReport, ReportState reportState) throws IOException {
     SCMHeartbeatRequestProto.Builder req = SCMHeartbeatRequestProto
         .newBuilder();
     req.setDatanodeID(datanodeID.getProtoBufMessage());
     req.setNodeReport(nodeReport);
+    req.setContainerReportState(reportState);
     final SCMHeartbeatResponseProto resp;
     try {
       resp = rpcProxy.sendHeartbeat(NULL_RPC_CONTROLLER, req.build());
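With the interface and translator in place, a complete client-side heartbeat call looks like the sketch below; rpcClient (the translator above), datanodeID, and nodeReport are assumed to already exist:

    // Both ReportState fields are required in proto2, so count must be set
    // explicitly even though its declared default is 0.
    ReportState reportState = ReportState.newBuilder()
        .setState(ReportState.states.noContainerReports)
        .setCount(0)
        .build();
    SCMHeartbeatResponseProto response =
        rpcClient.sendHeartbeat(datanodeID, nodeReport, reportState);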
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerDatanodeProtocolServerSideTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerDatanodeProtocolServerSideTranslatorPB.java
index 995290f788..ca13305af5 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerDatanodeProtocolServerSideTranslatorPB.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerDatanodeProtocolServerSideTranslatorPB.java
@@ -78,7 +78,8 @@ public StorageContainerDatanodeProtocolServerSideTranslatorPB(
       SCMHeartbeatRequestProto request) throws ServiceException {
     try {
       return impl.sendHeartbeat(DatanodeID.getFromProtoBuf(request
-          .getDatanodeID()), request.getNodeReport());
+          .getDatanodeID()), request.getNodeReport(),
+          request.getContainerReportState());
     } catch (IOException e) {
       throw new ServiceException(e);
     }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/StorageContainerManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/StorageContainerManager.java
index 3f31e98c36..5820e6decb 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/StorageContainerManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/StorageContainerManager.java
@@ -36,6 +36,8 @@
 import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
 import org.apache.hadoop.ozone.protocol.proto
     .StorageContainerDatanodeProtocolProtos;
+import org.apache.hadoop.ozone.protocol.proto
+    .StorageContainerDatanodeProtocolProtos.ReportState;
 import org.apache.hadoop.ozone.protocol.proto
     .StorageContainerDatanodeProtocolProtos.NullCmdResponseProto;
 import org.apache.hadoop.ozone.protocol.proto
@@ -391,12 +393,14 @@ public SCMVersionResponseProto getVersion(
    * Used by data node to send a Heartbeat.
    *
    * @param datanodeID - Datanode ID.
+   * @param nodeReport - Node Report.
+   * @param reportState - Container report ready info.
    * @return - SCMHeartbeatResponseProto
    * @throws IOException
    */
   @Override
   public SCMHeartbeatResponseProto sendHeartbeat(DatanodeID datanodeID,
-      SCMNodeReport nodeReport) throws IOException {
+      SCMNodeReport nodeReport, ReportState reportState) throws IOException {
     List<SCMCommand> commands = getScmNodeManager().sendHeartbeat(datanodeID,
         nodeReport);
     List<SCMCommandResponseProto> cmdReponses = new LinkedList<>();
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/container/ContainerMapping.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/container/ContainerMapping.java
index 346cb88ee8..6bcdb4eb05 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/container/ContainerMapping.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/container/ContainerMapping.java
@@ -49,7 +49,7 @@ public class ContainerMapping implements Mapping {
   private final long cacheSize;
   private final Lock lock;
   private final Charset encoding = Charset.forName("UTF-8");
-  private final LevelDBStore store;
+  private final LevelDBStore containerStore;
   private final Random rand;
 
   /**
@@ -75,11 +75,14 @@ public ContainerMapping(Configuration conf, NodeManager nodeManager,
       throw new IllegalArgumentException("SCM metadata directory is not valid.");
     }
 
-    File dbPath = new File(scmMetaDataDir, "SCM.db");
     Options options = new Options();
     options.cacheSize(this.cacheSize * (1024L * 1024L));
     options.createIfMissing();
+
+    // This DB holds the container name to pipeline mapping.
+    File containerDBPath = new File(scmMetaDataDir, "container.db");
+    containerStore = new LevelDBStore(containerDBPath, options);
+
-    store = new LevelDBStore(dbPath, options);
     this.lock = new ReentrantLock();
     rand = new Random();
   }
@@ -103,6 +106,8 @@ private static Pipeline newPipelineFromNodes(DatanodeID node, String
     return pipeline;
   }
 
+
+
   /**
    * Returns the Pipeline from the container name.
    *
@@ -114,7 +119,8 @@ public Pipeline getContainer(String containerName) throws IOException {
     Pipeline pipeline = null;
     lock.lock();
     try {
-      byte[] pipelineBytes = store.get(containerName.getBytes(encoding));
+      byte[] pipelineBytes =
+          containerStore.get(containerName.getBytes(encoding));
       if (pipelineBytes == null) {
         throw new IOException("Specified key does not exist. key : " +
             containerName);
@@ -145,7 +151,8 @@ public Pipeline allocateContainer(String containerName) throws IOException {
     lock.lock();
     try {
-      byte[] pipelineBytes = store.get(containerName.getBytes(encoding));
+      byte[] pipelineBytes =
+          containerStore.get(containerName.getBytes(encoding));
       if (pipelineBytes != null) {
         throw new IOException("Specified container already exists. key : " +
             containerName);
@@ -153,7 +160,7 @@ public Pipeline allocateContainer(String containerName) throws IOException {
       DatanodeID id = getDatanodeID();
       if (id != null) {
         pipeline = newPipelineFromNodes(id, containerName);
-        store.put(containerName.getBytes(encoding),
+        containerStore.put(containerName.getBytes(encoding),
             pipeline.getProtobufMessage().toByteArray());
       }
     } finally {
@@ -193,8 +200,8 @@ private DatanodeID getDatanodeID() throws IOException {
    */
   @Override
   public void close() throws IOException {
-    if (store != null) {
-      store.close();
+    if (containerStore != null) {
+      containerStore.close();
     }
   }
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/node/CommandQueue.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/node/CommandQueue.java
index fce86f7807..9a49fc5c13 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/node/CommandQueue.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/node/CommandQueue.java
@@ -39,7 +39,12 @@ public class CommandQueue {
   private final Map<DatanodeID, Queue<SCMCommand>> commandMap;
   private final Lock lock;
-  private final List<SCMCommand> nullList;
+  // This list is the default return value; it contains a single null command.
+  private static final List<SCMCommand> DEFAULT_LIST = new LinkedList<>();
+
+  static {
+    DEFAULT_LIST.add(NullCommand.newBuilder().build());
+  }
 
   /**
    * Constructs a Command Queue.
@@ -47,8 +52,6 @@ public class CommandQueue {
   public CommandQueue() {
     commandMap = new HashMap<>();
     lock = new ReentrantLock();
-    nullList = new LinkedList<>();
-    nullList.add(NullCommand.newBuilder().build());
   }
 
   /**
@@ -75,7 +78,7 @@ List<SCMCommand> getCommand(final DatanodeID datanodeID) {
     } finally {
       lock.unlock();
     }
-    return nullList;
+    return DEFAULT_LIST;
   }
 
   /**
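For context, this is roughly how the queue is drained when SCM answers a heartbeat; commandQueue, datanodeID, and LOG are assumed to be in scope (the real call path goes through the SCM node manager):

    // getCommand never returns null: with nothing queued for this datanode it
    // falls back to the shared DEFAULT_LIST holding a single NullCommand.
    List<SCMCommand> commands = commandQueue.getCommand(datanodeID);
    for (SCMCommand command : commands) {
      LOG.debug("Sending command of type {} to {}", command.getType(),
          datanodeID);
    }

Populating DEFAULT_LIST in a static initializer matters here: doing the add in the constructor would append another NullCommand to the shared list every time a CommandQueue instance is constructed.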
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/StorageContainerDatanodeProtocol.proto b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/StorageContainerDatanodeProtocol.proto
index ee1378f1e8..38be4bea08 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/StorageContainerDatanodeProtocol.proto
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/StorageContainerDatanodeProtocol.proto
@@ -47,15 +47,80 @@ import "DatanodeContainerProtocol.proto";
 */
 message SCMHeartbeatRequestProto {
   required DatanodeIDProto datanodeID = 1;
-  optional SCMNodeReport nodeReport= 2;
+  optional SCMNodeReport nodeReport = 2;
+  optional ReportState containerReportState = 3;
 }
 
+enum ContainerState {
+  closed = 0;
+  open = 1;
+}
+
+/**
+ReportState tells SCM that the datanode has container report information
+that SCM might be interested in.
+*/
+message ReportState {
+  enum states {
+    noContainerReports = 0;
+    completeContainerReport = 1;
+    deltaContainerReport = 2;
+  }
+  required states state = 1;
+  required int64 count = 2 [default = 0];
+}
+
+/**
+This message is used to persist the information about a container in the
+SCM database. This information allows SCM to start up faster and avoids
+keeping all container info in memory all the time.
+*/
+message ContainerPersistenceProto {
+  required ContainerState state = 1;
+  required hadoop.hdfs.ozone.Pipeline pipeline = 2;
+  required ContainerInfo info = 3;
+}
+
+/**
+This message is used to do a quick lookup of which containers are affected
+if a node goes down.
+*/
+message NodeContainerMapping {
+  repeated string containerName = 1;
+}
+
+/**
+A container report contains the following information.
+*/
+message ContainerInfo {
+  required string containerName = 1;
+  repeated bytes finalhash = 2;
+  optional int64 size = 3;
+  optional int64 keycount = 4;
+}
+
+/**
+A set of container reports; max count is generally set to 8192 since that
+keeps the size of the reports under 1 MB.
+*/
+message ContainerReports {
+  enum reportType {
+    fullReport = 0;
+    deltaReport = 1;
+  }
+  repeated ContainerInfo reports = 1;
+  required reportType type = 2;
+}
+
 /**
 * This message is sent along with the heartbeat to report datanode
 * storage utilization to SCM.
 */
 message SCMNodeReport {
-  repeated SCMStorageReport storageReport= 1;
+  repeated SCMStorageReport storageReport = 1;
 }
 
 message SCMStorageReport {
@@ -123,6 +188,12 @@ message NullCmdResponseProto {
 }
 
+/**
+This command tells the datanode to send in the container report when possible.
+*/
+message SendContainerReportProto {
+}
+
 /**
 Type of commands supported by SCM to datanode protocol.
 */
@@ -130,6 +201,7 @@ enum Type {
   nullCmd = 1;
   versionCommand = 2;
   registeredCommand = 3;
+  sendContainerReport = 4;
 }
 
 /*
 * These are commands returned by SCM for to the datanode to execute.
 */
@@ -138,6 +210,10 @@ enum Type {
 message SCMCommandResponseProto {
   required Type cmdType = 2; // Type of the command
   optional NullCmdResponseProto nullCommand = 3;
+  optional SCMRegisteredCmdResponseProto registeredProto = 4;
+  optional SCMVersionResponseProto versionProto = 5;
+  optional SendContainerReportProto sendReport = 6;
+
 }
 
@@ -157,9 +233,9 @@ message SCMHeartbeatResponseProto {
 /**
 * Here is a simple state diagram that shows how a datanode would boot up and
 * communicate with SCM.
 *
 *            +-----------------------+
 *            |         Start         |
 *            +----------+------------+
 *                       |
 *                       |
 *                       |
 *                       |
 *                       |
 *                       |
 *                       |
 *            +----------v-------------+
 *            |   Searching for SCM    +------------+
 *            +----------+-------------+            |
 *                       |                          |
 *                       |                          |
 *                       |             +------------v-----------+
 *                       |             |   Register if needed   |
 *                       |             +-----------+------------+
 *                       |                         |
 *                       v                         |
 *            +----------+-----------------+       |
 * +----------+      Heartbeat state       <-------+
 * |          +--------^-------------------+
 * |                   |
 * |                   |
 * |                   |
 * |                   |
 * |                   |
 * |                   |
 * |                   |
 * +-------------------+
 */
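To make the new report messages concrete, here is a hedged sketch of a datanode assembling a delta report; the container name, size, and hash bytes are illustrative, and ByteString is com.google.protobuf.ByteString:

    // One report entry; finalhash is a repeated bytes field, so each hash is
    // added individually.
    ContainerInfo info = ContainerInfo.newBuilder()
        .setContainerName("container-0001")
        .addFinalhash(ByteString.copyFromUtf8("illustrative-hash"))
        .setSize(1024L * 1024L)
        .setKeycount(57)
        .build();

    // Wrap the entries in a delta report; a full report would use
    // reportType.fullReport and, per the comment above, should stay under
    // ~8192 entries to keep the message below 1 MB.
    ContainerReports reports = ContainerReports.newBuilder()
        .setType(ContainerReports.reportType.deltaReport)
        .addReports(info)
        .build();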
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/common/ScmTestMock.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/common/ScmTestMock.java
index b20202e9d2..a9787cefa4 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/common/ScmTestMock.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/common/ScmTestMock.java
@@ -20,10 +20,9 @@
 import org.apache.hadoop.ozone.protocol.StorageContainerDatanodeProtocol;
 import org.apache.hadoop.ozone.protocol.VersionResponse;
 import org.apache.hadoop.ozone.protocol.commands.NullCommand;
-import org.apache.hadoop.ozone.protocol.proto
-    .StorageContainerDatanodeProtocolProtos;
-import org.apache.hadoop.ozone.protocol.proto
-    .StorageContainerDatanodeProtocolProtos.SCMNodeReport;
+import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos;
+import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.ReportState;
+import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMNodeReport;
 import org.apache.hadoop.ozone.scm.VersionInfo;
 
 import java.io.IOException;
@@ -37,6 +36,7 @@ public class ScmTestMock implements StorageContainerDatanodeProtocol {
   private int rpcResponseDelay;
   private AtomicInteger heartbeatCount = new AtomicInteger(0);
   private AtomicInteger rpcCount = new AtomicInteger(0);
+  private ReportState reportState;
 
   /**
    * Returns the number of heartbeats made to this class.
@@ -112,10 +112,11 @@ private void sleepIfNeeded() {
    */
   @Override
   public StorageContainerDatanodeProtocolProtos.SCMHeartbeatResponseProto
-      sendHeartbeat(DatanodeID datanodeID, SCMNodeReport nodeReport)
-      throws IOException {
+      sendHeartbeat(DatanodeID datanodeID, SCMNodeReport nodeReport,
+      ReportState reportState) throws IOException {
     rpcCount.incrementAndGet();
     heartbeatCount.incrementAndGet();
+    this.reportState = reportState;
     sleepIfNeeded();
     StorageContainerDatanodeProtocolProtos.SCMCommandResponseProto
         cmdResponse = StorageContainerDatanodeProtocolProtos
@@ -153,4 +154,12 @@ private void sleepIfNeeded() {
         StorageContainerDatanodeProtocolProtos
             .SCMRegisteredCmdResponseProto.ErrorCode.success).build();
   }
+
+  /**
+   * Returns the report state carried by the last heartbeat.
+   * @return ReportState
+   */
+  public ReportState getReportState() {
+    return this.reportState;
+  }
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/common/TestDatanodeStateMachine.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/common/TestDatanodeStateMachine.java
index d9e4cd78a8..b0e361604a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/common/TestDatanodeStateMachine.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/common/TestDatanodeStateMachine.java
@@ -26,6 +26,7 @@
 import org.apache.hadoop.ozone.container.common.states.DatanodeState;
 import org.apache.hadoop.ozone.container.common.states.datanode.InitDatanodeState;
 import org.apache.hadoop.ozone.container.common.states.datanode.RunningDatanodeState;
+import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos;
 import org.apache.hadoop.test.GenericTestUtils;
 import org.apache.hadoop.util.concurrent.HadoopExecutors;
 import org.junit.After;
@@ -48,6 +49,8 @@
 import java.util.concurrent.TimeoutException;
 
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY;
+import static org.apache.hadoop.ozone.OzoneConfigKeys
+    .OZONE_SCM_HEARTBEAT_RPC_TIMEOUT;
 
 /**
  * Tests the datanode state machine class and its states.
@@ -65,6 +68,7 @@ public class TestDatanodeStateMachine {
   @Before
   public void setUp() throws Exception {
     conf = SCMTestUtils.getConf();
+    conf.setInt(OZONE_SCM_HEARTBEAT_RPC_TIMEOUT, 500);
     serverAddresses = new LinkedList<>();
     scmServers = new LinkedList<>();
     mockServers = new LinkedList<>();
@@ -194,9 +198,9 @@ public void testDatanodeStateContext() throws IOException,
 
     // This execute will invoke getVersion calls against all SCM endpoints
     // that we know of.
-    task.execute(executorService);
-    newState = task.await(2, TimeUnit.SECONDS);
+    task.execute(executorService);
+    newState = task.await(10, TimeUnit.SECONDS);
 
     // If we are in the running state, we should stay in the running state.
     Assert.assertEquals(DatanodeStateMachine.DatanodeStates.RUNNING,
         newState);
@@ -246,8 +250,14 @@ public void testDatanodeStateContext() throws IOException,
     Assert.assertEquals(DatanodeStateMachine.DatanodeStates.RUNNING,
         newState);
 
+
     for (ScmTestMock mock : mockServers) {
       Assert.assertEquals(1, mock.getHeartbeatCount());
+      // Assert that the heartbeat did indeed carry the report state that we
+      // set on the datanode.
+      Assert.assertEquals(mock.getReportState().getState().getNumber(),
+          StorageContainerDatanodeProtocolProtos.ReportState.states
+              .noContainerReports.getNumber());
     }
   }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/common/TestEndPoint.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/common/TestEndPoint.java
index 9404393faf..58f74f42cc 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/common/TestEndPoint.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/common/TestEndPoint.java
@@ -20,7 +20,8 @@
 import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.ipc.RPC;
-import org.apache.hadoop.ozone.container.common.statemachine.DatanodeStateMachine;
+import org.apache.hadoop.ozone.container.common.statemachine
+    .DatanodeStateMachine;
 import org.apache.hadoop.ozone.container.common.statemachine
     .EndpointStateMachine;
 import org.apache.hadoop.ozone.container.common.statemachine.StateContext;
@@ -30,16 +31,18 @@
     .RegisterEndpointTask;
 import org.apache.hadoop.ozone.container.common.states.endpoint
     .VersionEndpointTask;
+import org.apache.hadoop.ozone.protocol.proto
+    .StorageContainerDatanodeProtocolProtos;
 import org.apache.hadoop.ozone.protocol.proto
     .StorageContainerDatanodeProtocolProtos.ContainerNodeIDProto;
 import org.apache.hadoop.ozone.protocol.proto
     .StorageContainerDatanodeProtocolProtos.SCMHeartbeatResponseProto;
 import org.apache.hadoop.ozone.protocol.proto
     .StorageContainerDatanodeProtocolProtos.SCMNodeReport;
-import org.apache.hadoop.ozone.protocol.proto
-    .StorageContainerDatanodeProtocolProtos.SCMStorageReport;
 import org.apache.hadoop.ozone.protocol.proto
     .StorageContainerDatanodeProtocolProtos.SCMRegisteredCmdResponseProto;
+import org.apache.hadoop.ozone.protocol.proto
+    .StorageContainerDatanodeProtocolProtos.SCMStorageReport;
 import org.apache.hadoop.ozone.protocol.proto
     .StorageContainerDatanodeProtocolProtos.SCMVersionResponseProto;
 import org.apache.hadoop.ozone.protocol.proto
@@ -58,6 +61,9 @@
 import java.util.UUID;
 
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY;
+import static org.apache.hadoop.ozone.protocol.proto
+    .StorageContainerDatanodeProtocolProtos.ReportState.states
+    .noContainerReports;
 
 /**
  * Tests the endpoints.
@@ -67,6 +73,27 @@ public class TestEndPoint {
   private static RPC.Server scmServer;
   private static ScmTestMock scmServerImpl;
   private static File testDir;
+  private static StorageContainerDatanodeProtocolProtos.ReportState
+      defaultReportState;
+
+  @AfterClass
+  public static void tearDown() throws Exception {
+    if (scmServer != null) {
+      scmServer.stop();
+    }
+    FileUtil.fullyDelete(testDir);
+  }
+
+  @BeforeClass
+  public static void setUp() throws Exception {
+    serverAddress = SCMTestUtils.getReuseableAddress();
+    scmServerImpl = new ScmTestMock();
+    scmServer = SCMTestUtils.startScmRpcServer(SCMTestUtils.getConf(),
+        scmServerImpl, serverAddress, 10);
+    testDir = PathUtils.getTestDir(TestEndPoint.class);
+    defaultReportState = StorageContainerDatanodeProtocolProtos.ReportState
+        .newBuilder().setState(noContainerReports).setCount(0).build();
+  }
 
   @Test
   /**
@@ -255,7 +282,7 @@ public void testHeartbeat() throws Exception {
     srb.setCapacity(2000).setScmUsed(500).setRemaining(1500).build();
     nrb.addStorageReport(srb);
     SCMHeartbeatResponseProto responseProto = rpcEndPoint.getEndPoint()
-        .sendHeartbeat(dataNode, nrb.build());
+        .sendHeartbeat(dataNode, nrb.build(), defaultReportState);
     Assert.assertNotNull(responseProto);
     Assert.assertEquals(1, responseProto.getCommandsCount());
     Assert.assertNotNull(responseProto.getCommandsList().get(0));
@@ -322,21 +349,4 @@ public void testHeartbeatTaskRpcTimeOut() throws Exception {
     scmServerImpl.setRpcResponseDelay(0);
     Assert.assertThat(end - start, new LessOrEqual<>(rpcTimeout + tolerance));
   }
-
-  @AfterClass
-  public static void tearDown() throws Exception {
-    if (scmServer != null) {
-      scmServer.stop();
-    }
-    FileUtil.fullyDelete(testDir);
-  }
-
-  @BeforeClass
-  public static void setUp() throws Exception {
-    serverAddress = SCMTestUtils.getReuseableAddress();
-    scmServerImpl = new ScmTestMock();
-    scmServer = SCMTestUtils.startScmRpcServer(SCMTestUtils.getConf(),
-        scmServerImpl, serverAddress, 10);
-    testDir = PathUtils.getTestDir(TestEndPoint.class);
-  }
 }