HDFS-11062. Ozone:SCM: Remove null command. Contributed by Yuanbo Liu.
This commit is contained in:
parent
b7463924af
commit
2129c6b276
@ -125,11 +125,6 @@ public static Builder newBuilder() {
|
||||
private void processResponse(SCMHeartbeatResponseProto response) {
|
||||
for (SCMCommandResponseProto commandResponseProto : response
|
||||
.getCommandsList()) {
|
||||
if (commandResponseProto.getCmdType() ==
|
||||
StorageContainerDatanodeProtocolProtos.Type.nullCmd) {
|
||||
//this.context.addCommand(NullCommand.newBuilder().build());
|
||||
LOG.debug("Discarding a null command from SCM.");
|
||||
}
|
||||
if (commandResponseProto.getCmdType() ==
|
||||
StorageContainerDatanodeProtocolProtos.Type.sendContainerReport) {
|
||||
this.context.addCommand(SendContainerCommand.getFromProtobuf(
|
||||
|
@ -1,81 +0,0 @@
|
||||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.ozone.protocol.commands;
|
||||
|
||||
import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.Type;
|
||||
import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.NullCmdResponseProto;
|
||||
|
||||
|
||||
|
||||
/**
|
||||
* For each command that SCM can return we have a class in this commands
|
||||
* directory. This is the Null command, that tells datanode that no action is
|
||||
* needed from it.
|
||||
*/
|
||||
public class NullCommand extends SCMCommand<NullCmdResponseProto> {
|
||||
/**
|
||||
* Returns the type of this command.
|
||||
*
|
||||
* @return Type
|
||||
*/
|
||||
@Override
|
||||
public Type getType() {
|
||||
return Type.nullCmd;
|
||||
}
|
||||
|
||||
/**
|
||||
* Gets the protobuf message of this object.
|
||||
*
|
||||
* @return A protobuf message.
|
||||
*/
|
||||
@Override
|
||||
public byte[] getProtoBufMessage() {
|
||||
return NullCmdResponseProto.newBuilder().build().toByteArray();
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns a NullCommand class from NullCommandResponse Proto.
|
||||
* @param unused - unused
|
||||
* @return NullCommand
|
||||
*/
|
||||
public static NullCommand getFromProtobuf(final NullCmdResponseProto
|
||||
unused) {
|
||||
return new NullCommand();
|
||||
}
|
||||
|
||||
/**
|
||||
* returns a new builder.
|
||||
* @return Builder
|
||||
*/
|
||||
public static Builder newBuilder() {
|
||||
return new Builder();
|
||||
}
|
||||
|
||||
/**
|
||||
* A Builder class this is the standard pattern we are using for all commands.
|
||||
*/
|
||||
public static class Builder {
|
||||
/**
|
||||
* Return a null command.
|
||||
* @return - NullCommand.
|
||||
*/
|
||||
public NullCommand build() {
|
||||
return new NullCommand();
|
||||
}
|
||||
}
|
||||
}
|
@ -42,8 +42,6 @@
|
||||
.StorageContainerDatanodeProtocolProtos.ContainerReportsProto;
|
||||
import org.apache.hadoop.ozone.protocol.proto
|
||||
.StorageContainerDatanodeProtocolProtos.ReportState;
|
||||
import org.apache.hadoop.ozone.protocol.proto
|
||||
.StorageContainerDatanodeProtocolProtos.NullCmdResponseProto;
|
||||
import org.apache.hadoop.ozone.protocol.proto
|
||||
.StorageContainerDatanodeProtocolProtos.SCMCommandResponseProto;
|
||||
import org.apache.hadoop.ozone.protocol.proto
|
||||
@ -58,6 +56,8 @@
|
||||
.StorageContainerDatanodeProtocolProtos.SCMVersionRequestProto;
|
||||
import org.apache.hadoop.ozone.protocol.proto
|
||||
.StorageContainerDatanodeProtocolProtos.SCMVersionResponseProto;
|
||||
import org.apache.hadoop.ozone.protocol.proto
|
||||
.StorageContainerDatanodeProtocolProtos.SendContainerReportProto;
|
||||
import org.apache.hadoop.ozone.protocol.proto
|
||||
.StorageContainerDatanodeProtocolProtos.Type;
|
||||
import org.apache.hadoop.ozone.protocol.proto
|
||||
@ -296,26 +296,27 @@ public static void main(String[] argv) throws IOException {
|
||||
public static SCMCommandResponseProto getCommandResponse(SCMCommand cmd)
|
||||
throws InvalidProtocolBufferException {
|
||||
Type type = cmd.getType();
|
||||
SCMCommandResponseProto.Builder builder =
|
||||
SCMCommandResponseProto.newBuilder();
|
||||
switch (type) {
|
||||
case nullCmd:
|
||||
return getNullCmdResponse();
|
||||
case registeredCommand:
|
||||
return builder.setCmdType(Type.registeredCommand)
|
||||
.setRegisteredProto(
|
||||
SCMRegisteredCmdResponseProto.getDefaultInstance())
|
||||
.build();
|
||||
case versionCommand:
|
||||
return builder.setCmdType(Type.versionCommand)
|
||||
.setVersionProto(SCMVersionResponseProto.getDefaultInstance())
|
||||
.build();
|
||||
case sendContainerReport:
|
||||
return builder.setCmdType(Type.sendContainerReport)
|
||||
.setSendReport(SendContainerReportProto.getDefaultInstance())
|
||||
.build();
|
||||
default:
|
||||
throw new IllegalArgumentException("Not implemented");
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns a null command response.
|
||||
* @return
|
||||
* @throws InvalidProtocolBufferException
|
||||
*/
|
||||
private static SCMCommandResponseProto getNullCmdResponse() {
|
||||
return SCMCommandResponseProto.newBuilder()
|
||||
.setCmdType(Type.nullCmd)
|
||||
.setNullCommand(NullCmdResponseProto.getDefaultInstance())
|
||||
.build();
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
public static SCMRegisteredCmdResponseProto getRegisteredResponse(
|
||||
SCMCommand cmd, SCMNodeAddressList addressList) {
|
||||
@ -465,11 +466,11 @@ public SCMHeartbeatResponseProto sendHeartbeat(DatanodeID datanodeID,
|
||||
SCMNodeReport nodeReport, ReportState reportState) throws IOException {
|
||||
List<SCMCommand> commands =
|
||||
getScmNodeManager().sendHeartbeat(datanodeID, nodeReport);
|
||||
List<SCMCommandResponseProto> cmdReponses = new LinkedList<>();
|
||||
List<SCMCommandResponseProto> cmdResponses = new LinkedList<>();
|
||||
for (SCMCommand cmd : commands) {
|
||||
cmdReponses.add(getCommandResponse(cmd));
|
||||
cmdResponses.add(getCommandResponse(cmd));
|
||||
}
|
||||
return SCMHeartbeatResponseProto.newBuilder().addAllCommands(cmdReponses)
|
||||
return SCMHeartbeatResponseProto.newBuilder().addAllCommands(cmdResponses)
|
||||
.build();
|
||||
}
|
||||
|
||||
@ -501,8 +502,9 @@ public SCMHeartbeatResponseProto sendHeartbeat(DatanodeID datanodeID,
|
||||
sendContainerReport(ContainerReportsProto reports) throws IOException {
|
||||
// TODO : fix this in the server side code changes for handling this request
|
||||
// correctly.
|
||||
return SCMHeartbeatResponseProto.newBuilder()
|
||||
.addCommands(getNullCmdResponse()).build();
|
||||
List<SCMCommandResponseProto> cmdResponses = new LinkedList<>();
|
||||
return SCMHeartbeatResponseProto.newBuilder().addAllCommands(cmdResponses)
|
||||
.build();
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -18,7 +18,6 @@
|
||||
package org.apache.hadoop.ozone.scm.node;
|
||||
|
||||
import org.apache.hadoop.hdfs.protocol.DatanodeID;
|
||||
import org.apache.hadoop.ozone.protocol.commands.NullCommand;
|
||||
import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
|
||||
|
||||
import java.util.HashMap;
|
||||
@ -39,7 +38,7 @@ public class CommandQueue {
|
||||
|
||||
private final Map<DatanodeID, List<SCMCommand>> commandMap;
|
||||
private final Lock lock;
|
||||
// This map is used as default return value containing one null command.
|
||||
// This map is used as default return value.
|
||||
private static final List<SCMCommand> DEFAULT_LIST = new LinkedList<>();
|
||||
|
||||
/**
|
||||
@ -48,12 +47,11 @@ public class CommandQueue {
|
||||
public CommandQueue() {
|
||||
commandMap = new HashMap<>();
|
||||
lock = new ReentrantLock();
|
||||
DEFAULT_LIST.add(NullCommand.newBuilder().build());
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns a list of Commands for the datanode to execute, if we have no
|
||||
* commands returns a list with Null Command otherwise the current set of
|
||||
* commands returns a empty list otherwise the current set of
|
||||
* commands are returned and command map set to empty list again.
|
||||
*
|
||||
* @param datanodeID DatanodeID
|
||||
@ -67,8 +65,7 @@ List<SCMCommand> getCommand(final DatanodeID datanodeID) {
|
||||
if (commandMap.containsKey(datanodeID)) {
|
||||
List temp = commandMap.get(datanodeID);
|
||||
if (temp.size() > 0) {
|
||||
LinkedList<SCMCommand> emptyList = new LinkedList<>();
|
||||
commandMap.put(datanodeID, emptyList);
|
||||
commandMap.put(datanodeID, DEFAULT_LIST);
|
||||
return temp;
|
||||
}
|
||||
}
|
||||
|
@ -181,12 +181,6 @@ message ContainerNodeIDProto {
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Empty Command Response
|
||||
*/
|
||||
message NullCmdResponseProto {
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
This command tells the data node to send in the container report when possible
|
||||
@ -198,7 +192,6 @@ message SendContainerReportProto {
|
||||
Type of commands supported by SCM to datanode protocol.
|
||||
*/
|
||||
enum Type {
|
||||
nullCmd = 1;
|
||||
versionCommand = 2;
|
||||
registeredCommand = 3;
|
||||
sendContainerReport = 4;
|
||||
@ -209,10 +202,9 @@ enum Type {
|
||||
*/
|
||||
message SCMCommandResponseProto {
|
||||
required Type cmdType = 2; // Type of the command
|
||||
optional NullCmdResponseProto nullCommand = 3;
|
||||
optional SCMRegisteredCmdResponseProto registeredProto = 4;
|
||||
optional SCMVersionResponseProto versionProto = 5;
|
||||
optional SendContainerReportProto sendReport = 6;
|
||||
optional SCMRegisteredCmdResponseProto registeredProto = 3;
|
||||
optional SCMVersionResponseProto versionProto = 4;
|
||||
optional SendContainerReportProto sendReport = 5;
|
||||
|
||||
}
|
||||
|
||||
|
@ -20,13 +20,16 @@
|
||||
import org.apache.hadoop.hdfs.protocol.DatanodeID;
|
||||
import org.apache.hadoop.ozone.protocol.StorageContainerDatanodeProtocol;
|
||||
import org.apache.hadoop.ozone.protocol.VersionResponse;
|
||||
import org.apache.hadoop.ozone.protocol.commands.NullCommand;
|
||||
import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos;
|
||||
import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMHeartbeatResponseProto;
|
||||
import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMCommandResponseProto;
|
||||
import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.ReportState;
|
||||
import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMNodeReport;
|
||||
import org.apache.hadoop.ozone.scm.VersionInfo;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.LinkedList;
|
||||
import java.util.List;
|
||||
import java.util.UUID;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
||||
@ -137,25 +140,10 @@ private void sleepIfNeeded() {
|
||||
heartbeatCount.incrementAndGet();
|
||||
this.reportState = reportState;
|
||||
sleepIfNeeded();
|
||||
return getNullRespose();
|
||||
}
|
||||
|
||||
private StorageContainerDatanodeProtocolProtos.SCMHeartbeatResponseProto
|
||||
getNullRespose() throws
|
||||
com.google.protobuf.InvalidProtocolBufferException {
|
||||
StorageContainerDatanodeProtocolProtos.SCMCommandResponseProto
|
||||
cmdResponse = StorageContainerDatanodeProtocolProtos
|
||||
.SCMCommandResponseProto
|
||||
.newBuilder().setCmdType(StorageContainerDatanodeProtocolProtos
|
||||
.Type.nullCmd)
|
||||
.setNullCommand(
|
||||
StorageContainerDatanodeProtocolProtos.NullCmdResponseProto
|
||||
.parseFrom(
|
||||
NullCommand.newBuilder().build().getProtoBufMessage()))
|
||||
List<SCMCommandResponseProto>
|
||||
cmdResponses = new LinkedList<>();
|
||||
return SCMHeartbeatResponseProto.newBuilder().addAllCommands(cmdResponses)
|
||||
.build();
|
||||
return StorageContainerDatanodeProtocolProtos.SCMHeartbeatResponseProto
|
||||
.newBuilder()
|
||||
.addCommands(cmdResponse).build();
|
||||
}
|
||||
|
||||
/**
|
||||
@ -188,13 +176,16 @@ private void sleepIfNeeded() {
|
||||
* @throws IOException
|
||||
*/
|
||||
@Override
|
||||
public StorageContainerDatanodeProtocolProtos.SCMHeartbeatResponseProto
|
||||
public SCMHeartbeatResponseProto
|
||||
sendContainerReport(StorageContainerDatanodeProtocolProtos
|
||||
.ContainerReportsProto reports) throws IOException {
|
||||
Preconditions.checkNotNull(reports);
|
||||
containerReportsCount.incrementAndGet();
|
||||
closedContainerCount.addAndGet(reports.getReportsCount());
|
||||
return getNullRespose();
|
||||
List<SCMCommandResponseProto>
|
||||
cmdResponses = new LinkedList<>();
|
||||
return SCMHeartbeatResponseProto.newBuilder().addAllCommands(cmdResponses)
|
||||
.build();
|
||||
}
|
||||
|
||||
public ReportState getReportState() {
|
||||
|
@ -47,8 +47,6 @@
|
||||
.StorageContainerDatanodeProtocolProtos.SCMStorageReport;
|
||||
import org.apache.hadoop.ozone.protocol.proto
|
||||
.StorageContainerDatanodeProtocolProtos.SCMVersionResponseProto;
|
||||
import org.apache.hadoop.ozone.protocol.proto
|
||||
.StorageContainerDatanodeProtocolProtos.Type;
|
||||
import org.apache.hadoop.ozone.scm.VersionInfo;
|
||||
import org.apache.hadoop.test.PathUtils;
|
||||
import org.apache.hadoop.util.Time;
|
||||
@ -286,10 +284,7 @@ public void testHeartbeat() throws Exception {
|
||||
SCMHeartbeatResponseProto responseProto = rpcEndPoint.getEndPoint()
|
||||
.sendHeartbeat(dataNode, nrb.build(), defaultReportState);
|
||||
Assert.assertNotNull(responseProto);
|
||||
Assert.assertEquals(1, responseProto.getCommandsCount());
|
||||
Assert.assertNotNull(responseProto.getCommandsList().get(0));
|
||||
Assert.assertEquals(responseProto.getCommandsList().get(0).getCmdType(),
|
||||
Type.nullCmd);
|
||||
Assert.assertEquals(0, responseProto.getCommandsCount());
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -176,8 +176,8 @@ public void testContainerPlacementCapacity() throws IOException,
|
||||
|
||||
thrown.expect(IOException.class);
|
||||
thrown.expectMessage(
|
||||
startsWith("Unable to find enough nodes that meet the space " +
|
||||
"requirement in healthy node set."));
|
||||
startsWith("Unable to find enough nodes that meet "
|
||||
+ "the space requirement"));
|
||||
String container2 = UUID.randomUUID().toString();
|
||||
containerManager.allocateContainer(container2,
|
||||
ScmClient.ReplicationFactor.THREE);
|
||||
|
Loading…
Reference in New Issue
Block a user