HDDS-103. SCM CA: Add new security protocol for SCM to expose security related functions. Contributed by Ajay Kumar.

This commit is contained in:
Ajay Kumar 2018-10-28 22:44:41 -07:00 committed by Xiaoyu Yao
parent c260c19d18
commit 33c274ea2e
13 changed files with 558 additions and 5 deletions

View File

@ -239,6 +239,7 @@ http://maven.apache.org/xsd/maven-4.0.0.xsd">
<include>StorageContainerLocationProtocol.proto</include> <include>StorageContainerLocationProtocol.proto</include>
<include>hdds.proto</include> <include>hdds.proto</include>
<include>ScmBlockLocationProtocol.proto</include> <include>ScmBlockLocationProtocol.proto</include>
<include>SCMSecurityProtocol.proto</include>
</includes> </includes>
</source> </source>
</configuration> </configuration>

View File

@ -0,0 +1,44 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.hadoop.hdds.protocol;
import java.io.IOException;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.DatanodeDetailsProto;
import org.apache.hadoop.hdds.scm.ScmConfigKeys;
import org.apache.hadoop.security.KerberosInfo;
/**
* The protocol used to perform security related operations with SCM.
*/
@KerberosInfo(
serverPrincipal = ScmConfigKeys.HDDS_SCM_KERBEROS_PRINCIPAL_KEY)
@InterfaceAudience.Private
public interface SCMSecurityProtocol {
/**
* Get SCM signed certificate for DataNode.
*
* @param dataNodeDetails - DataNode Details.
* @param certSignReq - Certificate signing request.
* @return byte[] - SCM signed certificate.
*/
String getDataNodeCertificate(
DatanodeDetailsProto dataNodeDetails,
String certSignReq) throws IOException;
}

View File

@ -0,0 +1,99 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.hadoop.hdds.protocolPB;
import com.google.protobuf.RpcController;
import com.google.protobuf.ServiceException;
import java.io.Closeable;
import java.io.IOException;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.DatanodeDetailsProto;
import org.apache.hadoop.hdds.protocol.proto.SCMSecurityProtocolProtos.SCMGetDataNodeCertRequestProto;
import org.apache.hadoop.hdds.protocol.SCMSecurityProtocol;
import org.apache.hadoop.ipc.ProtobufHelper;
import org.apache.hadoop.ipc.ProtocolTranslator;
import org.apache.hadoop.ipc.RPC;
/**
* This class is the client-side translator that forwards requests for
* {@link SCMSecurityProtocol} to the {@link SCMSecurityProtocolPB} proxy.
*/
public class SCMSecurityProtocolClientSideTranslatorPB implements
SCMSecurityProtocol, ProtocolTranslator, Closeable {
/**
* RpcController is not used and hence is set to null.
*/
private static final RpcController NULL_RPC_CONTROLLER = null;
private final SCMSecurityProtocolPB rpcProxy;
public SCMSecurityProtocolClientSideTranslatorPB(
SCMSecurityProtocolPB rpcProxy) {
this.rpcProxy = rpcProxy;
}
/**
* Closes this stream and releases any system resources associated
* with it. If the stream is already closed then invoking this
* method has no effect.
*
* <p> As noted in {@link AutoCloseable#close()}, cases where the
* close may fail require careful attention. It is strongly advised
* to relinquish the underlying resources and to internally
* <em>mark</em> the {@code Closeable} as closed, prior to throwing
* the {@code IOException}.
*
* @throws IOException if an I/O error occurs
*/
@Override
public void close() throws IOException {
RPC.stopProxy(rpcProxy);
}
/**
* Get SCM signed certificate for DataNode.
*
* @param dataNodeDetails - DataNode Details.
* @param certSignReq - Certificate signing request.
* @return byte[] - SCM signed certificate.
*/
@Override
public String getDataNodeCertificate(DatanodeDetailsProto dataNodeDetails,
String certSignReq) throws IOException {
SCMGetDataNodeCertRequestProto.Builder builder =
SCMGetDataNodeCertRequestProto
.newBuilder()
.setCSR(certSignReq)
.setDatanodeDetails(dataNodeDetails);
try {
return rpcProxy
.getDataNodeCertificate(NULL_RPC_CONTROLLER, builder.build())
.getX509Certificate();
} catch (ServiceException e) {
throw ProtobufHelper.getRemoteException(e);
}
}
/**
* Return the proxy object underlying this protocol translator.
*
* @return the proxy object underlying this protocol translator.
*/
@Override
public Object getUnderlyingProxyObject() {
return rpcProxy;
}
}

View File

@ -0,0 +1,35 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p/>
* http://www.apache.org/licenses/LICENSE-2.0
* <p/>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.hadoop.hdds.protocolPB;
import org.apache.hadoop.hdds.protocol.proto.SCMSecurityProtocolProtos.SCMSecurityProtocolService;
import org.apache.hadoop.hdds.scm.ScmConfigKeys;
import org.apache.hadoop.ipc.ProtocolInfo;
import org.apache.hadoop.security.KerberosInfo;
/**
* Protocol for security related operations on SCM.
*/
@ProtocolInfo(protocolName =
"org.apache.hadoop.ozone.protocol.SCMSecurityProtocol",
protocolVersion = 1)
@KerberosInfo(serverPrincipal = ScmConfigKeys.HDDS_SCM_KERBEROS_PRINCIPAL_KEY)
public interface SCMSecurityProtocolPB extends
SCMSecurityProtocolService.BlockingInterface {
}

View File

@ -0,0 +1,66 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.hadoop.hdds.protocolPB;
import com.google.protobuf.RpcController;
import com.google.protobuf.ServiceException;
import java.io.IOException;
import org.apache.hadoop.hdds.protocol.proto.SCMSecurityProtocolProtos.SCMGetDataNodeCertRequestProto;
import org.apache.hadoop.hdds.protocol.proto.SCMSecurityProtocolProtos.SCMGetDataNodeCertResponseProto;
import org.apache.hadoop.hdds.protocol.proto.SCMSecurityProtocolProtos.SCMGetDataNodeCertResponseProto.ResponseCode;
import org.apache.hadoop.hdds.protocol.SCMSecurityProtocol;
/**
* This class is the server-side translator that forwards requests received on
* {@link SCMSecurityProtocolPB} to the {@link
* SCMSecurityProtocol} server implementation.
*/
public class SCMSecurityProtocolServerSideTranslatorPB implements
SCMSecurityProtocolPB {
private final SCMSecurityProtocol impl;
public SCMSecurityProtocolServerSideTranslatorPB(SCMSecurityProtocol impl) {
this.impl = impl;
}
/**
* Get SCM signed certificate for DataNode.
*
* @param controller
* @param request
* @return SCMGetDataNodeCertResponseProto.
*/
@Override
public SCMGetDataNodeCertResponseProto getDataNodeCertificate(
RpcController controller, SCMGetDataNodeCertRequestProto request)
throws ServiceException {
try {
String certificate = impl
.getDataNodeCertificate(request.getDatanodeDetails(),
request.getCSR());
SCMGetDataNodeCertResponseProto.Builder builder =
SCMGetDataNodeCertResponseProto
.newBuilder()
.setResponseCode(ResponseCode.success)
.setX509Certificate(certificate);
return builder.build();
} catch (IOException e) {
throw new ServiceException(e);
}
}
}

View File

@ -0,0 +1,22 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdds.protocolPB;
/**
* This package contains classes for wiring HDDS protobuf calls to rpc.
*/

View File

@ -172,6 +172,10 @@ public final class ScmConfigKeys {
"ozone.scm.block.client.port"; "ozone.scm.block.client.port";
public static final int OZONE_SCM_BLOCK_CLIENT_PORT_DEFAULT = 9863; public static final int OZONE_SCM_BLOCK_CLIENT_PORT_DEFAULT = 9863;
public static final String OZONE_SCM_SECURITY_SERVICE_PORT_KEY =
"ozone.scm.security.service.port";
public static final int OZONE_SCM_SECURITY_SERVICE_PORT_DEFAULT = 9961;
// Container service client // Container service client
public static final String OZONE_SCM_CLIENT_ADDRESS_KEY = public static final String OZONE_SCM_CLIENT_ADDRESS_KEY =
"ozone.scm.client.address"; "ozone.scm.client.address";
@ -188,6 +192,14 @@ public final class ScmConfigKeys {
public static final String OZONE_SCM_BLOCK_CLIENT_BIND_HOST_DEFAULT = public static final String OZONE_SCM_BLOCK_CLIENT_BIND_HOST_DEFAULT =
"0.0.0.0"; "0.0.0.0";
// SCM Security service address.
public static final String OZONE_SCM_SECURITY_SERVICE_ADDRESS_KEY =
"ozone.scm.security.service.address";
public static final String OZONE_SCM_SECURITY_SERVICE_BIND_HOST_KEY =
"ozone.scm.security.service.bind.host";
public static final String OZONE_SCM_SECURITY_SERVICE_BIND_HOST_DEFAULT =
"0.0.0.0";
public static final String OZONE_SCM_DATANODE_ADDRESS_KEY = public static final String OZONE_SCM_DATANODE_ADDRESS_KEY =
"ozone.scm.datanode.address"; "ozone.scm.datanode.address";
public static final String OZONE_SCM_DATANODE_BIND_HOST_KEY = public static final String OZONE_SCM_DATANODE_BIND_HOST_KEY =
@ -230,6 +242,10 @@ public final class ScmConfigKeys {
"ozone.scm.handler.count.key"; "ozone.scm.handler.count.key";
public static final int OZONE_SCM_HANDLER_COUNT_DEFAULT = 10; public static final int OZONE_SCM_HANDLER_COUNT_DEFAULT = 10;
public static final String OZONE_SCM_SECURITY_HANDLER_COUNT_KEY =
"ozone.scm.security.handler.count.key";
public static final int OZONE_SCM_SECURITY_HANDLER_COUNT_DEFAULT = 2;
public static final String OZONE_SCM_DEADNODE_INTERVAL = public static final String OZONE_SCM_DEADNODE_INTERVAL =
"ozone.scm.dead.node.interval"; "ozone.scm.dead.node.interval";
public static final String OZONE_SCM_DEADNODE_INTERVAL_DEFAULT = public static final String OZONE_SCM_DEADNODE_INTERVAL_DEFAULT =

View File

@ -0,0 +1,43 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.ozone;
import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_SECURITY_ENABLED_DEFAULT;
import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_SECURITY_ENABLED_KEY;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
/**
* Ozone security Util class.
*/
@InterfaceAudience.Private
@InterfaceStability.Evolving
public final class OzoneSecurityUtil {
private OzoneSecurityUtil() {
}
public static boolean isSecurityEnabled(Configuration conf) {
return conf.getBoolean(OZONE_SECURITY_ENABLED_KEY,
OZONE_SECURITY_ENABLED_DEFAULT);
}
}

View File

@ -0,0 +1,66 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/**
* These .proto interfaces are private and unstable.
* Please see http://wiki.apache.org/hadoop/Compatibility
* for what changes are allowed for a *unstable* .proto interface.
*/
option java_package = "org.apache.hadoop.hdds.protocol.proto";
option java_outer_classname = "SCMSecurityProtocolProtos";
option java_generic_services = true;
option java_generate_equals_and_hash = true;
package hadoop.hdds;
import "hdds.proto";
/**
* Returns a certificate signed by SCM.
*/
message SCMGetDataNodeCertResponseProto {
enum ResponseCode {
success = 1;
authenticationFailed = 2;
invalidCSR = 3;
}
required ResponseCode responseCode = 1;
required string x509Certificate = 2; // Base64 encoded X509 certificate.
}
/**
* This message is send by data node to prove its identity and get an SCM
* signed certificate.
*/
message SCMGetDataNodeCertRequestProto {
required DatanodeDetailsProto datanodeDetails = 1;
required string CSR = 2;
}
service SCMSecurityProtocolService {
/**
* Get SCM signed certificate for DataNode.
*/
rpc getDataNodeCertificate (SCMGetDataNodeCertRequestProto) returns (SCMGetDataNodeCertResponseProto);
}

View File

@ -160,6 +160,28 @@ public static InetSocketAddress getScmBlockClientBindAddress(
+ port.orElse(ScmConfigKeys.OZONE_SCM_BLOCK_CLIENT_PORT_DEFAULT)); + port.orElse(ScmConfigKeys.OZONE_SCM_BLOCK_CLIENT_PORT_DEFAULT));
} }
/**
* Retrieve the socket address that should be used by scm security server to
* service clients.
*
* @param conf
* @return Target InetSocketAddress for the SCM security service.
*/
public static InetSocketAddress getScmSecurityInetAddress(
Configuration conf) {
final Optional<String> host = getHostNameFromConfigKeys(conf,
ScmConfigKeys.OZONE_SCM_SECURITY_SERVICE_BIND_HOST_KEY);
final Optional<Integer> port = getPortNumberFromConfigKeys(conf,
ScmConfigKeys.OZONE_SCM_SECURITY_SERVICE_ADDRESS_KEY);
return NetUtils.createSocketAddr(
host.or(ScmConfigKeys.OZONE_SCM_SECURITY_SERVICE_BIND_HOST_DEFAULT) +
":" + port
.or(conf.getInt(ScmConfigKeys.OZONE_SCM_SECURITY_SERVICE_PORT_KEY,
ScmConfigKeys.OZONE_SCM_SECURITY_SERVICE_PORT_DEFAULT)));
}
/** /**
* Retrieve the socket address that should be used by DataNodes to connect * Retrieve the socket address that should be used by DataNodes to connect
* to the SCM. * to the SCM.

View File

@ -98,7 +98,7 @@ public SCMBlockProtocolServer(OzoneConfiguration conf,
RPC.setProtocolEngine(conf, ScmBlockLocationProtocolPB.class, RPC.setProtocolEngine(conf, ScmBlockLocationProtocolPB.class,
ProtobufRpcEngine.class); ProtobufRpcEngine.class);
// SCM Block Service RPC // SCM Block Service RPC.
BlockingService blockProtoPbService = BlockingService blockProtoPbService =
ScmBlockLocationProtocolProtos.ScmBlockLocationProtocolService ScmBlockLocationProtocolProtos.ScmBlockLocationProtocolService
.newReflectiveBlockingService( .newReflectiveBlockingService(

View File

@ -0,0 +1,121 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.hadoop.hdds.scm.server;
import com.google.protobuf.BlockingService;
import java.io.IOException;
import java.net.InetSocketAddress;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.DatanodeDetailsProto;
import org.apache.hadoop.hdds.protocol.proto.SCMSecurityProtocolProtos;
import org.apache.hadoop.hdds.protocolPB.SCMSecurityProtocolPB;
import org.apache.hadoop.hdds.protocolPB.SCMSecurityProtocolServerSideTranslatorPB;
import org.apache.hadoop.hdds.scm.HddsServerUtil;
import org.apache.hadoop.hdds.scm.ScmConfigKeys;
import org.apache.hadoop.hdds.protocol.SCMSecurityProtocol;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.security.KerberosInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* The protocol used to perform security related operations with SCM.
*/
@KerberosInfo(
serverPrincipal = ScmConfigKeys.HDDS_SCM_KERBEROS_PRINCIPAL_KEY)
@InterfaceAudience.Private
public class SCMSecurityProtocolServer implements SCMSecurityProtocol {
private static final Logger LOGGER = LoggerFactory
.getLogger(SCMClientProtocolServer.class);
private final OzoneConfiguration config;
private final StorageContainerManager scm;
private final RPC.Server rpcServer;
private final InetSocketAddress rpcAddress;
SCMSecurityProtocolServer(OzoneConfiguration conf,
StorageContainerManager scm) throws IOException {
this.config = conf;
this.scm = scm;
final int handlerCount =
conf.getInt(ScmConfigKeys.OZONE_SCM_SECURITY_HANDLER_COUNT_KEY,
ScmConfigKeys.OZONE_SCM_SECURITY_HANDLER_COUNT_DEFAULT);
rpcAddress = HddsServerUtil
.getScmSecurityInetAddress(conf);
// SCM security service RPC service.
BlockingService secureProtoPbService =
SCMSecurityProtocolProtos.SCMSecurityProtocolService
.newReflectiveBlockingService(
new SCMSecurityProtocolServerSideTranslatorPB(this));
this.rpcServer =
StorageContainerManager.startRpcServer(
conf,
rpcAddress,
SCMSecurityProtocolPB.class,
secureProtoPbService,
handlerCount);
}
/**
* Get SCM signed certificate for DataNode.
*
* @param dnDetails - DataNode Details.
* @param certSignReq - Certificate signing request.
* @return byte[] - SCM signed certificate.
*/
@Override
public String getDataNodeCertificate(
DatanodeDetailsProto dnDetails,
String certSignReq) throws IOException {
LOGGER.info("Processing CSR for dn {}, UUID: {}", dnDetails.getHostName(),
dnDetails.getUuid());
// TODO: Call scm to sign the csr.
return null;
}
public RPC.Server getRpcServer() {
return rpcServer;
}
public InetSocketAddress getRpcAddress() {
return rpcAddress;
}
public void start() {
LOGGER.info(StorageContainerManager.buildRpcServerStartMessage("Starting"
+ " RPC server for SCMSecurityProtocolServer.", getRpcAddress()));
getRpcServer().start();
}
public void stop() {
try {
LOGGER.info("Stopping the SCMSecurityProtocolServer.");
getRpcServer().stop();
} catch (Exception ex) {
LOGGER.error("SCMSecurityProtocolServer stop failed.", ex);
}
}
public void join() throws InterruptedException {
LOGGER.trace("Join RPC server for SCMSecurityProtocolServer.");
getRpcServer().join();
}
}

View File

@ -74,6 +74,7 @@
import org.apache.hadoop.hdds.server.ServiceRuntimeInfoImpl; import org.apache.hadoop.hdds.server.ServiceRuntimeInfoImpl;
import org.apache.hadoop.hdds.server.events.EventPublisher; import org.apache.hadoop.hdds.server.events.EventPublisher;
import org.apache.hadoop.hdds.server.events.EventQueue; import org.apache.hadoop.hdds.server.events.EventQueue;
import org.apache.hadoop.ozone.OzoneSecurityUtil;
import org.apache.hadoop.ozone.protocol.commands.RetriableDatanodeEventWatcher; import org.apache.hadoop.ozone.protocol.commands.RetriableDatanodeEventWatcher;
import org.apache.hadoop.hdfs.DFSUtil; import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.io.IOUtils;
@ -108,8 +109,6 @@
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_ENABLED; import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_ENABLED;
import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_SECURITY_ENABLED_DEFAULT;
import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_SECURITY_ENABLED_KEY;
import static org.apache.hadoop.hdds.scm.ScmConfigKeys.HDDS_SCM_KERBEROS_PRINCIPAL_KEY; import static org.apache.hadoop.hdds.scm.ScmConfigKeys.HDDS_SCM_KERBEROS_PRINCIPAL_KEY;
import static org.apache.hadoop.hdds.scm.ScmConfigKeys.HDDS_SCM_KERBEROS_KEYTAB_FILE_KEY; import static org.apache.hadoop.hdds.scm.ScmConfigKeys.HDDS_SCM_KERBEROS_KEYTAB_FILE_KEY;
import static org.apache.hadoop.util.ExitUtil.terminate; import static org.apache.hadoop.util.ExitUtil.terminate;
@ -157,6 +156,7 @@ public final class StorageContainerManager extends ServiceRuntimeInfoImpl
private final SCMDatanodeProtocolServer datanodeProtocolServer; private final SCMDatanodeProtocolServer datanodeProtocolServer;
private final SCMBlockProtocolServer blockProtocolServer; private final SCMBlockProtocolServer blockProtocolServer;
private final SCMClientProtocolServer clientProtocolServer; private final SCMClientProtocolServer clientProtocolServer;
private final SCMSecurityProtocolServer securityProtocolServer;
/* /*
* State Managers of SCM. * State Managers of SCM.
@ -210,8 +210,7 @@ private StorageContainerManager(OzoneConfiguration conf)
StorageContainerManager.initMetrics(); StorageContainerManager.initMetrics();
initContainerReportCache(conf); initContainerReportCache(conf);
// Authenticate SCM if security is enabled // Authenticate SCM if security is enabled
if (conf.getBoolean(OZONE_SECURITY_ENABLED_KEY, if (OzoneSecurityUtil.isSecurityEnabled(conf)) {
OZONE_SECURITY_ENABLED_DEFAULT)) {
loginAsSCMUser(conf); loginAsSCMUser(conf);
} }
@ -294,6 +293,11 @@ private StorageContainerManager(OzoneConfiguration conf)
eventQueue); eventQueue);
blockProtocolServer = new SCMBlockProtocolServer(conf, this); blockProtocolServer = new SCMBlockProtocolServer(conf, this);
clientProtocolServer = new SCMClientProtocolServer(conf, this); clientProtocolServer = new SCMClientProtocolServer(conf, this);
if (OzoneSecurityUtil.isSecurityEnabled(conf)) {
securityProtocolServer = new SCMSecurityProtocolServer(conf, this);
} else {
securityProtocolServer = null;
}
httpServer = new StorageContainerManagerHttpServer(conf); httpServer = new StorageContainerManagerHttpServer(conf);
eventQueue.addHandler(SCMEvents.DATANODE_COMMAND, scmNodeManager); eventQueue.addHandler(SCMEvents.DATANODE_COMMAND, scmNodeManager);
@ -626,6 +630,10 @@ public SCMClientProtocolServer getClientProtocolServer() {
return clientProtocolServer; return clientProtocolServer;
} }
public SCMSecurityProtocolServer getSecurityProtocolServer() {
return securityProtocolServer;
}
/** /**
* Initialize container reports cache that sent from datanodes. * Initialize container reports cache that sent from datanodes.
* *
@ -728,6 +736,9 @@ public void start() throws IOException {
LOG.info(buildRpcServerStartMessage("ScmDatanodeProtocl RPC " + LOG.info(buildRpcServerStartMessage("ScmDatanodeProtocl RPC " +
"server", getDatanodeProtocolServer().getDatanodeRpcAddress())); "server", getDatanodeProtocolServer().getDatanodeRpcAddress()));
getDatanodeProtocolServer().start(); getDatanodeProtocolServer().start();
if(getSecurityProtocolServer() != null) {
getSecurityProtocolServer().start();
}
httpServer.start(); httpServer.start();
scmBlockManager.start(); scmBlockManager.start();
@ -798,6 +809,10 @@ public void stop() {
LOG.error("Storage Container Manager HTTP server stop failed.", ex); LOG.error("Storage Container Manager HTTP server stop failed.", ex);
} }
if (getSecurityProtocolServer() != null) {
getSecurityProtocolServer().stop();
}
try { try {
LOG.info("Stopping Block Manager Service."); LOG.info("Stopping Block Manager Service.");
scmBlockManager.stop(); scmBlockManager.stop();
@ -838,6 +853,9 @@ public void join() {
getBlockProtocolServer().join(); getBlockProtocolServer().join();
getClientProtocolServer().join(); getClientProtocolServer().join();
getDatanodeProtocolServer().join(); getDatanodeProtocolServer().join();
if(getSecurityProtocolServer() != null) {
getSecurityProtocolServer().join();
}
} catch (InterruptedException e) { } catch (InterruptedException e) {
Thread.currentThread().interrupt(); Thread.currentThread().interrupt();
LOG.info("Interrupted during StorageContainerManager join."); LOG.info("Interrupted during StorageContainerManager join.");