HDDS-2073. Make SCMSecurityProtocol message based.
Contributed by Elek, Marton.
This commit is contained in:
parent
e8ae632d4c
commit
ffd4e52725
@ -16,22 +16,29 @@
|
|||||||
*/
|
*/
|
||||||
package org.apache.hadoop.hdds.protocolPB;
|
package org.apache.hadoop.hdds.protocolPB;
|
||||||
|
|
||||||
import com.google.protobuf.RpcController;
|
|
||||||
import com.google.protobuf.ServiceException;
|
|
||||||
import java.io.Closeable;
|
import java.io.Closeable;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
import java.util.function.Consumer;
|
||||||
|
|
||||||
|
import org.apache.hadoop.hdds.protocol.SCMSecurityProtocol;
|
||||||
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.DatanodeDetailsProto;
|
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.DatanodeDetailsProto;
|
||||||
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.OzoneManagerDetailsProto;
|
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.OzoneManagerDetailsProto;
|
||||||
|
import org.apache.hadoop.hdds.protocol.proto.SCMSecurityProtocolProtos;
|
||||||
import org.apache.hadoop.hdds.protocol.proto.SCMSecurityProtocolProtos.SCMGetCACertificateRequestProto;
|
import org.apache.hadoop.hdds.protocol.proto.SCMSecurityProtocolProtos.SCMGetCACertificateRequestProto;
|
||||||
import org.apache.hadoop.hdds.protocol.proto.SCMSecurityProtocolProtos.SCMGetCertResponseProto;
|
import org.apache.hadoop.hdds.protocol.proto.SCMSecurityProtocolProtos.SCMGetCertResponseProto;
|
||||||
import org.apache.hadoop.hdds.protocol.proto.SCMSecurityProtocolProtos.SCMGetCertificateRequestProto;
|
import org.apache.hadoop.hdds.protocol.proto.SCMSecurityProtocolProtos.SCMGetCertificateRequestProto;
|
||||||
import org.apache.hadoop.hdds.protocol.proto.SCMSecurityProtocolProtos.SCMGetCertificateRequestProto.Builder;
|
|
||||||
import org.apache.hadoop.hdds.protocol.proto.SCMSecurityProtocolProtos.SCMGetDataNodeCertRequestProto;
|
import org.apache.hadoop.hdds.protocol.proto.SCMSecurityProtocolProtos.SCMGetDataNodeCertRequestProto;
|
||||||
import org.apache.hadoop.hdds.protocol.SCMSecurityProtocol;
|
import org.apache.hadoop.hdds.protocol.proto.SCMSecurityProtocolProtos.SCMSecurityRequest;
|
||||||
|
import org.apache.hadoop.hdds.protocol.proto.SCMSecurityProtocolProtos.SCMSecurityRequest.Builder;
|
||||||
|
import org.apache.hadoop.hdds.protocol.proto.SCMSecurityProtocolProtos.SCMSecurityResponse;
|
||||||
|
import org.apache.hadoop.hdds.protocol.proto.SCMSecurityProtocolProtos.Type;
|
||||||
|
import org.apache.hadoop.hdds.tracing.TracingUtil;
|
||||||
import org.apache.hadoop.ipc.ProtobufHelper;
|
import org.apache.hadoop.ipc.ProtobufHelper;
|
||||||
import org.apache.hadoop.ipc.ProtocolTranslator;
|
import org.apache.hadoop.ipc.ProtocolTranslator;
|
||||||
import org.apache.hadoop.ipc.RPC;
|
import org.apache.hadoop.ipc.RPC;
|
||||||
|
|
||||||
|
import com.google.protobuf.RpcController;
|
||||||
|
import com.google.protobuf.ServiceException;
|
||||||
import static org.apache.hadoop.hdds.protocol.proto.SCMSecurityProtocolProtos.SCMGetOMCertRequestProto;
|
import static org.apache.hadoop.hdds.protocol.proto.SCMSecurityProtocolProtos.SCMGetOMCertRequestProto;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -52,6 +59,28 @@ public class SCMSecurityProtocolClientSideTranslatorPB implements
|
|||||||
this.rpcProxy = rpcProxy;
|
this.rpcProxy = rpcProxy;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Helper method to wrap the request and send the message.
|
||||||
|
*/
|
||||||
|
private SCMSecurityResponse submitRequest(
|
||||||
|
SCMSecurityProtocolProtos.Type type,
|
||||||
|
Consumer<Builder> builderConsumer) throws IOException {
|
||||||
|
final SCMSecurityResponse response;
|
||||||
|
try {
|
||||||
|
|
||||||
|
Builder builder = SCMSecurityRequest.newBuilder()
|
||||||
|
.setCmdType(type)
|
||||||
|
.setTraceID(TracingUtil.exportCurrentSpan());
|
||||||
|
builderConsumer.accept(builder);
|
||||||
|
SCMSecurityRequest wrapper = builder.build();
|
||||||
|
|
||||||
|
response = rpcProxy.submitRequest(NULL_RPC_CONTROLLER, wrapper);
|
||||||
|
} catch (ServiceException ex) {
|
||||||
|
throw ProtobufHelper.getRemoteException(ex);
|
||||||
|
}
|
||||||
|
return response;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Closes this stream and releases any system resources associated
|
* Closes this stream and releases any system resources associated
|
||||||
* with it. If the stream is already closed then invoking this
|
* with it. If the stream is already closed then invoking this
|
||||||
@ -107,15 +136,14 @@ public class SCMSecurityProtocolClientSideTranslatorPB implements
|
|||||||
public SCMGetCertResponseProto getOMCertChain(
|
public SCMGetCertResponseProto getOMCertChain(
|
||||||
OzoneManagerDetailsProto omDetails, String certSignReq)
|
OzoneManagerDetailsProto omDetails, String certSignReq)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
SCMGetOMCertRequestProto.Builder builder = SCMGetOMCertRequestProto
|
SCMGetOMCertRequestProto request = SCMGetOMCertRequestProto
|
||||||
.newBuilder()
|
.newBuilder()
|
||||||
.setCSR(certSignReq)
|
.setCSR(certSignReq)
|
||||||
.setOmDetails(omDetails);
|
.setOmDetails(omDetails)
|
||||||
try {
|
.build();
|
||||||
return rpcProxy.getOMCertificate(NULL_RPC_CONTROLLER, builder.build());
|
return submitRequest(Type.GetOMCertificate,
|
||||||
} catch (ServiceException e) {
|
builder -> builder.setGetOMCertRequest(request))
|
||||||
throw ProtobufHelper.getRemoteException(e);
|
.getGetCertResponseProto();
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -127,15 +155,14 @@ public class SCMSecurityProtocolClientSideTranslatorPB implements
|
|||||||
*/
|
*/
|
||||||
@Override
|
@Override
|
||||||
public String getCertificate(String certSerialId) throws IOException {
|
public String getCertificate(String certSerialId) throws IOException {
|
||||||
Builder builder = SCMGetCertificateRequestProto
|
SCMGetCertificateRequestProto request = SCMGetCertificateRequestProto
|
||||||
.newBuilder()
|
.newBuilder()
|
||||||
.setCertSerialId(certSerialId);
|
.setCertSerialId(certSerialId)
|
||||||
try {
|
.build();
|
||||||
return rpcProxy.getCertificate(NULL_RPC_CONTROLLER, builder.build())
|
return submitRequest(Type.GetCertificate,
|
||||||
|
builder -> builder.setGetCertificateRequest(request))
|
||||||
|
.getGetCertResponseProto()
|
||||||
.getX509Certificate();
|
.getX509Certificate();
|
||||||
} catch (ServiceException e) {
|
|
||||||
throw ProtobufHelper.getRemoteException(e);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -148,16 +175,15 @@ public class SCMSecurityProtocolClientSideTranslatorPB implements
|
|||||||
public SCMGetCertResponseProto getDataNodeCertificateChain(
|
public SCMGetCertResponseProto getDataNodeCertificateChain(
|
||||||
DatanodeDetailsProto dnDetails, String certSignReq)
|
DatanodeDetailsProto dnDetails, String certSignReq)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
SCMGetDataNodeCertRequestProto.Builder builder =
|
|
||||||
|
SCMGetDataNodeCertRequestProto request =
|
||||||
SCMGetDataNodeCertRequestProto.newBuilder()
|
SCMGetDataNodeCertRequestProto.newBuilder()
|
||||||
.setCSR(certSignReq)
|
.setCSR(certSignReq)
|
||||||
.setDatanodeDetails(dnDetails);
|
.setDatanodeDetails(dnDetails)
|
||||||
try {
|
.build();
|
||||||
return rpcProxy.getDataNodeCertificate(NULL_RPC_CONTROLLER,
|
return submitRequest(Type.GetDataNodeCertificate,
|
||||||
builder.build());
|
builder -> builder.setGetDataNodeCertRequest(request))
|
||||||
} catch (ServiceException e) {
|
.getGetCertResponseProto();
|
||||||
throw ProtobufHelper.getRemoteException(e);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -169,12 +195,10 @@ public class SCMSecurityProtocolClientSideTranslatorPB implements
|
|||||||
public String getCACertificate() throws IOException {
|
public String getCACertificate() throws IOException {
|
||||||
SCMGetCACertificateRequestProto protoIns = SCMGetCACertificateRequestProto
|
SCMGetCACertificateRequestProto protoIns = SCMGetCACertificateRequestProto
|
||||||
.getDefaultInstance();
|
.getDefaultInstance();
|
||||||
try {
|
return submitRequest(Type.GetCACertificate,
|
||||||
return rpcProxy.getCACertificate(NULL_RPC_CONTROLLER, protoIns)
|
builder -> builder.setGetCACertificateRequest(protoIns))
|
||||||
.getX509Certificate();
|
.getGetCertResponseProto().getX509Certificate();
|
||||||
} catch (ServiceException e) {
|
|
||||||
throw ProtobufHelper.getRemoteException(e);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -1,132 +0,0 @@
|
|||||||
/**
|
|
||||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
|
||||||
* contributor license agreements. See the NOTICE file distributed with this
|
|
||||||
* work for additional information regarding copyright ownership. The ASF
|
|
||||||
* licenses this file to you under the Apache License, Version 2.0 (the
|
|
||||||
* "License"); you may not use this file except in compliance with the License.
|
|
||||||
* You may obtain a copy of the License at
|
|
||||||
* <p>
|
|
||||||
* http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
* <p>
|
|
||||||
* Unless required by applicable law or agreed to in writing, software
|
|
||||||
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
|
||||||
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
|
||||||
* License for the specific language governing permissions and limitations under
|
|
||||||
* the License.
|
|
||||||
*/
|
|
||||||
package org.apache.hadoop.hdds.protocolPB;
|
|
||||||
|
|
||||||
import com.google.protobuf.RpcController;
|
|
||||||
import com.google.protobuf.ServiceException;
|
|
||||||
import java.io.IOException;
|
|
||||||
|
|
||||||
import org.apache.hadoop.hdds.protocol.proto.SCMSecurityProtocolProtos;
|
|
||||||
import org.apache.hadoop.hdds.protocol.proto.SCMSecurityProtocolProtos.SCMGetCertResponseProto;
|
|
||||||
import org.apache.hadoop.hdds.protocol.proto.SCMSecurityProtocolProtos.SCMGetCertificateRequestProto;
|
|
||||||
import org.apache.hadoop.hdds.protocol.proto.SCMSecurityProtocolProtos.SCMGetDataNodeCertRequestProto;
|
|
||||||
import org.apache.hadoop.hdds.protocol.proto.SCMSecurityProtocolProtos.SCMGetCertResponseProto.ResponseCode;
|
|
||||||
import org.apache.hadoop.hdds.protocol.SCMSecurityProtocol;
|
|
||||||
import org.apache.hadoop.hdds.protocol.proto.SCMSecurityProtocolProtos.SCMGetOMCertRequestProto;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* This class is the server-side translator that forwards requests received on
|
|
||||||
* {@link SCMSecurityProtocolPB} to the {@link
|
|
||||||
* SCMSecurityProtocol} server implementation.
|
|
||||||
*/
|
|
||||||
public class SCMSecurityProtocolServerSideTranslatorPB implements
|
|
||||||
SCMSecurityProtocolPB {
|
|
||||||
|
|
||||||
private final SCMSecurityProtocol impl;
|
|
||||||
|
|
||||||
public SCMSecurityProtocolServerSideTranslatorPB(SCMSecurityProtocol impl) {
|
|
||||||
this.impl = impl;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Get SCM signed certificate for DataNode.
|
|
||||||
*
|
|
||||||
* @param controller
|
|
||||||
* @param request
|
|
||||||
* @return SCMGetDataNodeCertResponseProto.
|
|
||||||
*/
|
|
||||||
@Override
|
|
||||||
public SCMGetCertResponseProto getDataNodeCertificate(
|
|
||||||
RpcController controller, SCMGetDataNodeCertRequestProto request)
|
|
||||||
throws ServiceException {
|
|
||||||
try {
|
|
||||||
String certificate = impl
|
|
||||||
.getDataNodeCertificate(request.getDatanodeDetails(),
|
|
||||||
request.getCSR());
|
|
||||||
SCMGetCertResponseProto.Builder builder =
|
|
||||||
SCMGetCertResponseProto
|
|
||||||
.newBuilder()
|
|
||||||
.setResponseCode(ResponseCode.success)
|
|
||||||
.setX509Certificate(certificate)
|
|
||||||
.setX509CACertificate(impl.getCACertificate());
|
|
||||||
|
|
||||||
return builder.build();
|
|
||||||
} catch (IOException e) {
|
|
||||||
throw new ServiceException(e);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Get SCM signed certificate for OzoneManager.
|
|
||||||
*
|
|
||||||
* @param controller
|
|
||||||
* @param request
|
|
||||||
* @return SCMGetCertResponseProto.
|
|
||||||
*/
|
|
||||||
@Override
|
|
||||||
public SCMGetCertResponseProto getOMCertificate(
|
|
||||||
RpcController controller, SCMGetOMCertRequestProto request)
|
|
||||||
throws ServiceException {
|
|
||||||
try {
|
|
||||||
String certificate = impl
|
|
||||||
.getOMCertificate(request.getOmDetails(),
|
|
||||||
request.getCSR());
|
|
||||||
SCMGetCertResponseProto.Builder builder =
|
|
||||||
SCMGetCertResponseProto
|
|
||||||
.newBuilder()
|
|
||||||
.setResponseCode(ResponseCode.success)
|
|
||||||
.setX509Certificate(certificate)
|
|
||||||
.setX509CACertificate(impl.getCACertificate());
|
|
||||||
return builder.build();
|
|
||||||
} catch (IOException e) {
|
|
||||||
throw new ServiceException(e);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public SCMGetCertResponseProto getCertificate(RpcController controller,
|
|
||||||
SCMGetCertificateRequestProto request) throws ServiceException {
|
|
||||||
try {
|
|
||||||
String certificate = impl.getCertificate(request.getCertSerialId());
|
|
||||||
SCMGetCertResponseProto.Builder builder =
|
|
||||||
SCMGetCertResponseProto
|
|
||||||
.newBuilder()
|
|
||||||
.setResponseCode(ResponseCode.success)
|
|
||||||
.setX509Certificate(certificate);
|
|
||||||
return builder.build();
|
|
||||||
} catch (IOException e) {
|
|
||||||
throw new ServiceException(e);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public SCMGetCertResponseProto getCACertificate(RpcController controller,
|
|
||||||
SCMSecurityProtocolProtos.SCMGetCACertificateRequestProto request)
|
|
||||||
throws ServiceException {
|
|
||||||
try {
|
|
||||||
String certificate = impl.getCACertificate();
|
|
||||||
SCMGetCertResponseProto.Builder builder =
|
|
||||||
SCMGetCertResponseProto
|
|
||||||
.newBuilder()
|
|
||||||
.setResponseCode(ResponseCode.success)
|
|
||||||
.setX509Certificate(certificate);
|
|
||||||
return builder.build();
|
|
||||||
} catch (IOException e) {
|
|
||||||
throw new ServiceException(e);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
@ -30,10 +30,54 @@ option java_generic_services = true;
|
|||||||
|
|
||||||
option java_generate_equals_and_hash = true;
|
option java_generate_equals_and_hash = true;
|
||||||
|
|
||||||
package hadoop.hdds;
|
package hadoop.hdds.security;
|
||||||
|
|
||||||
import "hdds.proto";
|
import "hdds.proto";
|
||||||
|
|
||||||
|
/**
|
||||||
|
All commands is send as request and all response come back via
|
||||||
|
Response class. If adding new functions please follow this protocol, since
|
||||||
|
our tracing and visibility tools depend on this pattern.
|
||||||
|
*/
|
||||||
|
message SCMSecurityRequest {
|
||||||
|
required Type cmdType = 1; // Type of the command
|
||||||
|
|
||||||
|
optional string traceID = 2;
|
||||||
|
|
||||||
|
optional SCMGetDataNodeCertRequestProto getDataNodeCertRequest = 3;
|
||||||
|
optional SCMGetOMCertRequestProto getOMCertRequest = 4;
|
||||||
|
optional SCMGetCertificateRequestProto getCertificateRequest = 5;
|
||||||
|
optional SCMGetCACertificateRequestProto getCACertificateRequest = 6;
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
message SCMSecurityResponse {
|
||||||
|
required Type cmdType = 1; // Type of the command
|
||||||
|
|
||||||
|
// A string that identifies this command, we generate Trace ID in Ozone
|
||||||
|
// frontend and this allows us to trace that command all over ozone.
|
||||||
|
optional string traceID = 2;
|
||||||
|
|
||||||
|
optional bool success = 3 [default = true];
|
||||||
|
|
||||||
|
optional string message = 4;
|
||||||
|
|
||||||
|
required Status status = 5;
|
||||||
|
|
||||||
|
optional SCMGetCertResponseProto getCertResponseProto = 6;
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
enum Type {
|
||||||
|
GetDataNodeCertificate = 1;
|
||||||
|
GetOMCertificate = 2;
|
||||||
|
GetCertificate = 3;
|
||||||
|
GetCACertificate = 4;
|
||||||
|
}
|
||||||
|
|
||||||
|
enum Status {
|
||||||
|
OK = 1;
|
||||||
|
}
|
||||||
/**
|
/**
|
||||||
* This message is send by data node to prove its identity and get an SCM
|
* This message is send by data node to prove its identity and get an SCM
|
||||||
* signed certificate.
|
* signed certificate.
|
||||||
@ -81,27 +125,5 @@ message SCMGetCertResponseProto {
|
|||||||
|
|
||||||
|
|
||||||
service SCMSecurityProtocolService {
|
service SCMSecurityProtocolService {
|
||||||
/**
|
rpc submitRequest (SCMSecurityRequest) returns (SCMSecurityResponse);
|
||||||
* Get SCM signed certificate for DataNode.
|
|
||||||
*/
|
|
||||||
rpc getDataNodeCertificate (SCMGetDataNodeCertRequestProto) returns
|
|
||||||
(SCMGetCertResponseProto);
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Get SCM signed certificate for DataNode.
|
|
||||||
*/
|
|
||||||
rpc getOMCertificate (SCMGetOMCertRequestProto) returns
|
|
||||||
(SCMGetCertResponseProto);
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Get SCM signed certificate for DataNode.
|
|
||||||
*/
|
|
||||||
rpc getCertificate (SCMGetCertificateRequestProto) returns
|
|
||||||
(SCMGetCertResponseProto);
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Get SCM signed certificate for DataNode.
|
|
||||||
*/
|
|
||||||
rpc getCACertificate (SCMGetCACertificateRequestProto) returns
|
|
||||||
(SCMGetCertResponseProto);
|
|
||||||
}
|
}
|
||||||
|
@ -0,0 +1,186 @@
|
|||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with this
|
||||||
|
* work for additional information regarding copyright ownership. The ASF
|
||||||
|
* licenses this file to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance with the License.
|
||||||
|
* You may obtain a copy of the License at
|
||||||
|
* <p>
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
* <p>
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||||
|
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||||
|
* License for the specific language governing permissions and limitations under
|
||||||
|
* the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.hdds.scm.protocol;
|
||||||
|
|
||||||
|
import com.google.protobuf.RpcController;
|
||||||
|
import com.google.protobuf.ServiceException;
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
|
||||||
|
import org.apache.hadoop.hdds.protocol.proto.SCMSecurityProtocolProtos;
|
||||||
|
import org.apache.hadoop.hdds.protocol.proto.SCMSecurityProtocolProtos.SCMGetCertResponseProto;
|
||||||
|
import org.apache.hadoop.hdds.protocol.proto.SCMSecurityProtocolProtos.SCMGetCertificateRequestProto;
|
||||||
|
import org.apache.hadoop.hdds.protocol.proto.SCMSecurityProtocolProtos.SCMGetDataNodeCertRequestProto;
|
||||||
|
import org.apache.hadoop.hdds.protocol.proto.SCMSecurityProtocolProtos.SCMGetCertResponseProto.ResponseCode;
|
||||||
|
import org.apache.hadoop.hdds.protocol.SCMSecurityProtocol;
|
||||||
|
import org.apache.hadoop.hdds.protocol.proto.SCMSecurityProtocolProtos.SCMGetOMCertRequestProto;
|
||||||
|
import org.apache.hadoop.hdds.protocol.proto.SCMSecurityProtocolProtos.SCMSecurityRequest;
|
||||||
|
import org.apache.hadoop.hdds.protocol.proto.SCMSecurityProtocolProtos.SCMSecurityResponse;
|
||||||
|
import org.apache.hadoop.hdds.protocol.proto.SCMSecurityProtocolProtos.Status;
|
||||||
|
import org.apache.hadoop.hdds.protocolPB.SCMSecurityProtocolPB;
|
||||||
|
import org.apache.hadoop.hdds.server.OzoneProtocolMessageDispatcher;
|
||||||
|
import org.apache.hadoop.ozone.protocolPB.ProtocolMessageMetrics;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* This class is the server-side translator that forwards requests received on
|
||||||
|
* {@link SCMSecurityProtocolPB} to the {@link
|
||||||
|
* SCMSecurityProtocol} server implementation.
|
||||||
|
*/
|
||||||
|
public class SCMSecurityProtocolServerSideTranslatorPB
|
||||||
|
implements SCMSecurityProtocolPB {
|
||||||
|
|
||||||
|
private static final Logger LOG =
|
||||||
|
LoggerFactory.getLogger(SCMSecurityProtocolServerSideTranslatorPB.class);
|
||||||
|
|
||||||
|
private final SCMSecurityProtocol impl;
|
||||||
|
|
||||||
|
private OzoneProtocolMessageDispatcher<SCMSecurityRequest,
|
||||||
|
SCMSecurityResponse>
|
||||||
|
dispatcher;
|
||||||
|
|
||||||
|
public SCMSecurityProtocolServerSideTranslatorPB(SCMSecurityProtocol impl,
|
||||||
|
ProtocolMessageMetrics messageMetrics) {
|
||||||
|
this.impl = impl;
|
||||||
|
this.dispatcher =
|
||||||
|
new OzoneProtocolMessageDispatcher<>("ScmSecurityProtocol",
|
||||||
|
messageMetrics, LOG);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public SCMSecurityResponse submitRequest(RpcController controller,
|
||||||
|
SCMSecurityRequest request) throws ServiceException {
|
||||||
|
return dispatcher.processRequest(request, this::processRequest,
|
||||||
|
request.getCmdType(), request.getTraceID());
|
||||||
|
}
|
||||||
|
|
||||||
|
public SCMSecurityResponse processRequest(SCMSecurityRequest request)
|
||||||
|
throws ServiceException {
|
||||||
|
try {
|
||||||
|
switch (request.getCmdType()) {
|
||||||
|
case GetCertificate:
|
||||||
|
return SCMSecurityResponse.newBuilder()
|
||||||
|
.setCmdType(request.getCmdType())
|
||||||
|
.setStatus(Status.OK)
|
||||||
|
.setGetCertResponseProto(
|
||||||
|
getCertificate(request.getGetCertificateRequest()))
|
||||||
|
.build();
|
||||||
|
case GetCACertificate:
|
||||||
|
return SCMSecurityResponse.newBuilder()
|
||||||
|
.setCmdType(request.getCmdType())
|
||||||
|
.setStatus(Status.OK)
|
||||||
|
.setGetCertResponseProto(
|
||||||
|
getCACertificate(request.getGetCACertificateRequest()))
|
||||||
|
.build();
|
||||||
|
case GetOMCertificate:
|
||||||
|
return SCMSecurityResponse.newBuilder()
|
||||||
|
.setCmdType(request.getCmdType())
|
||||||
|
.setStatus(Status.OK)
|
||||||
|
.setGetCertResponseProto(
|
||||||
|
getOMCertificate(request.getGetOMCertRequest()))
|
||||||
|
.build();
|
||||||
|
case GetDataNodeCertificate:
|
||||||
|
return SCMSecurityResponse.newBuilder()
|
||||||
|
.setCmdType(request.getCmdType())
|
||||||
|
.setStatus(Status.OK)
|
||||||
|
.setGetCertResponseProto(
|
||||||
|
getDataNodeCertificate(request.getGetDataNodeCertRequest()))
|
||||||
|
.build();
|
||||||
|
default:
|
||||||
|
throw new IllegalArgumentException(
|
||||||
|
"Unknown request type: " + request.getCmdType());
|
||||||
|
}
|
||||||
|
} catch (IOException e) {
|
||||||
|
throw new ServiceException(e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get SCM signed certificate for DataNode.
|
||||||
|
*
|
||||||
|
* @param request
|
||||||
|
* @return SCMGetDataNodeCertResponseProto.
|
||||||
|
*/
|
||||||
|
|
||||||
|
public SCMGetCertResponseProto getDataNodeCertificate(
|
||||||
|
SCMGetDataNodeCertRequestProto request)
|
||||||
|
throws IOException {
|
||||||
|
|
||||||
|
String certificate = impl
|
||||||
|
.getDataNodeCertificate(request.getDatanodeDetails(),
|
||||||
|
request.getCSR());
|
||||||
|
SCMGetCertResponseProto.Builder builder =
|
||||||
|
SCMGetCertResponseProto
|
||||||
|
.newBuilder()
|
||||||
|
.setResponseCode(ResponseCode.success)
|
||||||
|
.setX509Certificate(certificate)
|
||||||
|
.setX509CACertificate(impl.getCACertificate());
|
||||||
|
|
||||||
|
return builder.build();
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get SCM signed certificate for OzoneManager.
|
||||||
|
*
|
||||||
|
* @param request
|
||||||
|
* @return SCMGetCertResponseProto.
|
||||||
|
*/
|
||||||
|
public SCMGetCertResponseProto getOMCertificate(
|
||||||
|
SCMGetOMCertRequestProto request) throws IOException {
|
||||||
|
String certificate = impl
|
||||||
|
.getOMCertificate(request.getOmDetails(),
|
||||||
|
request.getCSR());
|
||||||
|
SCMGetCertResponseProto.Builder builder =
|
||||||
|
SCMGetCertResponseProto
|
||||||
|
.newBuilder()
|
||||||
|
.setResponseCode(ResponseCode.success)
|
||||||
|
.setX509Certificate(certificate)
|
||||||
|
.setX509CACertificate(impl.getCACertificate());
|
||||||
|
return builder.build();
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
public SCMGetCertResponseProto getCertificate(
|
||||||
|
SCMGetCertificateRequestProto request) throws IOException {
|
||||||
|
|
||||||
|
String certificate = impl.getCertificate(request.getCertSerialId());
|
||||||
|
SCMGetCertResponseProto.Builder builder =
|
||||||
|
SCMGetCertResponseProto
|
||||||
|
.newBuilder()
|
||||||
|
.setResponseCode(ResponseCode.success)
|
||||||
|
.setX509Certificate(certificate);
|
||||||
|
return builder.build();
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
public SCMGetCertResponseProto getCACertificate(
|
||||||
|
SCMSecurityProtocolProtos.SCMGetCACertificateRequestProto request)
|
||||||
|
throws IOException {
|
||||||
|
|
||||||
|
String certificate = impl.getCACertificate();
|
||||||
|
SCMGetCertResponseProto.Builder builder =
|
||||||
|
SCMGetCertResponseProto
|
||||||
|
.newBuilder()
|
||||||
|
.setResponseCode(ResponseCode.success)
|
||||||
|
.setX509Certificate(certificate);
|
||||||
|
return builder.build();
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
@ -5,9 +5,9 @@
|
|||||||
* licenses this file to you under the Apache License, Version 2.0 (the
|
* licenses this file to you under the Apache License, Version 2.0 (the
|
||||||
* "License"); you may not use this file except in compliance with the License.
|
* "License"); you may not use this file except in compliance with the License.
|
||||||
* You may obtain a copy of the License at
|
* You may obtain a copy of the License at
|
||||||
*
|
* <p>
|
||||||
* http://www.apache.org/licenses/LICENSE-2.0
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
*
|
* <p>
|
||||||
* Unless required by applicable law or agreed to in writing, software
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||||
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||||
@ -17,6 +17,7 @@
|
|||||||
package org.apache.hadoop.hdds.scm.server;
|
package org.apache.hadoop.hdds.scm.server;
|
||||||
|
|
||||||
import com.google.protobuf.BlockingService;
|
import com.google.protobuf.BlockingService;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.net.InetSocketAddress;
|
import java.net.InetSocketAddress;
|
||||||
import java.security.cert.CertificateException;
|
import java.security.cert.CertificateException;
|
||||||
@ -32,7 +33,7 @@ import org.apache.hadoop.hdds.protocol.proto.HddsProtos.DatanodeDetailsProto;
|
|||||||
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.OzoneManagerDetailsProto;
|
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.OzoneManagerDetailsProto;
|
||||||
import org.apache.hadoop.hdds.protocol.proto.SCMSecurityProtocolProtos;
|
import org.apache.hadoop.hdds.protocol.proto.SCMSecurityProtocolProtos;
|
||||||
import org.apache.hadoop.hdds.protocolPB.SCMSecurityProtocolPB;
|
import org.apache.hadoop.hdds.protocolPB.SCMSecurityProtocolPB;
|
||||||
import org.apache.hadoop.hdds.protocolPB.SCMSecurityProtocolServerSideTranslatorPB;
|
import org.apache.hadoop.hdds.scm.protocol.SCMSecurityProtocolServerSideTranslatorPB;
|
||||||
import org.apache.hadoop.hdds.scm.HddsServerUtil;
|
import org.apache.hadoop.hdds.scm.HddsServerUtil;
|
||||||
import org.apache.hadoop.hdds.scm.ScmConfigKeys;
|
import org.apache.hadoop.hdds.scm.ScmConfigKeys;
|
||||||
import org.apache.hadoop.hdds.protocol.SCMSecurityProtocol;
|
import org.apache.hadoop.hdds.protocol.SCMSecurityProtocol;
|
||||||
@ -41,7 +42,9 @@ import org.apache.hadoop.hdds.security.x509.certificate.authority.CertificateSer
|
|||||||
import org.apache.hadoop.hdds.security.x509.certificate.utils.CertificateCodec;
|
import org.apache.hadoop.hdds.security.x509.certificate.utils.CertificateCodec;
|
||||||
import org.apache.hadoop.ipc.ProtobufRpcEngine;
|
import org.apache.hadoop.ipc.ProtobufRpcEngine;
|
||||||
import org.apache.hadoop.ipc.RPC;
|
import org.apache.hadoop.ipc.RPC;
|
||||||
|
import org.apache.hadoop.ozone.protocolPB.ProtocolMessageMetrics;
|
||||||
import org.apache.hadoop.security.KerberosInfo;
|
import org.apache.hadoop.security.KerberosInfo;
|
||||||
|
|
||||||
import org.bouncycastle.cert.X509CertificateHolder;
|
import org.bouncycastle.cert.X509CertificateHolder;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
@ -62,6 +65,7 @@ public class SCMSecurityProtocolServer implements SCMSecurityProtocol {
|
|||||||
private final CertificateServer certificateServer;
|
private final CertificateServer certificateServer;
|
||||||
private final RPC.Server rpcServer;
|
private final RPC.Server rpcServer;
|
||||||
private final InetSocketAddress rpcAddress;
|
private final InetSocketAddress rpcAddress;
|
||||||
|
private final ProtocolMessageMetrics metrics;
|
||||||
|
|
||||||
SCMSecurityProtocolServer(OzoneConfiguration conf,
|
SCMSecurityProtocolServer(OzoneConfiguration conf,
|
||||||
CertificateServer certificateServer) throws IOException {
|
CertificateServer certificateServer) throws IOException {
|
||||||
@ -76,10 +80,13 @@ public class SCMSecurityProtocolServer implements SCMSecurityProtocol {
|
|||||||
// SCM security service RPC service.
|
// SCM security service RPC service.
|
||||||
RPC.setProtocolEngine(conf, SCMSecurityProtocolPB.class,
|
RPC.setProtocolEngine(conf, SCMSecurityProtocolPB.class,
|
||||||
ProtobufRpcEngine.class);
|
ProtobufRpcEngine.class);
|
||||||
|
metrics = new ProtocolMessageMetrics("ScmSecurityProtocol",
|
||||||
|
"SCM Security protocol metrics",
|
||||||
|
SCMSecurityProtocolProtos.Type.values());
|
||||||
BlockingService secureProtoPbService =
|
BlockingService secureProtoPbService =
|
||||||
SCMSecurityProtocolProtos.SCMSecurityProtocolService
|
SCMSecurityProtocolProtos.SCMSecurityProtocolService
|
||||||
.newReflectiveBlockingService(
|
.newReflectiveBlockingService(
|
||||||
new SCMSecurityProtocolServerSideTranslatorPB(this));
|
new SCMSecurityProtocolServerSideTranslatorPB(this, metrics));
|
||||||
this.rpcServer =
|
this.rpcServer =
|
||||||
StorageContainerManager.startRpcServer(
|
StorageContainerManager.startRpcServer(
|
||||||
conf,
|
conf,
|
||||||
@ -196,12 +203,14 @@ public class SCMSecurityProtocolServer implements SCMSecurityProtocol {
|
|||||||
public void start() {
|
public void start() {
|
||||||
LOGGER.info(StorageContainerManager.buildRpcServerStartMessage("Starting"
|
LOGGER.info(StorageContainerManager.buildRpcServerStartMessage("Starting"
|
||||||
+ " RPC server for SCMSecurityProtocolServer.", getRpcAddress()));
|
+ " RPC server for SCMSecurityProtocolServer.", getRpcAddress()));
|
||||||
|
metrics.register();
|
||||||
getRpcServer().start();
|
getRpcServer().start();
|
||||||
}
|
}
|
||||||
|
|
||||||
public void stop() {
|
public void stop() {
|
||||||
try {
|
try {
|
||||||
LOGGER.info("Stopping the SCMSecurityProtocolServer.");
|
LOGGER.info("Stopping the SCMSecurityProtocolServer.");
|
||||||
|
metrics.unregister();
|
||||||
getRpcServer().stop();
|
getRpcServer().stop();
|
||||||
} catch (Exception ex) {
|
} catch (Exception ex) {
|
||||||
LOGGER.error("SCMSecurityProtocolServer stop failed.", ex);
|
LOGGER.error("SCMSecurityProtocolServer stop failed.", ex);
|
||||||
|
@ -31,7 +31,7 @@ import org.apache.hadoop.ozone.insight.scm.EventQueueInsight;
|
|||||||
import org.apache.hadoop.ozone.insight.scm.NodeManagerInsight;
|
import org.apache.hadoop.ozone.insight.scm.NodeManagerInsight;
|
||||||
import org.apache.hadoop.ozone.insight.scm.ReplicaManagerInsight;
|
import org.apache.hadoop.ozone.insight.scm.ReplicaManagerInsight;
|
||||||
import org.apache.hadoop.ozone.insight.scm.ScmProtocolBlockLocationInsight;
|
import org.apache.hadoop.ozone.insight.scm.ScmProtocolBlockLocationInsight;
|
||||||
import org.apache.hadoop.ozone.insight.scm.ScmProtocolDatanodeInsight;
|
import org.apache.hadoop.ozone.insight.scm.ScmProtocolSecurityInsight;
|
||||||
import org.apache.hadoop.ozone.om.OMConfigKeys;
|
import org.apache.hadoop.ozone.om.OMConfigKeys;
|
||||||
|
|
||||||
import picocli.CommandLine;
|
import picocli.CommandLine;
|
||||||
@ -89,8 +89,8 @@ public class BaseInsightSubCommand {
|
|||||||
insights.put("scm.event-queue", new EventQueueInsight());
|
insights.put("scm.event-queue", new EventQueueInsight());
|
||||||
insights.put("scm.protocol.block-location",
|
insights.put("scm.protocol.block-location",
|
||||||
new ScmProtocolBlockLocationInsight());
|
new ScmProtocolBlockLocationInsight());
|
||||||
insights.put("scm.protocol.datanode",
|
insights.put("scm.protocol.security",
|
||||||
new ScmProtocolDatanodeInsight());
|
new ScmProtocolSecurityInsight());
|
||||||
insights.put("om.key-manager", new KeyManagerInsight());
|
insights.put("om.key-manager", new KeyManagerInsight());
|
||||||
insights.put("om.protocol.client", new OmProtocolInsight());
|
insights.put("om.protocol.client", new OmProtocolInsight());
|
||||||
|
|
||||||
|
@ -0,0 +1,71 @@
|
|||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.ozone.insight.scm;
|
||||||
|
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.HashMap;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Map;
|
||||||
|
|
||||||
|
import org.apache.hadoop.hdds.protocol.proto.SCMSecurityProtocolProtos;
|
||||||
|
import org.apache.hadoop.hdds.scm.protocol.SCMSecurityProtocolServerSideTranslatorPB;
|
||||||
|
import org.apache.hadoop.hdds.scm.server.SCMSecurityProtocolServer;
|
||||||
|
import org.apache.hadoop.ozone.insight.BaseInsightPoint;
|
||||||
|
import org.apache.hadoop.ozone.insight.Component.Type;
|
||||||
|
import org.apache.hadoop.ozone.insight.LoggerSource;
|
||||||
|
import org.apache.hadoop.ozone.insight.MetricGroupDisplay;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Insight metric to check the SCM block location protocol behaviour.
|
||||||
|
*/
|
||||||
|
public class ScmProtocolSecurityInsight extends BaseInsightPoint {
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public List<LoggerSource> getRelatedLoggers(boolean verbose) {
|
||||||
|
List<LoggerSource> loggers = new ArrayList<>();
|
||||||
|
loggers.add(
|
||||||
|
new LoggerSource(Type.SCM,
|
||||||
|
SCMSecurityProtocolServerSideTranslatorPB.class,
|
||||||
|
defaultLevel(verbose)));
|
||||||
|
new LoggerSource(Type.SCM,
|
||||||
|
SCMSecurityProtocolServer.class,
|
||||||
|
defaultLevel(verbose));
|
||||||
|
return loggers;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public List<MetricGroupDisplay> getMetrics() {
|
||||||
|
List<MetricGroupDisplay> metrics = new ArrayList<>();
|
||||||
|
|
||||||
|
Map<String, String> filter = new HashMap<>();
|
||||||
|
filter.put("servername", "SCMSecurityProtocolService");
|
||||||
|
|
||||||
|
addRpcMetrics(metrics, Type.SCM, filter);
|
||||||
|
|
||||||
|
addProtocolMessageMetrics(metrics, "scm_security_protocol",
|
||||||
|
Type.SCM, SCMSecurityProtocolProtos.Type.values());
|
||||||
|
|
||||||
|
return metrics;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String getDescription() {
|
||||||
|
return "SCM Block location protocol endpoint";
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
Loading…
x
Reference in New Issue
Block a user