HDDS-805. Block token: Client api changes for block token. Contributed by Ajay Kumar.

This commit is contained in:
Ajay Kumar 2018-12-19 16:34:51 -08:00 committed by Xiaoyu Yao
parent 2b115222cd
commit 50c4045fde
40 changed files with 1332 additions and 349 deletions

View File

@ -32,15 +32,10 @@
import org.apache.hadoop.hdds.scm.client.HddsClientUtils;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
import org.apache.hadoop.hdds.security.exception.SCMSecurityException;
import org.apache.hadoop.hdds.security.token.OzoneBlockTokenIdentifier;
import org.apache.hadoop.hdds.security.token.OzoneBlockTokenSelector;
import org.apache.hadoop.hdds.security.x509.SecurityConfig;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.ozone.OzoneConfigKeys;
import org.apache.hadoop.ozone.OzoneConsts;
import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.util.Time;
import org.apache.ratis.thirdparty.io.grpc.ManagedChannel;
import org.apache.ratis.thirdparty.io.grpc.Status;
@ -53,7 +48,6 @@
import java.io.File;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@ -76,6 +70,7 @@ public class XceiverClientGrpc extends XceiverClientSpi {
private Map<UUID, ManagedChannel> channels;
private final Semaphore semaphore;
private boolean closed = false;
private SecurityConfig secConfig;
/**
* Constructs a client that can communicate with the Container framework on
@ -90,6 +85,7 @@ public XceiverClientGrpc(Pipeline pipeline, Configuration config) {
Preconditions.checkNotNull(config);
this.pipeline = pipeline;
this.config = config;
this.secConfig = new SecurityConfig(config);
this.semaphore =
new Semaphore(HddsClientUtils.getMaxOutstandingRequests(config));
this.metrics = XceiverClientManager.getXceiverClientMetrics();
@ -97,17 +93,30 @@ public XceiverClientGrpc(Pipeline pipeline, Configuration config) {
this.asyncStubs = new HashMap<>();
}
/**
* To be used when grpc token is not enabled.
* */
@Override
public void connect() throws Exception {
// leader by default is the 1st datanode in the datanode list of pipleline
DatanodeDetails dn = this.pipeline.getFirstNode();
// just make a connection to the 1st datanode at the beginning
connectToDatanode(dn);
connectToDatanode(dn, null);
}
/**
* Passed encoded token to GRPC header when security is enabled.
* */
@Override
public void connect(String encodedToken) throws Exception {
// leader by default is the 1st datanode in the datanode list of pipleline
DatanodeDetails dn = this.pipeline.getFirstNode();
// just make a connection to the 1st datanode at the beginning
connectToDatanode(dn, encodedToken);
}
private void connectToDatanode(DatanodeDetails dn) throws IOException {
private void connectToDatanode(DatanodeDetails dn, String encodedToken)
throws IOException {
// read port from the data node, on failure use default configured
// port.
int port = dn.getPort(DatanodeDetails.Port.Name.STANDALONE).getValue();
@ -119,19 +128,6 @@ private void connectToDatanode(DatanodeDetails dn) throws IOException {
// Add credential context to the client call
String userName = UserGroupInformation.getCurrentUser()
.getShortUserName();
// Add block token if block token (mutual auth) is required but the client
// does not have a mTLS (private key and ca signed certificate)
String encodedToken = null;
SecurityConfig secConfig = new SecurityConfig(config);
if (secConfig.isGrpcBlockTokenEnabled()) {
InetSocketAddress addr = new InetSocketAddress(dn.getIpAddress(), port);
encodedToken = getEncodedBlockToken(addr);
if (encodedToken == null) {
throw new SCMSecurityException("No Block token available to access " +
"service at : " + addr.toString());
}
}
LOG.debug("Connecting to server Port : " + dn.getIpAddress());
NettyChannelBuilder channelBuilder = NettyChannelBuilder.forAddress(dn
.getIpAddress(), port).usePlaintext()
@ -166,20 +162,6 @@ private void connectToDatanode(DatanodeDetails dn) throws IOException {
channels.put(dn.getUuid(), channel);
}
private String getEncodedBlockToken(InetSocketAddress addr)
throws IOException{
UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
OzoneBlockTokenSelector tokenSelector = new OzoneBlockTokenSelector();
Text service = SecurityUtil.buildTokenService(addr);
Token<OzoneBlockTokenIdentifier> token = tokenSelector.selectToken(
service, ugi.getTokens());
if (token != null) {
token.setService(service);
return token.encodeToUrlString();
}
return null;
}
/**
* Returns if the xceiver client connects to all servers in the pipeline.
*
@ -301,8 +283,9 @@ private XceiverClientAsyncReply sendCommandAsync(
ManagedChannel channel = channels.get(dnId);
// If the channel doesn't exist for this specific datanode or the channel
// is closed, just reconnect
String token = request.getEncodedToken();
if (!isConnected(channel)) {
reconnect(dn);
reconnect(dn, token);
}
final CompletableFuture<ContainerCommandResponseProto> replyFuture =
@ -347,11 +330,11 @@ public void onCompleted() {
return new XceiverClientAsyncReply(replyFuture);
}
private void reconnect(DatanodeDetails dn)
private void reconnect(DatanodeDetails dn, String encodedToken)
throws IOException {
ManagedChannel channel;
try {
connectToDatanode(dn);
connectToDatanode(dn, encodedToken);
channel = channels.get(dn.getUuid());
} catch (Exception e) {
LOG.error("Error while connecting: ", e);

View File

@ -27,6 +27,8 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.ozone.OzoneSecurityUtil;
import org.apache.hadoop.security.UserGroupInformation;
import java.io.Closeable;
import java.io.IOException;
@ -62,6 +64,7 @@ public class XceiverClientManager implements Closeable {
private final boolean useRatis;
private static XceiverClientMetrics metrics;
private boolean isSecurityEnabled;
/**
* Creates a new XceiverClientManager.
*
@ -78,6 +81,7 @@ public XceiverClientManager(Configuration conf) {
ScmConfigKeys.DFS_CONTAINER_RATIS_ENABLED_KEY,
ScmConfigKeys.DFS_CONTAINER_RATIS_ENABLED_DEFAULT);
this.conf = conf;
this.isSecurityEnabled = OzoneSecurityUtil.isSecurityEnabled(conf);
this.clientCache = CacheBuilder.newBuilder()
.expireAfterAccess(staleThresholdMs, TimeUnit.MILLISECONDS)
.maximumSize(maxSize)
@ -141,14 +145,19 @@ private XceiverClientSpi getClient(Pipeline pipeline)
throws IOException {
HddsProtos.ReplicationType type = pipeline.getType();
try {
return clientCache.get(pipeline.getId().getId().toString() + type,
new Callable<XceiverClientSpi>() {
String key = pipeline.getId().getId().toString() + type;
// Append user short name to key to prevent a different user
// from using same instance of xceiverClient.
key = isSecurityEnabled ?
key + UserGroupInformation.getCurrentUser().getShortUserName() : key;
return clientCache.get(key, new Callable<XceiverClientSpi>() {
@Override
public XceiverClientSpi call() throws Exception {
XceiverClientSpi client = null;
switch (type) {
case RATIS:
client = XceiverClientRatis.newXceiverClientRatis(pipeline, conf);
client.connect();
break;
case STAND_ALONE:
client = new XceiverClientGrpc(pipeline, conf);
@ -157,7 +166,6 @@ public XceiverClientSpi call() throws Exception {
default:
throw new IOException("not implemented" + pipeline.getType());
}
client.connect();
return client;
}
});

View File

@ -150,6 +150,12 @@ public void connect() throws Exception {
}
}
@Override
public void connect(String encodedToken) throws Exception {
throw new UnsupportedOperationException("Block tokens are not " +
"implemented for Ratis clients.");
}
@Override
public void close() {
final RaftClient c = client.getAndSet(null);

View File

@ -115,7 +115,7 @@ public ContainerWithPipeline createContainer(String owner)
public void createContainer(XceiverClientSpi client,
long containerId) throws IOException {
String traceID = UUID.randomUUID().toString();
ContainerProtocolCalls.createContainer(client, containerId, traceID);
ContainerProtocolCalls.createContainer(client, containerId, traceID, null);
// Let us log this info after we let SCM know that we have completed the
// creation state.
@ -260,7 +260,7 @@ public void deleteContainer(long containerId, Pipeline pipeline,
client = xceiverClientManager.acquireClient(pipeline);
String traceID = UUID.randomUUID().toString();
ContainerProtocolCalls
.deleteContainer(client, containerId, force, traceID);
.deleteContainer(client, containerId, force, traceID, null);
storageContainerLocationClient
.deleteContainer(containerId);
if (LOG.isDebugEnabled()) {
@ -310,7 +310,8 @@ public ContainerDataProto readContainer(long containerID,
client = xceiverClientManager.acquireClient(pipeline);
String traceID = UUID.randomUUID().toString();
ReadContainerResponseProto response =
ContainerProtocolCalls.readContainer(client, containerID, traceID);
ContainerProtocolCalls.readContainer(client, containerID, traceID,
null);
if (LOG.isDebugEnabled()) {
LOG.debug("Read container {}, machines: {} ", containerID,
pipeline.getNodes());
@ -401,7 +402,8 @@ public void closeContainer(long containerId, Pipeline pipeline)
ObjectStageChangeRequestProto.Op.close,
ObjectStageChangeRequestProto.Stage.begin);
ContainerProtocolCalls.closeContainer(client, containerId, traceID);
ContainerProtocolCalls.closeContainer(client, containerId, traceID,
null);
// Notify SCM to close the container
storageContainerLocationClient.notifyObjectStageChange(
ObjectStageChangeRequestProto.Type.container,

View File

@ -111,6 +111,10 @@ public final class HddsConfigKeys {
public static final String HDDS_PUBLIC_KEY_FILE_NAME = "hdds.public.key.file"
+ ".name";
public static final String HDDS_PUBLIC_KEY_FILE_NAME_DEFAULT = "public.pem";
public static final String HDDS_BLOCK_TOKEN_EXPIRY_TIME =
"hdds.block.token.expiry.time";
public static final String HDDS_BLOCK_TOKEN_EXPIRY_TIME_DEFAULT = "1d";
/**
* Maximum duration of certificates issued by SCM including Self-Signed Roots.
* The formats accepted are based on the ISO-8601 duration format PnDTnHnMn.nS

View File

@ -16,7 +16,6 @@
*/
package org.apache.hadoop.hdds.client;
import org.apache.commons.lang3.builder.ToStringBuilder;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
@ -75,10 +74,9 @@ public void setContainerBlockID(ContainerBlockID containerBlockID) {
@Override
public String toString() {
return new ToStringBuilder(this)
.append("containerID", containerBlockID.getContainerID())
.append("localID", containerBlockID.getLocalID())
.append("blockCommitSequenceId", blockCommitSequenceId)
return new StringBuffer().append(getContainerBlockID().toString())
.append(" bcId: ")
.append(blockCommitSequenceId)
.toString();
}

View File

@ -16,7 +16,6 @@
*/
package org.apache.hadoop.hdds.client;
import org.apache.commons.lang3.builder.ToStringBuilder;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import java.util.Objects;
@ -43,10 +42,11 @@ public long getLocalID() {
@Override
public String toString() {
return new ToStringBuilder(this).
append("containerID", containerID).
append("localID", localID).
toString();
return new StringBuffer()
.append("conID: ")
.append(containerID)
.append(" locID: ")
.append(localID).toString();
}
public HddsProtos.ContainerBlockID getProtobuf() {

View File

@ -78,6 +78,12 @@ public int getRefcount() {
*/
public abstract void connect() throws Exception;
/**
* Connects to the leader in the pipeline using encoded token. To be used
* in a secure cluster.
*/
public abstract void connect(String encodedToken) throws Exception;
@Override
public abstract void close();

View File

@ -21,8 +21,13 @@
import org.apache.hadoop.hdds.scm.XceiverClientAsyncReply;
import org.apache.hadoop.hdds.scm.container.common.helpers
.BlockNotCommittedException;
import org.apache.hadoop.hdds.security.token.OzoneBlockTokenIdentifier;
import org.apache.hadoop.hdds.security.token.OzoneBlockTokenSelector;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.ozone.common.Checksum;
import org.apache.hadoop.ozone.common.ChecksumData;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
import org.apache.hadoop.hdds.scm.XceiverClientSpi;
import org.apache.hadoop.hdds.scm.container.common.helpers
@ -97,14 +102,19 @@ public static GetBlockResponseProto getBlock(XceiverClientSpi xceiverClient,
.setBlockID(datanodeBlockID);
String id = xceiverClient.getPipeline().getFirstNode().getUuidString();
ContainerCommandRequestProto request = ContainerCommandRequestProto
ContainerCommandRequestProto.Builder builder = ContainerCommandRequestProto
.newBuilder()
.setCmdType(Type.GetBlock)
.setContainerID(datanodeBlockID.getContainerID())
.setTraceID(traceID)
.setDatanodeUuid(id)
.setGetBlock(readBlockRequest)
.build();
.setGetBlock(readBlockRequest);
String encodedToken = getEncodedBlockToken(getService(datanodeBlockID));
if (encodedToken != null) {
builder.setEncodedToken(encodedToken);
}
ContainerCommandRequestProto request = builder.build();
ContainerCommandResponseProto response = xceiverClient.sendCommand(request);
validateContainerResponse(response);
@ -129,13 +139,19 @@ public static GetBlockResponseProto getBlock(XceiverClientSpi xceiverClient,
ContainerProtos.GetCommittedBlockLengthRequestProto.newBuilder().
setBlockID(blockID.getDatanodeBlockIDProtobuf());
String id = xceiverClient.getPipeline().getFirstNode().getUuidString();
ContainerCommandRequestProto request =
ContainerCommandRequestProto.Builder builder =
ContainerCommandRequestProto.newBuilder()
.setCmdType(Type.GetCommittedBlockLength)
.setContainerID(blockID.getContainerID())
.setTraceID(traceID)
.setDatanodeUuid(id)
.setGetCommittedBlockLength(getBlockLengthRequestBuilder).build();
.setGetCommittedBlockLength(getBlockLengthRequestBuilder);
String encodedToken = getEncodedBlockToken(new Text(blockID.
getContainerBlockID().toString()));
if (encodedToken != null) {
builder.setEncodedToken(encodedToken);
}
ContainerCommandRequestProto request = builder.build();
ContainerCommandResponseProto response = xceiverClient.sendCommand(request);
validateContainerResponse(response);
return response.getGetCommittedBlockLength();
@ -156,11 +172,17 @@ public static ContainerProtos.PutBlockResponseProto putBlock(
PutBlockRequestProto.Builder createBlockRequest =
PutBlockRequestProto.newBuilder().setBlockData(containerBlockData);
String id = xceiverClient.getPipeline().getFirstNode().getUuidString();
ContainerCommandRequestProto request =
ContainerCommandRequestProto.Builder builder =
ContainerCommandRequestProto.newBuilder().setCmdType(Type.PutBlock)
.setContainerID(containerBlockData.getBlockID().getContainerID())
.setTraceID(traceID).setDatanodeUuid(id)
.setPutBlock(createBlockRequest).build();
.setPutBlock(createBlockRequest);
String encodedToken =
getEncodedBlockToken(getService(containerBlockData.getBlockID()));
if (encodedToken != null) {
builder.setEncodedToken(encodedToken);
}
ContainerCommandRequestProto request = builder.build();
ContainerCommandResponseProto response = xceiverClient.sendCommand(request);
validateContainerResponse(response);
return response.getPutBlock();
@ -184,11 +206,18 @@ public static XceiverClientAsyncReply putBlockAsync(
PutBlockRequestProto.Builder createBlockRequest =
PutBlockRequestProto.newBuilder().setBlockData(containerBlockData);
String id = xceiverClient.getPipeline().getFirstNode().getUuidString();
ContainerCommandRequestProto request =
ContainerCommandRequestProto.Builder builder =
ContainerCommandRequestProto.newBuilder().setCmdType(Type.PutBlock)
.setContainerID(containerBlockData.getBlockID().getContainerID())
.setTraceID(traceID).setDatanodeUuid(id)
.setPutBlock(createBlockRequest).build();
.setPutBlock(createBlockRequest);
String encodedToken =
getEncodedBlockToken(getService(containerBlockData.getBlockID()));
if (encodedToken != null) {
builder.setEncodedToken(encodedToken);
}
ContainerCommandRequestProto request = builder.build();
xceiverClient.sendCommand(request);
return xceiverClient.sendCommandAsync(request);
}
@ -209,14 +238,19 @@ public static ReadChunkResponseProto readChunk(XceiverClientSpi xceiverClient,
.setBlockID(blockID.getDatanodeBlockIDProtobuf())
.setChunkData(chunk);
String id = xceiverClient.getPipeline().getFirstNode().getUuidString();
ContainerCommandRequestProto request = ContainerCommandRequestProto
ContainerCommandRequestProto.Builder builder = ContainerCommandRequestProto
.newBuilder()
.setCmdType(Type.ReadChunk)
.setContainerID(blockID.getContainerID())
.setTraceID(traceID)
.setDatanodeUuid(id)
.setReadChunk(readChunkRequest)
.build();
.setReadChunk(readChunkRequest);
String encodedToken = getEncodedBlockToken(new Text(blockID.
getContainerBlockID().toString()));
if (encodedToken != null) {
builder.setEncodedToken(encodedToken);
}
ContainerCommandRequestProto request = builder.build();
ContainerCommandResponseProto response = xceiverClient.sendCommand(request);
validateContainerResponse(response);
return response.getReadChunk();
@ -241,14 +275,19 @@ public static void writeChunk(XceiverClientSpi xceiverClient, ChunkInfo chunk,
.setChunkData(chunk)
.setData(data);
String id = xceiverClient.getPipeline().getFirstNode().getUuidString();
ContainerCommandRequestProto request = ContainerCommandRequestProto
ContainerCommandRequestProto.Builder builder = ContainerCommandRequestProto
.newBuilder()
.setCmdType(Type.WriteChunk)
.setContainerID(blockID.getContainerID())
.setTraceID(traceID)
.setDatanodeUuid(id)
.setWriteChunk(writeChunkRequest)
.build();
.setWriteChunk(writeChunkRequest);
String encodedToken = getEncodedBlockToken(new Text(blockID.
getContainerBlockID().toString()));
if (encodedToken != null) {
builder.setEncodedToken(encodedToken);
}
ContainerCommandRequestProto request = builder.build();
ContainerCommandResponseProto response = xceiverClient.sendCommand(request);
validateContainerResponse(response);
}
@ -272,10 +311,16 @@ public static XceiverClientAsyncReply writeChunkAsync(
.setBlockID(blockID.getDatanodeBlockIDProtobuf())
.setChunkData(chunk).setData(data);
String id = xceiverClient.getPipeline().getFirstNode().getUuidString();
ContainerCommandRequestProto request =
ContainerCommandRequestProto.Builder builder =
ContainerCommandRequestProto.newBuilder().setCmdType(Type.WriteChunk)
.setContainerID(blockID.getContainerID()).setTraceID(traceID)
.setDatanodeUuid(id).setWriteChunk(writeChunkRequest).build();
.setDatanodeUuid(id).setWriteChunk(writeChunkRequest);
String encodedToken = getEncodedBlockToken(new Text(blockID.
getContainerBlockID().toString()));
if (encodedToken != null) {
builder.setEncodedToken(encodedToken);
}
ContainerCommandRequestProto request = builder.build();
return xceiverClient.sendCommandAsync(request);
}
@ -293,8 +338,8 @@ public static XceiverClientAsyncReply writeChunkAsync(
* @throws IOException
*/
public static PutSmallFileResponseProto writeSmallFile(
XceiverClientSpi client, BlockID blockID, byte[] data, String traceID)
throws IOException {
XceiverClientSpi client, BlockID blockID, byte[] data,
String traceID) throws IOException {
BlockData containerBlockData =
BlockData.newBuilder().setBlockID(blockID.getDatanodeBlockIDProtobuf())
@ -323,14 +368,19 @@ public static PutSmallFileResponseProto writeSmallFile(
.build();
String id = client.getPipeline().getFirstNode().getUuidString();
ContainerCommandRequestProto request =
ContainerCommandRequestProto.Builder builder =
ContainerCommandRequestProto.newBuilder()
.setCmdType(Type.PutSmallFile)
.setContainerID(blockID.getContainerID())
.setTraceID(traceID)
.setDatanodeUuid(id)
.setPutSmallFile(putSmallFileRequest)
.build();
.setPutSmallFile(putSmallFileRequest);
String encodedToken = getEncodedBlockToken(new Text(blockID.
getContainerBlockID().toString()));
if (encodedToken != null) {
builder.setEncodedToken(encodedToken);
}
ContainerCommandRequestProto request = builder.build();
ContainerCommandResponseProto response = client.sendCommand(request);
validateContainerResponse(response);
return response.getPutSmallFile();
@ -341,10 +391,11 @@ public static PutSmallFileResponseProto writeSmallFile(
* @param client - client
* @param containerID - ID of container
* @param traceID - traceID
* @param encodedToken - encodedToken if security is enabled
* @throws IOException
*/
public static void createContainer(XceiverClientSpi client, long containerID,
String traceID) throws IOException {
String traceID, String encodedToken) throws IOException {
ContainerProtos.CreateContainerRequestProto.Builder createRequest =
ContainerProtos.CreateContainerRequestProto
.newBuilder();
@ -354,6 +405,9 @@ public static void createContainer(XceiverClientSpi client, long containerID,
String id = client.getPipeline().getFirstNode().getUuidString();
ContainerCommandRequestProto.Builder request =
ContainerCommandRequestProto.newBuilder();
if (encodedToken != null) {
request.setEncodedToken(encodedToken);
}
request.setCmdType(ContainerProtos.Type.CreateContainer);
request.setContainerID(containerID);
request.setCreateContainer(createRequest.build());
@ -370,10 +424,11 @@ public static void createContainer(XceiverClientSpi client, long containerID,
* @param client
* @param force whether or not to forcibly delete the container.
* @param traceID
* @param encodedToken - encodedToken if security is enabled
* @throws IOException
*/
public static void deleteContainer(XceiverClientSpi client, long containerID,
boolean force, String traceID) throws IOException {
boolean force, String traceID, String encodedToken) throws IOException {
ContainerProtos.DeleteContainerRequestProto.Builder deleteRequest =
ContainerProtos.DeleteContainerRequestProto.newBuilder();
deleteRequest.setForceDelete(force);
@ -386,6 +441,9 @@ public static void deleteContainer(XceiverClientSpi client, long containerID,
request.setDeleteContainer(deleteRequest);
request.setTraceID(traceID);
request.setDatanodeUuid(id);
if(encodedToken != null) {
request.setEncodedToken(encodedToken);
}
ContainerCommandResponseProto response =
client.sendCommand(request.build());
validateContainerResponse(response);
@ -397,10 +455,12 @@ public static void deleteContainer(XceiverClientSpi client, long containerID,
* @param client
* @param containerID
* @param traceID
* @param encodedToken - encodedToken if security is enabled
* @throws IOException
*/
public static void closeContainer(XceiverClientSpi client,
long containerID, String traceID) throws IOException {
long containerID, String traceID, String encodedToken)
throws IOException {
String id = client.getPipeline().getFirstNode().getUuidString();
ContainerCommandRequestProto.Builder request =
@ -410,6 +470,9 @@ public static void closeContainer(XceiverClientSpi client,
request.setCloseContainer(CloseContainerRequestProto.getDefaultInstance());
request.setTraceID(traceID);
request.setDatanodeUuid(id);
if(encodedToken != null) {
request.setEncodedToken(encodedToken);
}
ContainerCommandResponseProto response =
client.sendCommand(request.build());
validateContainerResponse(response);
@ -420,11 +483,12 @@ public static void closeContainer(XceiverClientSpi client,
*
* @param client - client
* @param traceID - trace ID
* @param encodedToken - encodedToken if security is enabled
* @throws IOException
*/
public static ReadContainerResponseProto readContainer(
XceiverClientSpi client, long containerID,
String traceID) throws IOException {
String traceID, String encodedToken) throws IOException {
String id = client.getPipeline().getFirstNode().getUuidString();
ContainerCommandRequestProto.Builder request =
@ -434,6 +498,9 @@ public static ReadContainerResponseProto readContainer(
request.setReadContainer(ReadContainerRequestProto.getDefaultInstance());
request.setDatanodeUuid(id);
request.setTraceID(traceID);
if(encodedToken != null) {
request.setEncodedToken(encodedToken);
}
ContainerCommandResponseProto response =
client.sendCommand(request.build());
validateContainerResponse(response);
@ -461,14 +528,19 @@ public static GetSmallFileResponseProto readSmallFile(XceiverClientSpi client,
.build();
String id = client.getPipeline().getFirstNode().getUuidString();
ContainerCommandRequestProto request = ContainerCommandRequestProto
ContainerCommandRequestProto.Builder builder = ContainerCommandRequestProto
.newBuilder()
.setCmdType(Type.GetSmallFile)
.setContainerID(blockID.getContainerID())
.setTraceID(traceID)
.setDatanodeUuid(id)
.setGetSmallFile(getSmallFileRequest)
.build();
.setGetSmallFile(getSmallFileRequest);
String encodedToken = getEncodedBlockToken(new Text(blockID.
getContainerBlockID().toString()));
if (encodedToken != null) {
builder.setEncodedToken(encodedToken);
}
ContainerCommandRequestProto request = builder.build();
ContainerCommandResponseProto response = client.sendCommand(request);
validateContainerResponse(response);
@ -494,4 +566,30 @@ public static void validateContainerResponse(
throw new StorageContainerException(
response.getMessage(), response.getResult());
}
/**
* Returns a url encoded block token. Service param should match the service
* field of token.
* @param service
*
* */
private static String getEncodedBlockToken(Text service)
throws IOException {
UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
Token<OzoneBlockTokenIdentifier> token =
OzoneBlockTokenSelector.selectBlockToken(service, ugi.getTokens());
if (token != null) {
return token.encodeToUrlString();
}
return null;
}
private static Text getService(DatanodeBlockID blockId) {
return new Text(new StringBuffer()
.append("conID: ")
.append(blockId.getContainerID())
.append(" locID: ")
.append(blockId.getLocalID())
.toString());
}
}

View File

@ -39,6 +39,7 @@ public class BlockTokenVerifier implements TokenVerifier {
private final CertificateClient caClient;
private final SecurityConfig conf;
private static boolean testStub = false;
public BlockTokenVerifier(SecurityConfig conf, CertificateClient caClient) {
this.conf = conf;
@ -53,7 +54,7 @@ private boolean isExpired(long expiryDate) {
public UserGroupInformation verify(String user, String tokenStr)
throws SCMSecurityException {
if (conf.isGrpcBlockTokenEnabled()) {
if (Strings.isNullOrEmpty(tokenStr)) {
if (Strings.isNullOrEmpty(tokenStr) || isTestStub()) {
throw new BlockTokenException("Fail to find any token (empty or " +
"null.");
}
@ -110,4 +111,13 @@ public UserGroupInformation verify(String user, String tokenStr)
return UserGroupInformation.createRemoteUser(user);
}
}
public static boolean isTestStub() {
return testStub;
}
// For testing purpose only.
public static void setTestStub(boolean isTestStub) {
BlockTokenVerifier.testStub = isTestStub;
}
}

View File

@ -45,7 +45,27 @@ public Token<OzoneBlockTokenIdentifier> selectToken(Text service,
return null;
}
for (Token<? extends TokenIdentifier> token : tokens) {
if (OzoneBlockTokenIdentifier.KIND_NAME.equals(token.getKind())) {
if (OzoneBlockTokenIdentifier.KIND_NAME.equals(token.getKind())
&& token.getService().equals(service)) {
LOG.trace("Getting token for service:{}", service);
return (Token<OzoneBlockTokenIdentifier>) token;
}
}
return null;
}
/**
* Static method to avoid instantiation.
* */
@SuppressWarnings("unchecked")
public static Token<OzoneBlockTokenIdentifier> selectBlockToken(Text service,
Collection<Token<? extends TokenIdentifier>> tokens) {
if (service == null) {
return null;
}
for (Token<? extends TokenIdentifier> token : tokens) {
if (OzoneBlockTokenIdentifier.KIND_NAME.equals(token.getKind())
&& token.getService().equals(service)) {
LOG.trace("Getting token for service:{}", service);
return (Token<OzoneBlockTokenIdentifier>) token;
}

View File

@ -72,6 +72,9 @@
import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_X509_SIGNATURE_ALGO;
import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_X509_SIGNATURE_ALGO_DEFAULT;
import static org.apache.hadoop.hdds.HddsConfigKeys.OZONE_METADATA_DIRS;
import static org.apache.hadoop.hdds.scm.ScmConfigKeys.HDDS_DATANODE_DIR_KEY;
import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_SECURITY_ENABLED_DEFAULT;
import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_SECURITY_ENABLED_KEY;
/**
* A class that deals with all Security related configs in HDDS.
@ -101,6 +104,7 @@ public class SecurityConfig {
private String trustStoreFileName;
private String serverCertChainFileName;
private String clientCertChainFileName;
private final boolean isSecurityEnabled;
/**
* Constructs a SecurityConfig.
@ -120,8 +124,8 @@ public SecurityConfig(Configuration configuration) {
// HDDS metadata dir and if that is not set, we will use Ozone directory.
// TODO: We might want to fix this later.
this.metadatDir = this.configuration.get(HDDS_METADATA_DIR_NAME,
configuration.get(OZONE_METADATA_DIRS));
configuration.get(OZONE_METADATA_DIRS,
configuration.get(HDDS_DATANODE_DIR_KEY)));
Preconditions.checkNotNull(this.metadatDir, "Metadata directory can't be"
+ " null. Please check configs.");
this.keyDir = this.configuration.get(HDDS_KEY_DIR_NAME,
@ -164,6 +168,10 @@ public SecurityConfig(Configuration configuration) {
HDDS_GRPC_TLS_TEST_CERT, HDDS_GRPC_TLS_TEST_CERT_DEFAULT);
}
this.isSecurityEnabled = this.configuration.getBoolean(
OZONE_SECURITY_ENABLED_KEY,
OZONE_SECURITY_ENABLED_DEFAULT);
// First Startup -- if the provider is null, check for the provider.
if (SecurityConfig.provider == null) {
synchronized (SecurityConfig.class) {
@ -177,6 +185,16 @@ public SecurityConfig(Configuration configuration) {
}
}
/**
* Returns true if security is enabled for OzoneCluster. This is determined
* by value of OZONE_SECURITY_ENABLED_KEY.
*
* @return true if security is enabled for OzoneCluster.
*/
public boolean isSecurityEnabled() {
return isSecurityEnabled;
}
/**
* Returns the Standard Certificate file name.
*

View File

@ -194,6 +194,7 @@ message ContainerCommandRequestProto {
optional PutSmallFileRequestProto putSmallFile = 20;
optional GetSmallFileRequestProto getSmallFile = 21;
optional GetCommittedBlockLengthRequestProto getCommittedBlockLength = 22;
optional string encodedToken = 23;
}
message ContainerCommandResponseProto {

View File

@ -1139,7 +1139,7 @@
<property>
<name>ozone.tags.system</name>
<value>OZONE,MANAGEMENT,SECURITY,PERFORMANCE,DEBUG,CLIENT,SERVER,OM,SCM,
CRITICAL,RATIS,CONTAINER,REQUIRED,REST,STORAGE,PIPELINE,STANDALONE,S3GATEWAY,ACL</value>
CRITICAL,RATIS,CONTAINER,REQUIRED,REST,STORAGE,PIPELINE,STANDALONE,S3GATEWAY,ACL,TOKEN</value>
</property>
@ -1645,6 +1645,18 @@
Directory to store public/private key for SCM CA. This is relative to ozone/hdds meteadata dir.
</description>
</property>
<property>
<name>hdds.block.token.expiry.time</name>
<value>1d</value>
<tag>OZONE, HDDS, SECURITY, TOKEN</tag>
<description>
Default value for expiry time of block token. This
setting supports multiple time unit suffixes as described in
dfs.heartbeat.interval. If no suffix is specified, then milliseconds is
assumed.
</description>
</property>
<property>
<name>hdds.metadata.dir</name>
<value/>

View File

@ -0,0 +1,359 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.ozone.client.io;
import org.apache.hadoop.hdds.client.BlockID;
import org.apache.hadoop.hdds.scm.XceiverClientManager;
import org.apache.hadoop.hdds.scm.XceiverClientSpi;
import org.apache.hadoop.hdds.scm.storage.BlockOutputStream;
import org.apache.hadoop.hdds.security.token.OzoneBlockTokenIdentifier;
import org.apache.hadoop.ozone.common.Checksum;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.util.List;
/**
* Helper class used inside {@link BlockOutputStream}.
* */
public class BlockOutputStreamEntry extends OutputStream {
private OutputStream outputStream;
private BlockID blockID;
private final String key;
private final XceiverClientManager xceiverClientManager;
private final XceiverClientSpi xceiverClient;
private final Checksum checksum;
private final String requestId;
private final int chunkSize;
// total number of bytes that should be written to this stream
private final long length;
// the current position of this stream 0 <= currentPosition < length
private long currentPosition;
private Token<OzoneBlockTokenIdentifier> token;
private final long streamBufferFlushSize;
private final long streamBufferMaxSize;
private final long watchTimeout;
private List<ByteBuffer> bufferList;
private BlockOutputStreamEntry(BlockID blockID, String key,
XceiverClientManager xceiverClientManager,
XceiverClientSpi xceiverClient, String requestId, int chunkSize,
long length, long streamBufferFlushSize, long streamBufferMaxSize,
long watchTimeout, List<ByteBuffer> bufferList, Checksum checksum,
Token<OzoneBlockTokenIdentifier> token) {
this.outputStream = null;
this.blockID = blockID;
this.key = key;
this.xceiverClientManager = xceiverClientManager;
this.xceiverClient = xceiverClient;
this.requestId = requestId;
this.chunkSize = chunkSize;
this.token = token;
this.length = length;
this.currentPosition = 0;
this.streamBufferFlushSize = streamBufferFlushSize;
this.streamBufferMaxSize = streamBufferMaxSize;
this.watchTimeout = watchTimeout;
this.bufferList = bufferList;
this.checksum = checksum;
}
/**
* For testing purpose, taking a some random created stream instance.
*
* @param outputStream a existing writable output stream
* @param length the length of data to write to the stream
*/
BlockOutputStreamEntry(OutputStream outputStream, long length,
Checksum checksum) {
this.outputStream = outputStream;
this.blockID = null;
this.key = null;
this.xceiverClientManager = null;
this.xceiverClient = null;
this.requestId = null;
this.chunkSize = -1;
this.token = null;
this.length = length;
this.currentPosition = 0;
streamBufferFlushSize = 0;
streamBufferMaxSize = 0;
bufferList = null;
watchTimeout = 0;
this.checksum = checksum;
}
long getLength() {
return length;
}
Token<OzoneBlockTokenIdentifier> getToken() {
return token;
}
long getRemaining() {
return length - currentPosition;
}
private void checkStream() throws IOException {
if (this.outputStream == null) {
if (getToken() != null) {
UserGroupInformation.getCurrentUser().addToken(getToken());
}
this.outputStream =
new BlockOutputStream(blockID, key, xceiverClientManager,
xceiverClient, requestId, chunkSize, streamBufferFlushSize,
streamBufferMaxSize, watchTimeout, bufferList, checksum);
}
}
@Override
public void write(int b) throws IOException {
checkStream();
outputStream.write(b);
this.currentPosition += 1;
}
@Override
public void write(byte[] b, int off, int len) throws IOException {
checkStream();
outputStream.write(b, off, len);
this.currentPosition += len;
}
@Override
public void flush() throws IOException {
if (this.outputStream != null) {
this.outputStream.flush();
}
}
@Override
public void close() throws IOException {
if (this.outputStream != null) {
this.outputStream.close();
// after closing the chunkOutPutStream, blockId would have been
// reconstructed with updated bcsId
if (this.outputStream instanceof BlockOutputStream) {
this.blockID = ((BlockOutputStream) outputStream).getBlockID();
}
}
}
long getTotalSuccessfulFlushedData() throws IOException {
if (this.outputStream instanceof BlockOutputStream) {
BlockOutputStream out = (BlockOutputStream) this.outputStream;
blockID = out.getBlockID();
return out.getTotalSuccessfulFlushedData();
} else if (outputStream == null) {
// For a pre allocated block for which no write has been initiated,
// the OutputStream will be null here.
// In such cases, the default blockCommitSequenceId will be 0
return 0;
}
throw new IOException("Invalid Output Stream for Key: " + key);
}
long getWrittenDataLength() throws IOException {
if (this.outputStream instanceof BlockOutputStream) {
BlockOutputStream out = (BlockOutputStream) this.outputStream;
return out.getWrittenDataLength();
} else if (outputStream == null) {
// For a pre allocated block for which no write has been initiated,
// the OutputStream will be null here.
// In such cases, the default blockCommitSequenceId will be 0
return 0;
}
throw new IOException("Invalid Output Stream for Key: " + key);
}
void cleanup() throws IOException{
checkStream();
if (this.outputStream instanceof BlockOutputStream) {
BlockOutputStream out = (BlockOutputStream) this.outputStream;
out.cleanup();
}
}
void writeOnRetry(long len) throws IOException {
checkStream();
if (this.outputStream instanceof BlockOutputStream) {
BlockOutputStream out = (BlockOutputStream) this.outputStream;
out.writeOnRetry(len);
this.currentPosition += len;
} else {
throw new IOException("Invalid Output Stream for Key: " + key);
}
}
/**
* Builder class for ChunkGroupOutputStreamEntry.
* */
public static class Builder {
private BlockID blockID;
private String key;
private XceiverClientManager xceiverClientManager;
private XceiverClientSpi xceiverClient;
private String requestId;
private int chunkSize;
private long length;
private long streamBufferFlushSize;
private long streamBufferMaxSize;
private long watchTimeout;
private List<ByteBuffer> bufferList;
private Token<OzoneBlockTokenIdentifier> token;
private Checksum checksum;
public Builder setChecksum(Checksum cs) {
this.checksum = cs;
return this;
}
public Builder setBlockID(BlockID bID) {
this.blockID = bID;
return this;
}
public Builder setKey(String keys) {
this.key = keys;
return this;
}
public Builder setXceiverClientManager(XceiverClientManager
xClientManager) {
this.xceiverClientManager = xClientManager;
return this;
}
public Builder setXceiverClient(XceiverClientSpi client) {
this.xceiverClient = client;
return this;
}
public Builder setRequestId(String request) {
this.requestId = request;
return this;
}
public Builder setChunkSize(int cSize) {
this.chunkSize = cSize;
return this;
}
public Builder setLength(long len) {
this.length = len;
return this;
}
public Builder setStreamBufferFlushSize(long bufferFlushSize) {
this.streamBufferFlushSize = bufferFlushSize;
return this;
}
public Builder setStreamBufferMaxSize(long bufferMaxSize) {
this.streamBufferMaxSize = bufferMaxSize;
return this;
}
public Builder setWatchTimeout(long timeout) {
this.watchTimeout = timeout;
return this;
}
public Builder setBufferList(List<ByteBuffer> bufferList) {
this.bufferList = bufferList;
return this;
}
public Builder setToken(Token<OzoneBlockTokenIdentifier> bToken) {
this.token = bToken;
return this;
}
public BlockOutputStreamEntry build() {
return new BlockOutputStreamEntry(blockID, key,
xceiverClientManager, xceiverClient, requestId, chunkSize,
length, streamBufferFlushSize, streamBufferMaxSize, watchTimeout,
bufferList, checksum, token);
}
}
public OutputStream getOutputStream() {
return outputStream;
}
public BlockID getBlockID() {
return blockID;
}
public String getKey() {
return key;
}
public XceiverClientManager getXceiverClientManager() {
return xceiverClientManager;
}
public XceiverClientSpi getXceiverClient() {
return xceiverClient;
}
public Checksum getChecksum() {
return checksum;
}
public String getRequestId() {
return requestId;
}
public int getChunkSize() {
return chunkSize;
}
public long getCurrentPosition() {
return currentPosition;
}
public long getStreamBufferFlushSize() {
return streamBufferFlushSize;
}
public long getStreamBufferMaxSize() {
return streamBufferMaxSize;
}
public long getWatchTimeout() {
return watchTimeout;
}
public List<ByteBuffer> getBufferList() {
return bufferList;
}
public void setCurrentPosition(long curPosition) {
this.currentPosition = curPosition;
}
}

View File

@ -32,6 +32,7 @@
import org.apache.hadoop.hdds.scm.protocolPB.StorageContainerLocationProtocolClientSideTranslatorPB;
import org.apache.hadoop.hdds.scm.storage.BlockInputStream;
import org.apache.hadoop.hdds.scm.storage.ContainerProtocolCalls;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.ratis.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -296,6 +297,10 @@ public static LengthInputStream getFromOmKeyInfo(
groupInputStream.streamOffset[i] = length;
ContainerProtos.DatanodeBlockID datanodeBlockID = blockID
.getDatanodeBlockIDProtobuf();
if (omKeyLocationInfo.getToken() != null) {
UserGroupInformation.getCurrentUser().
addToken(omKeyLocationInfo.getToken());
}
ContainerProtos.GetBlockResponseProto response = ContainerProtocolCalls
.getBlock(xceiverClient, datanodeBlockID, requestId);
List<ContainerProtos.ChunkInfo> chunks =

View File

@ -36,6 +36,7 @@
.StorageContainerException;
import org.apache.hadoop.hdds.scm.protocolPB
.StorageContainerLocationProtocolClientSideTranslatorPB;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.ratis.protocol.RaftRetryFailureException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -133,12 +134,13 @@ public List<OmKeyLocationInfo> getLocationInfoList() throws IOException {
List<OmKeyLocationInfo> locationInfoList = new ArrayList<>();
for (BlockOutputStreamEntry streamEntry : streamEntries) {
OmKeyLocationInfo info =
new OmKeyLocationInfo.Builder().setBlockID(streamEntry.blockID)
.setLength(streamEntry.currentPosition).setOffset(0)
new OmKeyLocationInfo.Builder().setBlockID(streamEntry.getBlockID())
.setLength(streamEntry.getCurrentPosition()).setOffset(0)
.setToken(streamEntry.getToken())
.build();
LOG.debug("block written " + streamEntry.blockID + ", length "
+ streamEntry.currentPosition + " bcsID " + streamEntry.blockID
.getBlockCommitSequenceId());
LOG.debug("block written " + streamEntry.getBlockID() + ", length "
+ streamEntry.getCurrentPosition() + " bcsID "
+ streamEntry.getBlockID().getBlockCommitSequenceId());
locationInfoList.add(info);
}
return locationInfoList;
@ -213,15 +215,27 @@ private void addKeyLocationInfo(OmKeyLocationInfo subKeyInfo)
throws IOException {
ContainerWithPipeline containerWithPipeline = scmClient
.getContainerWithPipeline(subKeyInfo.getContainerID());
UserGroupInformation.getCurrentUser().addToken(subKeyInfo.getToken());
XceiverClientSpi xceiverClient =
xceiverClientManager.acquireClient(containerWithPipeline.getPipeline());
streamEntries.add(new BlockOutputStreamEntry(subKeyInfo.getBlockID(),
keyArgs.getKeyName(), xceiverClientManager, xceiverClient, requestID,
chunkSize, subKeyInfo.getLength(), streamBufferFlushSize,
streamBufferMaxSize, watchTimeout, bufferList, checksum));
BlockOutputStreamEntry.Builder builder =
new BlockOutputStreamEntry.Builder()
.setBlockID(subKeyInfo.getBlockID())
.setKey(keyArgs.getKeyName())
.setXceiverClientManager(xceiverClientManager)
.setXceiverClient(xceiverClient)
.setRequestId(requestID)
.setChunkSize(chunkSize)
.setLength(subKeyInfo.getLength())
.setStreamBufferFlushSize(streamBufferFlushSize)
.setStreamBufferMaxSize(streamBufferMaxSize)
.setWatchTimeout(watchTimeout)
.setBufferList(bufferList)
.setChecksum(checksum)
.setToken(subKeyInfo.getToken());
streamEntries.add(builder.build());
}
@Override
public void write(int b) throws IOException {
byte[] buf = new byte[1];
@ -329,7 +343,7 @@ private void discardPreallocatedBlocks(long containerID) {
ListIterator<BlockOutputStreamEntry> streamEntryIterator =
streamEntries.listIterator(currentStreamIndex);
while (streamEntryIterator.hasNext()) {
if (streamEntryIterator.next().blockID.getContainerID()
if (streamEntryIterator.next().getBlockID().getContainerID()
== containerID) {
streamEntryIterator.remove();
}
@ -348,7 +362,7 @@ private void removeEmptyBlocks() {
ListIterator<BlockOutputStreamEntry> streamEntryIterator =
streamEntries.listIterator(currentStreamIndex);
while (streamEntryIterator.hasNext()) {
if (streamEntryIterator.next().currentPosition == 0) {
if (streamEntryIterator.next().getCurrentPosition() == 0) {
streamEntryIterator.remove();
}
}
@ -369,7 +383,7 @@ private void handleException(BlockOutputStreamEntry streamEntry,
long totalSuccessfulFlushedData =
streamEntry.getTotalSuccessfulFlushedData();
//set the correct length for the current stream
streamEntry.currentPosition = totalSuccessfulFlushedData;
streamEntry.setCurrentPosition(totalSuccessfulFlushedData);
long bufferedDataLen = computeBufferData();
// just clean up the current stream.
streamEntry.cleanup();
@ -385,7 +399,7 @@ private void handleException(BlockOutputStreamEntry streamEntry,
}
// discard subsequent pre allocated blocks from the streamEntries list
// from the closed container
discardPreallocatedBlocks(streamEntry.blockID.getContainerID());
discardPreallocatedBlocks(streamEntry.getBlockID().getContainerID());
}
private boolean checkIfContainerIsClosed(IOException ioe) {
@ -423,7 +437,7 @@ private boolean checkIfTimeoutException(IOException ioe) {
}
private long getKeyLength() {
return streamEntries.stream().mapToLong(e -> e.currentPosition)
return streamEntries.parallelStream().mapToLong(e -> e.getCurrentPosition())
.sum();
}
@ -638,169 +652,6 @@ public KeyOutputStream build() throws IOException {
}
}
private static class BlockOutputStreamEntry extends OutputStream {
private OutputStream outputStream;
private BlockID blockID;
private final String key;
private final XceiverClientManager xceiverClientManager;
private final XceiverClientSpi xceiverClient;
private final Checksum checksum;
private final String requestId;
private final int chunkSize;
// total number of bytes that should be written to this stream
private final long length;
// the current position of this stream 0 <= currentPosition < length
private long currentPosition;
private final long streamBufferFlushSize;
private final long streamBufferMaxSize;
private final long watchTimeout;
private List<ByteBuffer> bufferList;
@SuppressWarnings("parameternumber")
BlockOutputStreamEntry(BlockID blockID, String key,
XceiverClientManager xceiverClientManager,
XceiverClientSpi xceiverClient, String requestId, int chunkSize,
long length, long streamBufferFlushSize, long streamBufferMaxSize,
long watchTimeout, List<ByteBuffer> bufferList, Checksum checksum) {
this.outputStream = null;
this.blockID = blockID;
this.key = key;
this.xceiverClientManager = xceiverClientManager;
this.xceiverClient = xceiverClient;
this.requestId = requestId;
this.chunkSize = chunkSize;
this.length = length;
this.currentPosition = 0;
this.streamBufferFlushSize = streamBufferFlushSize;
this.streamBufferMaxSize = streamBufferMaxSize;
this.watchTimeout = watchTimeout;
this.checksum = checksum;
this.bufferList = bufferList;
}
/**
* For testing purpose, taking a some random created stream instance.
* @param outputStream a existing writable output stream
* @param length the length of data to write to the stream
*/
BlockOutputStreamEntry(OutputStream outputStream, long length,
Checksum checksum) {
this.outputStream = outputStream;
this.blockID = null;
this.key = null;
this.xceiverClientManager = null;
this.xceiverClient = null;
this.requestId = null;
this.chunkSize = -1;
this.length = length;
this.currentPosition = 0;
streamBufferFlushSize = 0;
streamBufferMaxSize = 0;
bufferList = null;
watchTimeout = 0;
this.checksum = checksum;
}
long getLength() {
return length;
}
long getRemaining() {
return length - currentPosition;
}
private void checkStream() {
if (this.outputStream == null) {
this.outputStream =
new BlockOutputStream(blockID, key, xceiverClientManager,
xceiverClient, requestId, chunkSize, streamBufferFlushSize,
streamBufferMaxSize, watchTimeout, bufferList, checksum);
}
}
@Override
public void write(int b) throws IOException {
checkStream();
outputStream.write(b);
this.currentPosition += 1;
}
@Override
public void write(byte[] b, int off, int len) throws IOException {
checkStream();
outputStream.write(b, off, len);
this.currentPosition += len;
}
@Override
public void flush() throws IOException {
if (this.outputStream != null) {
this.outputStream.flush();
}
}
@Override
public void close() throws IOException {
if (this.outputStream != null) {
this.outputStream.close();
// after closing the chunkOutPutStream, blockId would have been
// reconstructed with updated bcsId
if (this.outputStream instanceof BlockOutputStream) {
this.blockID = ((BlockOutputStream) outputStream).getBlockID();
}
}
}
long getTotalSuccessfulFlushedData() throws IOException {
if (this.outputStream instanceof BlockOutputStream) {
BlockOutputStream out = (BlockOutputStream) this.outputStream;
blockID = out.getBlockID();
return out.getTotalSuccessfulFlushedData();
} else if (outputStream == null) {
// For a pre allocated block for which no write has been initiated,
// the OutputStream will be null here.
// In such cases, the default blockCommitSequenceId will be 0
return 0;
}
throw new IOException("Invalid Output Stream for Key: " + key);
}
long getWrittenDataLength() throws IOException {
if (this.outputStream instanceof BlockOutputStream) {
BlockOutputStream out = (BlockOutputStream) this.outputStream;
return out.getWrittenDataLength();
} else if (outputStream == null) {
// For a pre allocated block for which no write has been initiated,
// the OutputStream will be null here.
// In such cases, the default blockCommitSequenceId will be 0
return 0;
}
throw new IOException("Invalid Output Stream for Key: " + key);
}
void cleanup() {
checkStream();
if (this.outputStream instanceof BlockOutputStream) {
BlockOutputStream out = (BlockOutputStream) this.outputStream;
out.cleanup();
}
}
void writeOnRetry(long len) throws IOException {
checkStream();
if (this.outputStream instanceof BlockOutputStream) {
BlockOutputStream out = (BlockOutputStream) this.outputStream;
out.writeOnRetry(len);
this.currentPosition += len;
} else {
throw new IOException("Invalid Output Stream for Key: " + key);
}
}
}
/**
* Verify that the output stream is open. Non blocking; this gives
* the last state of the volatile {@link #closed} field.

View File

@ -17,7 +17,9 @@
package org.apache.hadoop.ozone.om.helpers;
import org.apache.hadoop.hdds.client.BlockID;
import org.apache.hadoop.hdds.security.token.OzoneBlockTokenIdentifier;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.KeyLocation;
import org.apache.hadoop.security.token.Token;
/**
* One key can be too huge to fit in one container. In which case it gets split
@ -28,6 +30,8 @@ public final class OmKeyLocationInfo {
// the id of this subkey in all the subkeys.
private long length;
private final long offset;
// Block token, required for client authentication when security is enabled.
private Token<OzoneBlockTokenIdentifier> token;
// the version number indicating when this block was added
private long createVersion;
@ -37,6 +41,14 @@ private OmKeyLocationInfo(BlockID blockID, long length, long offset) {
this.offset = offset;
}
private OmKeyLocationInfo(BlockID blockID, long length, long offset,
Token<OzoneBlockTokenIdentifier> token) {
this.blockID = blockID;
this.length = length;
this.offset = offset;
this.token = token;
}
public void setCreateVersion(long version) {
createVersion = version;
}
@ -73,6 +85,13 @@ public long getBlockCommitSequenceId() {
return blockID.getBlockCommitSequenceId();
}
public Token<OzoneBlockTokenIdentifier> getToken() {
return token;
}
public void setToken(Token<OzoneBlockTokenIdentifier> token) {
this.token = token;
}
/**
* Builder of OmKeyLocationInfo.
*/
@ -80,6 +99,7 @@ public static class Builder {
private BlockID blockID;
private long length;
private long offset;
private Token<OzoneBlockTokenIdentifier> token;
public Builder setBlockID(BlockID blockId) {
this.blockID = blockId;
@ -96,18 +116,30 @@ public Builder setOffset(long off) {
return this;
}
public Builder setToken(Token<OzoneBlockTokenIdentifier> bToken) {
this.token = bToken;
return this;
}
public OmKeyLocationInfo build() {
if (token == null) {
return new OmKeyLocationInfo(blockID, length, offset);
} else {
return new OmKeyLocationInfo(blockID, length, offset, token);
}
}
}
public KeyLocation getProtobuf() {
return KeyLocation.newBuilder()
KeyLocation.Builder builder = KeyLocation.newBuilder()
.setBlockID(blockID.getProtobuf())
.setLength(length)
.setOffset(offset)
.setCreateVersion(createVersion)
.build();
.setCreateVersion(createVersion);
if (this.token != null) {
builder.setToken(this.token.toTokenProto());
}
return builder.build();
}
public static OmKeyLocationInfo getFromProtobuf(KeyLocation keyLocation) {
@ -115,6 +147,9 @@ public static OmKeyLocationInfo getFromProtobuf(KeyLocation keyLocation) {
BlockID.getFromProtobuf(keyLocation.getBlockID()),
keyLocation.getLength(),
keyLocation.getOffset());
if(keyLocation.hasToken()) {
info.token = new Token<>(keyLocation.getToken());
}
info.setCreateVersion(keyLocation.getCreateVersion());
return info;
}
@ -125,6 +160,7 @@ public String toString() {
", localID=" + blockID.getLocalID() + "}" +
", length=" + length +
", offset=" + offset +
", token=" + token +
", createVersion=" + createVersion + '}';
}
}

View File

@ -19,9 +19,9 @@
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.security.token.OzoneBlockTokenIdentifier;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.BlockTokenSecretProto.AccessModeProto;
import org.apache.hadoop.hdds.security.x509.SecurityConfig;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
@ -55,7 +55,7 @@ public class OzoneBlockTokenSecretManager extends
* @param blockTokenExpirytime token expiry time for expired tokens in
* milliseconds
*/
public OzoneBlockTokenSecretManager(OzoneConfiguration conf,
public OzoneBlockTokenSecretManager(SecurityConfig conf,
long blockTokenExpirytime, String omCertSerialId) {
super(conf, blockTokenExpirytime, blockTokenExpirytime, SERVICE, LOG);
this.omCertSerialId = omCertSerialId;
@ -74,7 +74,8 @@ public OzoneBlockTokenIdentifier createIdentifier(String owner,
}
/**
* Generate an block token for specified user, blockId.
* Generate an block token for specified user, blockId. Service field for
* token is set to blockId.
*
* @param user
* @param blockId
@ -92,8 +93,10 @@ public Token<OzoneBlockTokenIdentifier> generateToken(String user,
LOG.trace("Issued delegation token -> expiryTime:{},tokenId:{}",
expiryTime, tokenId);
}
// Pass blockId as service.
return new Token<>(tokenIdentifier.getBytes(),
createPassword(tokenIdentifier), tokenIdentifier.getKind(), SERVICE);
createPassword(tokenIdentifier), tokenIdentifier.getKind(),
new Text(blockId));
}
/**

View File

@ -20,6 +20,7 @@
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.security.x509.SecurityConfig;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.ozone.security.OzoneSecretStore.OzoneManagerSecretState;
import org.apache.hadoop.ozone.security.OzoneTokenIdentifier.TokenInfo;
@ -75,7 +76,8 @@ public class OzoneDelegationTokenSecretManager<T extends OzoneTokenIdentifier>
public OzoneDelegationTokenSecretManager(OzoneConfiguration conf,
long tokenMaxLifetime, long tokenRenewInterval,
long dtRemoverScanInterval, Text service) throws IOException {
super(conf, tokenMaxLifetime, tokenRenewInterval, service, LOG);
super(new SecurityConfig(conf), tokenMaxLifetime, tokenRenewInterval,
service, LOG);
currentTokens = new ConcurrentHashMap();
this.tokenRemoverScanInterval = dtRemoverScanInterval;
this.store = new OzoneSecretStore(conf);

View File

@ -20,7 +20,6 @@
import com.google.common.base.Preconditions;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.security.x509.SecurityConfig;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.security.AccessControlException;
@ -66,16 +65,16 @@ public abstract class OzoneSecretManager<T extends TokenIdentifier>
/**
* Create a secret manager.
*
* @param conf configuration.
* @param secureConf configuration.
* @param tokenMaxLifetime the maximum lifetime of the delegation tokens in
* milliseconds
* @param tokenRenewInterval how often the tokens must be renewed in
* milliseconds
* @param service name of service
*/
public OzoneSecretManager(OzoneConfiguration conf, long tokenMaxLifetime,
public OzoneSecretManager(SecurityConfig secureConf, long tokenMaxLifetime,
long tokenRenewInterval, Text service, Logger logger) {
this.securityConfig = new SecurityConfig(conf);
this.securityConfig = secureConf;
this.tokenMaxLifetime = tokenMaxLifetime;
this.tokenRenewInterval = tokenRenewInterval;
currentKeyId = new AtomicInteger();

View File

@ -411,6 +411,7 @@ message KeyLocation {
required uint64 length = 4;
// indicated at which version this block gets created.
optional uint64 createVersion = 5;
optional hadoop.common.TokenProto token = 6;
}
message KeyLocationList {

View File

@ -22,6 +22,7 @@
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.BlockTokenSecretProto.AccessModeProto;
import org.apache.hadoop.hdds.security.token.OzoneBlockTokenIdentifier;
import org.apache.hadoop.hdds.security.x509.SecurityConfig;
import org.apache.hadoop.security.ssl.KeyStoreTestUtil;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.test.GenericTestUtils;
@ -64,7 +65,7 @@ public void setUp() throws Exception {
x509Certificate = KeyStoreTestUtil
.generateCertificate("CN=OzoneMaster", keyPair, 30, "SHA256withRSA");
omCertSerialId = x509Certificate.getSerialNumber().toString();
secretManager = new OzoneBlockTokenSecretManager(conf,
secretManager = new OzoneBlockTokenSecretManager(new SecurityConfig(conf),
expiryTime, omCertSerialId);
secretManager.start(keyPair);
}

View File

@ -21,6 +21,7 @@
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
import org.apache.hadoop.hdds.security.x509.certificate.client.CertificateClient;
import org.apache.hadoop.ozone.client.OzoneClient;
import org.apache.hadoop.ozone.om.OzoneManager;
import org.apache.hadoop.hdds.scm.protocolPB
@ -240,6 +241,7 @@ abstract class Builder {
protected int numOfScmHandlers = 20;
protected int numOfDatanodes = 1;
protected boolean startDataNodes = true;
protected CertificateClient certClient;
protected Builder(OzoneConfiguration conf) {
this.conf = conf;
@ -265,6 +267,18 @@ public Builder setStartDataNodes(boolean nodes) {
return this;
}
/**
* Sets the certificate client.
*
* @param client
*
* @return MiniOzoneCluster.Builder
*/
public Builder setCertificateClient(CertificateClient client) {
this.certClient = client;
return this;
}
/**
* Sets the SCM id.
*

View File

@ -378,6 +378,7 @@ public MiniOzoneCluster build() throws IOException {
scm = createSCM();
scm.start();
om = createOM();
om.setCertClient(certClient);
} catch (AuthenticationException ex) {
throw new IOException("Unable to build MiniOzoneCluster. ", ex);
}

View File

@ -89,7 +89,8 @@ public void testContainerStateMachineIdempotency() throws Exception {
XceiverClientSpi client = xceiverClientManager.acquireClient(pipeline);
try {
//create the container
ContainerProtocolCalls.createContainer(client, containerID, traceID);
ContainerProtocolCalls.createContainer(client, containerID, traceID,
null);
// call create Container again
BlockID blockID = ContainerTestHelper.getTestBlockID(containerID);
byte[] data =
@ -111,8 +112,10 @@ public void testContainerStateMachineIdempotency() throws Exception {
client.sendCommand(putKeyRequest);
// close container call
ContainerProtocolCalls.closeContainer(client, containerID, traceID);
ContainerProtocolCalls.closeContainer(client, containerID, traceID);
ContainerProtocolCalls.closeContainer(client, containerID, traceID,
null);
ContainerProtocolCalls.closeContainer(client, containerID, traceID,
null);
} catch (IOException ioe) {
Assert.fail("Container operation failed" + ioe);
}

View File

@ -0,0 +1,148 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.hadoop.ozone.client;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.security.x509.SecurityConfig;
import org.apache.hadoop.hdds.security.x509.certificate.client.CertificateClient;
import org.apache.hadoop.hdds.security.x509.certificates.utils.CertificateSignRequest;
import org.apache.hadoop.hdds.security.x509.certificates.utils.SelfSignedCertificate;
import org.apache.hadoop.hdds.security.x509.exceptions.CertificateException;
import org.apache.hadoop.hdds.security.x509.keys.HDDSKeyGenerator;
import org.bouncycastle.cert.X509CertificateHolder;
import org.bouncycastle.cert.jcajce.JcaX509CertificateConverter;
import java.io.InputStream;
import java.security.KeyPair;
import java.security.PrivateKey;
import java.security.PublicKey;
import java.security.cert.CertStore;
import java.security.cert.X509Certificate;
import java.time.LocalDate;
import java.time.temporal.ChronoUnit;
import java.util.List;
/**
* Test implementation for CertificateClient. To be used only for test
* purposes.
*/
public class CertificateClientTestImpl implements CertificateClient {
private final SecurityConfig securityConfig;
private final KeyPair keyPair;
private final X509Certificate cert;
public CertificateClientTestImpl(OzoneConfiguration conf) throws Exception{
securityConfig = new SecurityConfig(conf);
HDDSKeyGenerator keyGen =
new HDDSKeyGenerator(securityConfig.getConfiguration());
keyPair = keyGen.generateKey();
SelfSignedCertificate.Builder builder =
SelfSignedCertificate.newBuilder()
.setBeginDate(LocalDate.now())
.setEndDate(LocalDate.now().plus(365, ChronoUnit.DAYS))
.setClusterID("cluster1")
.setKey(keyPair)
.setSubject("TestCertSub")
.setConfiguration(conf)
.setScmID("TestScmId1")
.makeCA();
X509CertificateHolder certificateHolder = builder.build();
cert = new JcaX509CertificateConverter().getCertificate(certificateHolder);
}
@Override
public PrivateKey getPrivateKey(String component) {
return keyPair.getPrivate();
}
@Override
public PublicKey getPublicKey(String component) {
return keyPair.getPublic();
}
@Override
public X509Certificate getCertificate(String component) {
return cert;
}
@Override
public boolean verifyCertificate(X509Certificate certificate) {
return true;
}
@Override
public byte[] signDataStream(InputStream stream, String component)
throws CertificateException {
return new byte[0];
}
@Override
public boolean verifySignature(InputStream stream, byte[] signature,
X509Certificate x509Certificate) {
return true;
}
@Override
public boolean verifySignature(byte[] data, byte[] signature,
X509Certificate x509Certificate) {
return true;
}
@Override
public CertificateSignRequest.Builder getCSRBuilder() {
return null;
}
@Override
public X509Certificate queryCertificate(String query) {
return null;
}
@Override
public void storePrivateKey(PrivateKey key, String component)
throws CertificateException {
}
@Override
public void storePublicKey(PublicKey key, String component)
throws CertificateException {
}
@Override
public void storeCertificate(X509Certificate certificate, String component)
throws CertificateException {
}
@Override
public void storeTrustChain(CertStore certStore, String component)
throws CertificateException {
}
@Override
public void storeTrustChain(List<X509Certificate> certificates,
String component) throws CertificateException {
}
}

View File

@ -25,6 +25,7 @@
import java.io.IOException;
/**
* This class is to test all the public facing APIs of Ozone Client.
*/

View File

@ -109,7 +109,7 @@ public abstract class TestOzoneRpcClientAbstract {
private static StorageContainerLocationProtocolClientSideTranslatorPB
storageContainerLocationClient;
private static final String SCM_ID = UUID.randomUUID().toString();
private static String SCM_ID = UUID.randomUUID().toString();
/**
* Create a MiniOzoneCluster for testing.
@ -146,6 +146,33 @@ static void shutdownCluster() throws IOException {
}
}
public static void setCluster(MiniOzoneCluster cluster) {
TestOzoneRpcClientAbstract.cluster = cluster;
}
public static void setOzClient(OzoneClient ozClient) {
TestOzoneRpcClientAbstract.ozClient = ozClient;
}
public static void setOzoneManager(OzoneManager ozoneManager){
TestOzoneRpcClientAbstract.ozoneManager = ozoneManager;
}
public static void setStorageContainerLocationClient(
StorageContainerLocationProtocolClientSideTranslatorPB
storageContainerLocationClient) {
TestOzoneRpcClientAbstract.storageContainerLocationClient =
storageContainerLocationClient;
}
public static void setStore(ObjectStore store) {
TestOzoneRpcClientAbstract.store = store;
}
public static void setScmId(String scmId){
TestOzoneRpcClientAbstract.SCM_ID = scmId;
}
@Test
public void testSetVolumeQuota()
throws IOException, OzoneException {

View File

@ -0,0 +1,238 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.ozone.client.rpc;
import org.apache.hadoop.hdds.HddsConfigKeys;
import org.apache.hadoop.hdds.client.ReplicationFactor;
import org.apache.hadoop.hdds.client.ReplicationType;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.scm.ScmConfigKeys;
import org.apache.hadoop.hdds.scm.container.ContainerInfo;
import org.apache.hadoop.hdds.scm.protocolPB.StorageContainerLocationProtocolClientSideTranslatorPB;
import org.apache.hadoop.hdds.security.token.BlockTokenVerifier;
import org.apache.hadoop.ozone.MiniOzoneCluster;
import org.apache.hadoop.ozone.client.CertificateClientTestImpl;
import org.apache.hadoop.ozone.client.ObjectStore;
import org.apache.hadoop.ozone.client.OzoneBucket;
import org.apache.hadoop.ozone.client.OzoneClient;
import org.apache.hadoop.ozone.client.OzoneClientFactory;
import org.apache.hadoop.ozone.client.OzoneKey;
import org.apache.hadoop.ozone.client.OzoneVolume;
import org.apache.hadoop.ozone.client.io.OzoneInputStream;
import org.apache.hadoop.ozone.client.io.OzoneOutputStream;
import org.apache.hadoop.ozone.om.OzoneManager;
import org.apache.hadoop.ozone.om.helpers.OmKeyArgs;
import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.test.LambdaTestUtils;
import org.apache.hadoop.util.Time;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
import java.io.File;
import java.io.IOException;
import java.util.UUID;
import static org.apache.hadoop.hdds.HddsConfigKeys.OZONE_METADATA_DIRS;
/**
* This class is to test all the public facing APIs of Ozone Client.
*/
public class TestSecureOzoneRpcClient extends TestOzoneRpcClient {
private static MiniOzoneCluster cluster = null;
private static OzoneClient ozClient = null;
private static ObjectStore store = null;
private static OzoneManager ozoneManager;
private static StorageContainerLocationProtocolClientSideTranslatorPB
storageContainerLocationClient;
private static final String SCM_ID = UUID.randomUUID().toString();
private static File testDir;
private static OzoneConfiguration conf;
/**
* Create a MiniOzoneCluster for testing.
* <p>
* Ozone is made active by setting OZONE_ENABLED = true
*
* @throws IOException
*/
@BeforeClass
public static void init() throws Exception {
testDir = GenericTestUtils.getTestDir(
TestSecureOzoneRpcClient.class.getSimpleName());
OzoneManager.setTestSecureOmFlag(true);
conf = new OzoneConfiguration();
conf.set(HddsConfigKeys.OZONE_METADATA_DIRS, testDir.getAbsolutePath());
conf.setInt(ScmConfigKeys.OZONE_SCM_CONTAINER_PROVISION_BATCH_SIZE, 1);
conf.setBoolean(HddsConfigKeys.HDDS_GRPC_BLOCK_TOKEN_ENABLED, true);
conf.set(OZONE_METADATA_DIRS, testDir.getAbsolutePath());
CertificateClientTestImpl certificateClientTest =
new CertificateClientTestImpl(conf);
cluster = MiniOzoneCluster.newBuilder(conf)
.setNumDatanodes(10)
.setScmId(SCM_ID)
.setCertificateClient(certificateClientTest)
.build();
cluster.getOzoneManager().startSecretManager();
cluster.waitForClusterToBeReady();
ozClient = OzoneClientFactory.getRpcClient(conf);
store = ozClient.getObjectStore();
storageContainerLocationClient =
cluster.getStorageContainerLocationClient();
ozoneManager = cluster.getOzoneManager();
TestOzoneRpcClient.setCluster(cluster);
TestOzoneRpcClient.setOzClient(ozClient);
TestOzoneRpcClient.setOzoneManager(ozoneManager);
TestOzoneRpcClient.setStorageContainerLocationClient(
storageContainerLocationClient);
TestOzoneRpcClient.setStore(store);
TestOzoneRpcClient.setScmId(SCM_ID);
}
/**
* Tests successful completion of following operations when grpc block
* token is used.
* 1. getKey
* 2. writeChunk
* */
@Test
public void testPutKeySuccessWithBlockToken() throws Exception {
String volumeName = UUID.randomUUID().toString();
String bucketName = UUID.randomUUID().toString();
long currentTime = Time.now();
String value = "sample value";
store.createVolume(volumeName);
OzoneVolume volume = store.getVolume(volumeName);
volume.createBucket(bucketName);
OzoneBucket bucket = volume.getBucket(bucketName);
for (int i = 0; i < 10; i++) {
String keyName = UUID.randomUUID().toString();
try (OzoneOutputStream out = bucket.createKey(keyName,
value.getBytes().length, ReplicationType.STAND_ALONE,
ReplicationFactor.ONE)) {
out.write(value.getBytes());
}
OzoneKey key = bucket.getKey(keyName);
Assert.assertEquals(keyName, key.getName());
byte[] fileContent;
try(OzoneInputStream is = bucket.readKey(keyName)) {
fileContent = new byte[value.getBytes().length];
is.read(fileContent);
}
Assert.assertTrue(verifyRatisReplication(volumeName, bucketName,
keyName, ReplicationType.STAND_ALONE,
ReplicationFactor.ONE));
Assert.assertEquals(value, new String(fileContent));
Assert.assertTrue(key.getCreationTime() >= currentTime);
Assert.assertTrue(key.getModificationTime() >= currentTime);
}
}
/**
* Tests failure in following operations when grpc block token is
* not present.
* 1. getKey
* 2. writeChunk
* */
@Test
public void testKeyOpFailureWithoutBlockToken() throws Exception {
String volumeName = UUID.randomUUID().toString();
String bucketName = UUID.randomUUID().toString();
String value = "sample value";
BlockTokenVerifier.setTestStub(true);
store.createVolume(volumeName);
OzoneVolume volume = store.getVolume(volumeName);
volume.createBucket(bucketName);
OzoneBucket bucket = volume.getBucket(bucketName);
for (int i = 0; i < 10; i++) {
String keyName = UUID.randomUUID().toString();
try(OzoneOutputStream out = bucket.createKey(keyName,
value.getBytes().length, ReplicationType.STAND_ALONE,
ReplicationFactor.ONE)) {
LambdaTestUtils.intercept(IOException.class, "UNAUTHENTICATED: Fail " +
"to find any token ",
() -> out.write(value.getBytes()));
}
OzoneKey key = bucket.getKey(keyName);
Assert.assertEquals(keyName, key.getName());
LambdaTestUtils.intercept(IOException.class, "Failed to authenticate" +
" with GRPC XceiverServer with Ozone block token.",
() -> bucket.readKey(keyName));
}
BlockTokenVerifier.setTestStub(false);
}
private boolean verifyRatisReplication(String volumeName, String bucketName,
String keyName, ReplicationType type, ReplicationFactor factor)
throws IOException {
OmKeyArgs keyArgs = new OmKeyArgs.Builder()
.setVolumeName(volumeName)
.setBucketName(bucketName)
.setKeyName(keyName)
.build();
HddsProtos.ReplicationType replicationType =
HddsProtos.ReplicationType.valueOf(type.toString());
HddsProtos.ReplicationFactor replicationFactor =
HddsProtos.ReplicationFactor.valueOf(factor.getValue());
OmKeyInfo keyInfo = ozoneManager.lookupKey(keyArgs);
for (OmKeyLocationInfo info:
keyInfo.getLatestVersionLocations().getLocationList()) {
ContainerInfo container =
storageContainerLocationClient.getContainer(info.getContainerID());
if (!container.getReplicationFactor().equals(replicationFactor) || (
container.getReplicationType() != replicationType)) {
return false;
}
}
return true;
}
/**
* Close OzoneClient and shutdown MiniOzoneCluster.
*/
@AfterClass
public static void shutdown() throws IOException {
if(ozClient != null) {
ozClient.close();
}
if (storageContainerLocationClient != null) {
storageContainerLocationClient.close();
}
if (cluster != null) {
cluster.shutdown();
}
}
}

View File

@ -18,6 +18,7 @@
package org.apache.hadoop.ozone.container.ozoneimpl;
import org.apache.hadoop.hdds.HddsConfigKeys;
import org.apache.hadoop.hdds.client.BlockID;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
@ -228,7 +229,8 @@ public void testBothGetandPutSmallFile() throws Exception {
XceiverClientGrpc client = null;
try {
OzoneConfiguration conf = newOzoneConfiguration();
conf.set(HddsConfigKeys.OZONE_METADATA_DIRS,
tempFolder.getRoot().getPath());
client = createClientForTesting(conf);
cluster = MiniOzoneCluster.newBuilder(conf)
.setRandomContainerPort(false)
@ -286,7 +288,8 @@ public void testCloseContainer() throws Exception {
try {
OzoneConfiguration conf = newOzoneConfiguration();
conf.set(HddsConfigKeys.OZONE_METADATA_DIRS,
tempFolder.getRoot().getPath());
client = createClientForTesting(conf);
cluster = MiniOzoneCluster.newBuilder(conf)
.setRandomContainerPort(false)
@ -383,7 +386,8 @@ public void testDeleteContainer() throws Exception {
writeChunkRequest, putBlockRequest;
try {
OzoneConfiguration conf = newOzoneConfiguration();
conf.set(HddsConfigKeys.OZONE_METADATA_DIRS,
tempFolder.getRoot().getPath());
client = createClientForTesting(conf);
cluster = MiniOzoneCluster.newBuilder(conf)
.setRandomContainerPort(false)
@ -501,7 +505,8 @@ public void testXcieverClientAsync() throws Exception {
XceiverClientGrpc client = null;
try {
OzoneConfiguration conf = newOzoneConfiguration();
conf.set(HddsConfigKeys.OZONE_METADATA_DIRS,
tempFolder.getRoot().getPath());
client = createClientForTesting(conf);
cluster = MiniOzoneCluster.newBuilder(conf)
.setRandomContainerPort(false)

View File

@ -172,7 +172,7 @@ public void testCreateOzoneContainer() throws Exception {
public Void run() {
try {
XceiverClientGrpc client = new XceiverClientGrpc(pipeline, conf);
client.connect();
client.connect(token.encodeToUrlString());
createContainerForTesting(client, containerID);
} catch (Exception e) {
if (requireBlockToken && hasBlockToken && !blockTokeExpired) {

View File

@ -88,7 +88,7 @@ public void testAllocateWrite() throws Exception {
XceiverClientSpi client = xceiverClientManager
.acquireClient(container.getPipeline());
ContainerProtocolCalls.createContainer(client,
container.getContainerInfo().getContainerID(), traceID);
container.getContainerInfo().getContainerID(), traceID, null);
BlockID blockID = ContainerTestHelper.getTestBlockID(
container.getContainerInfo().getContainerID());
@ -111,7 +111,7 @@ public void testInvalidBlockRead() throws Exception {
XceiverClientSpi client = xceiverClientManager
.acquireClient(container.getPipeline());
ContainerProtocolCalls.createContainer(client,
container.getContainerInfo().getContainerID(), traceID);
container.getContainerInfo().getContainerID(), traceID, null);
thrown.expect(StorageContainerException.class);
thrown.expectMessage("Unable to find the block");
@ -135,7 +135,7 @@ public void testInvalidContainerRead() throws Exception {
XceiverClientSpi client = xceiverClientManager
.acquireClient(container.getPipeline());
ContainerProtocolCalls.createContainer(client,
container.getContainerInfo().getContainerID(), traceID);
container.getContainerInfo().getContainerID(), traceID, null);
BlockID blockID = ContainerTestHelper.getTestBlockID(
container.getContainerInfo().getContainerID());
ContainerProtocolCalls.writeSmallFile(client, blockID,
@ -162,7 +162,7 @@ public void testReadWriteWithBCSId() throws Exception {
XceiverClientSpi client = xceiverClientManager
.acquireClient(container.getPipeline());
ContainerProtocolCalls.createContainer(client,
container.getContainerInfo().getContainerID(), traceID);
container.getContainerInfo().getContainerID(), traceID, null);
BlockID blockID1 = ContainerTestHelper.getTestBlockID(
container.getContainerInfo().getContainerID());

View File

@ -93,7 +93,7 @@ public void tesGetCommittedBlockLength() throws Exception {
Pipeline pipeline = container.getPipeline();
XceiverClientSpi client = xceiverClientManager.acquireClient(pipeline);
//create the container
ContainerProtocolCalls.createContainer(client, containerID, traceID);
ContainerProtocolCalls.createContainer(client, containerID, traceID, null);
BlockID blockID = ContainerTestHelper.getTestBlockID(containerID);
byte[] data =
@ -126,11 +126,11 @@ public void testGetCommittedBlockLengthForInvalidBlock() throws Exception {
long containerID = container.getContainerInfo().getContainerID();
XceiverClientSpi client = xceiverClientManager
.acquireClient(container.getPipeline());
ContainerProtocolCalls.createContainer(client, containerID, traceID);
ContainerProtocolCalls.createContainer(client, containerID, traceID, null);
BlockID blockID = ContainerTestHelper.getTestBlockID(containerID);
// move the container to closed state
ContainerProtocolCalls.closeContainer(client, containerID, traceID);
ContainerProtocolCalls.closeContainer(client, containerID, traceID, null);
try {
// There is no block written inside the container. The request should
// fail.
@ -153,7 +153,7 @@ public void tesPutKeyResposne() throws Exception {
Pipeline pipeline = container.getPipeline();
XceiverClientSpi client = xceiverClientManager.acquireClient(pipeline);
//create the container
ContainerProtocolCalls.createContainer(client, containerID, traceID);
ContainerProtocolCalls.createContainer(client, containerID, traceID, null);
BlockID blockID = ContainerTestHelper.getTestBlockID(containerID);
byte[] data =

View File

@ -146,7 +146,7 @@ public void testFreeByReference() throws IOException {
// However container call should succeed because of refcount on the client.
String traceID1 = "trace" + RandomStringUtils.randomNumeric(4);
ContainerProtocolCalls.createContainer(client1,
container1.getContainerInfo().getContainerID(), traceID1);
container1.getContainerInfo().getContainerID(), traceID1, null);
// After releasing the client, this connection should be closed
// and any container operations should fail
@ -155,7 +155,7 @@ public void testFreeByReference() throws IOException {
String expectedMessage = "This channel is not connected.";
try {
ContainerProtocolCalls.createContainer(client1,
container1.getContainerInfo().getContainerID(), traceID1);
container1.getContainerInfo().getContainerID(), traceID1, null);
Assert.fail("Create container should throw exception on closed"
+ "client");
} catch (Exception e) {
@ -206,7 +206,7 @@ public void testFreeByEviction() throws IOException {
String expectedMessage = "This channel is not connected.";
try {
ContainerProtocolCalls.createContainer(client1,
container1.getContainerInfo().getContainerID(), traceID2);
container1.getContainerInfo().getContainerID(), traceID2, null);
Assert.fail("Create container should throw exception on closed"
+ "client");
} catch (Exception e) {

View File

@ -72,7 +72,9 @@ public void start(Object service) {
@Override
public void stop() {
try {
if (handler != null) {
handler.close();
}
} catch (Exception e) {
throw new RuntimeException("Can't stop the Object Store Rest server", e);
}

View File

@ -19,6 +19,7 @@
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.EnumSet;
import java.util.List;
import java.util.Map;
import java.util.HashMap;
@ -36,6 +37,10 @@
import org.apache.hadoop.hdds.scm.exceptions.SCMException;
import org.apache.hadoop.hdds.scm.protocol.ScmBlockLocationProtocol;
import org.apache.hadoop.ozone.OzoneConsts;
import org.apache.hadoop.ipc.Server;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.BlockTokenSecretProto.AccessModeProto;
import org.apache.hadoop.ozone.security.OzoneBlockTokenSecretManager;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.ozone.common.BlockGroup;
import org.apache.hadoop.ozone.om.exceptions.OMException;
import org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes;
@ -57,6 +62,9 @@
import org.apache.hadoop.utils.db.DBStore;
import com.google.common.base.Preconditions;
import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_GRPC_BLOCK_TOKEN_ENABLED;
import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_GRPC_BLOCK_TOKEN_ENABLED_DEFAULT;
import static org.apache.hadoop.ozone.OzoneConfigKeys.DFS_CONTAINER_RATIS_ENABLED_DEFAULT;
import static org.apache.hadoop.ozone.OzoneConfigKeys.DFS_CONTAINER_RATIS_ENABLED_KEY;
import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_BLOCK_DELETING_SERVICE_INTERVAL;
@ -89,13 +97,14 @@ public class KeyManagerImpl implements KeyManager {
private final long preallocateMax;
private final String omId;
private final OzoneBlockTokenSecretManager secretManager;
private final boolean grpcBlockTokenEnabled;
private BackgroundService keyDeletingService;
public KeyManagerImpl(ScmBlockLocationProtocol scmBlockClient,
OMMetadataManager metadataManager,
OzoneConfiguration conf,
String omId) {
OMMetadataManager metadataManager, OzoneConfiguration conf, String omId,
OzoneBlockTokenSecretManager secretManager) {
this.scmBlockClient = scmBlockClient;
this.metadataManager = metadataManager;
this.scmBlockSize = (long) conf
@ -108,6 +117,10 @@ public KeyManagerImpl(ScmBlockLocationProtocol scmBlockClient,
OZONE_KEY_PREALLOCATION_MAXSIZE_DEFAULT);
this.omId = omId;
start(conf);
this.secretManager = secretManager;
this.grpcBlockTokenEnabled = conf.getBoolean(
HDDS_GRPC_BLOCK_TOKEN_ENABLED,
HDDS_GRPC_BLOCK_TOKEN_ENABLED_DEFAULT);
}
@Override
@ -185,11 +198,18 @@ public OmKeyLocationInfo allocateBlock(OmKeyArgs args, long clientID)
}
throw ex;
}
OmKeyLocationInfo info = new OmKeyLocationInfo.Builder()
OmKeyLocationInfo.Builder builder = new OmKeyLocationInfo.Builder()
.setBlockID(new BlockID(allocatedBlock.getBlockID()))
.setLength(scmBlockSize)
.setOffset(0)
.build();
.setOffset(0);
if (grpcBlockTokenEnabled) {
String remoteUser = getRemoteUser().getShortUserName();
builder.setToken(secretManager.generateToken(remoteUser,
allocatedBlock.getBlockID().toString(),
getAclForUser(remoteUser),
scmBlockSize));
}
OmKeyLocationInfo info = builder.build();
// current version not committed, so new blocks coming now are added to
// the same version
keyInfo.appendNewBlocks(Collections.singletonList(info));
@ -199,6 +219,24 @@ public OmKeyLocationInfo allocateBlock(OmKeyArgs args, long clientID)
return info;
}
/* Optimize ugi lookup for RPC operations to avoid a trip through
* UGI.getCurrentUser which is synch'ed.
*/
public static UserGroupInformation getRemoteUser() throws IOException {
UserGroupInformation ugi = Server.getRemoteUser();
return (ugi != null) ? ugi : UserGroupInformation.getCurrentUser();
}
/**
* Return acl for user.
* @param user
*
* */
private EnumSet<AccessModeProto> getAclForUser(String user) {
// TODO: Return correct acl for user.
return EnumSet.allOf(AccessModeProto.class);
}
@Override
public OpenKeySession openKey(OmKeyArgs args) throws IOException {
Preconditions.checkNotNull(args);
@ -268,11 +306,19 @@ public OpenKeySession openKey(OmKeyArgs args) throws IOException {
}
throw ex;
}
OmKeyLocationInfo subKeyInfo = new OmKeyLocationInfo.Builder()
OmKeyLocationInfo.Builder builder = new OmKeyLocationInfo.Builder()
.setBlockID(new BlockID(allocatedBlock.getBlockID()))
.setLength(allocateSize)
.setOffset(0)
.build();
.setOffset(0);
if (grpcBlockTokenEnabled) {
String remoteUser = getRemoteUser().getShortUserName();
builder.setToken(secretManager.generateToken(remoteUser,
allocatedBlock.getBlockID().toString(),
getAclForUser(remoteUser),
scmBlockSize));
}
OmKeyLocationInfo subKeyInfo = builder.build();
locations.add(subKeyInfo);
requestedSize -= allocateSize;
}
@ -422,6 +468,17 @@ public OmKeyInfo lookupKey(OmKeyArgs args) throws IOException {
throw new OMException("Key not found",
OMException.ResultCodes.FAILED_KEY_NOT_FOUND);
}
if (grpcBlockTokenEnabled) {
String remoteUser = getRemoteUser().getShortUserName();
for (OmKeyLocationInfoGroup key : value.getKeyLocationVersions()) {
key.getLocationList().forEach(k -> {
k.setToken(secretManager.generateToken(remoteUser,
k.getBlockID().getContainerBlockID().toString(),
getAclForUser(remoteUser),
k.getLength()));
});
}
}
return value;
} catch (IOException ex) {
LOG.debug("Get key failed for volume:{} bucket:{} key:{}",

View File

@ -24,8 +24,10 @@
import com.google.common.base.Preconditions;
import com.google.protobuf.BlockingService;
import java.security.KeyPair;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hdds.HddsConfigKeys;
import org.apache.hadoop.hdds.HddsUtils;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
@ -37,6 +39,7 @@
import org.apache.hadoop.hdds.scm.protocolPB.ScmBlockLocationProtocolPB;
import org.apache.hadoop.hdds.scm.protocolPB.StorageContainerLocationProtocolClientSideTranslatorPB;
import org.apache.hadoop.hdds.scm.protocolPB.StorageContainerLocationProtocolPB;
import org.apache.hadoop.hdds.security.x509.SecurityConfig;
import org.apache.hadoop.hdds.security.x509.certificate.client.CertificateClient;
import org.apache.hadoop.hdds.server.ServiceRuntimeInfoImpl;
import org.apache.hadoop.hdfs.DFSUtil;
@ -95,6 +98,7 @@
import org.apache.hadoop.ozone.security.acl.OzoneObj.ResourceType;
import org.apache.hadoop.ozone.security.acl.OzoneObjInfo;
import org.apache.hadoop.ozone.security.acl.RequestContext;
import org.apache.hadoop.ozone.security.OzoneBlockTokenSecretManager;
import org.apache.hadoop.ozone.security.OzoneDelegationTokenSecretManager;
import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.security.UserGroupInformation;
@ -180,10 +184,12 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl
private static final String OM_DAEMON = "om";
private static boolean securityEnabled = false;
private static OzoneDelegationTokenSecretManager<OzoneTokenIdentifier>
secretManager;
delegationTokenMgr;
private OzoneBlockTokenSecretManager blockTokenMgr;
private KeyPair keyPair;
private CertificateClient certClient;
private static boolean testSecureOmFlag = false;
private final Text omRpcAddressTxt;
private final OzoneConfiguration configuration;
private RPC.Server omRpcServer;
private InetSocketAddress omRpcAddress;
@ -213,6 +219,7 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl
private final boolean isAclEnabled;
private final IAccessAuthorizer accessAuthorizer;
private JvmPauseMonitor jvmPauseMonitor;
private final SecurityConfig secConfig;
private OzoneManager(OzoneConfiguration conf) throws IOException {
Preconditions.checkNotNull(conf);
@ -224,7 +231,7 @@ private OzoneManager(OzoneConfiguration conf) throws IOException {
ResultCodes.OM_NOT_INITIALIZED);
}
if (!testSecureOmFlag) {
if (!testSecureOmFlag || !isOzoneSecurityEnabled()) {
scmContainerClient = getScmContainerClient(configuration);
// verifies that the SCM info in the OM Version file is correct.
scmBlockClient = getScmBlockClient(configuration);
@ -271,7 +278,14 @@ private OzoneManager(OzoneConfiguration conf) throws IOException {
BlockingService omService = newReflectiveBlockingService(
new OzoneManagerProtocolServerSideTranslatorPB(
this, omRatisClient, omRatisEnabled));
secretManager = createSecretManager(configuration);
omRpcAddressTxt = new Text(OmUtils.getOmRpcAddress(configuration));
secConfig = new SecurityConfig(configuration);
if (secConfig.isGrpcBlockTokenEnabled()) {
blockTokenMgr = createBlockTokenSecretManager(configuration);
}
if(secConfig.isSecurityEnabled()){
delegationTokenMgr = createDelegationTokenSecretManager(configuration);
}
omRpcServer = startRpcServer(configuration, omNodeRpcAddr,
OzoneManagerProtocolPB.class, omService,
@ -285,9 +299,8 @@ private OzoneManager(OzoneConfiguration conf) throws IOException {
s3BucketManager = new S3BucketManagerImpl(configuration, metadataManager,
volumeManager, bucketManager);
keyManager =
new KeyManagerImpl(scmBlockClient, metadataManager, configuration,
omStorage.getOmId());
keyManager = new KeyManagerImpl(scmBlockClient, metadataManager,
configuration, omStorage.getOmId(), blockTokenMgr);
shutdownHook = () -> {
saveOmMetrics();
@ -368,7 +381,7 @@ private File getMetricsStorageFile() {
}
private OzoneDelegationTokenSecretManager createSecretManager(
private OzoneDelegationTokenSecretManager createDelegationTokenSecretManager(
OzoneConfiguration conf) throws IOException {
long tokenRemoverScanInterval =
conf.getTimeDuration(OMConfigKeys.DELEGATION_REMOVER_SCAN_INTERVAL_KEY,
@ -387,30 +400,78 @@ private OzoneDelegationTokenSecretManager createSecretManager(
tokenRenewInterval, tokenRemoverScanInterval, omRpcAddressTxt);
}
private void stopSecretManager() throws IOException {
if (secretManager != null) {
LOG.info("Stopping OM secret manager");
secretManager.stop();
private OzoneBlockTokenSecretManager createBlockTokenSecretManager(
OzoneConfiguration conf) {
long expiryTime = conf.getTimeDuration(
HddsConfigKeys.HDDS_BLOCK_TOKEN_EXPIRY_TIME,
HddsConfigKeys.HDDS_BLOCK_TOKEN_EXPIRY_TIME_DEFAULT,
TimeUnit.MILLISECONDS);
// TODO: Pass OM cert serial ID.
if (testSecureOmFlag) {
return new OzoneBlockTokenSecretManager(secConfig, expiryTime, "1");
}
Objects.nonNull(certClient);
return new OzoneBlockTokenSecretManager(secConfig, expiryTime,
certClient.getCertificate(OM_DAEMON).getSerialNumber().toString());
}
private void stopSecretManager() {
if (blockTokenMgr != null) {
LOG.info("Stopping OM block token manager.");
try {
blockTokenMgr.stop();
} catch (IOException e) {
LOG.error("Failed to stop block token manager", e);
}
}
private void startSecretManager() {
if (secretManager != null) {
if (delegationTokenMgr != null) {
LOG.info("Stopping OM delegation token secret manager.");
try {
delegationTokenMgr.stop();
} catch (IOException e) {
LOG.error("Failed to stop delegation token manager", e);
}
}
}
@VisibleForTesting
public void startSecretManager() {
try {
readKeyPair();
LOG.info("Starting OM secret manager");
secretManager.start(keyPair);
} catch (OzoneSecurityException e) {
LOG.error("Unable to read key pair for OM.", e);
throw new RuntimeException(e);
}
if (secConfig.isGrpcBlockTokenEnabled() && blockTokenMgr != null) {
try {
LOG.info("Starting OM block token secret manager");
blockTokenMgr.start(keyPair);
} catch (IOException e) {
// Inability to start secret manager
// can't be recovered from.
LOG.error("Error starting secret manager.", e);
// Unable to start secret manager.
LOG.error("Error starting block token secret manager.", e);
throw new RuntimeException(e);
}
}
if (delegationTokenMgr != null) {
try {
LOG.info("Starting OM delegation token secret manager");
delegationTokenMgr.start(keyPair);
} catch (IOException e) {
// Unable to start secret manager.
LOG.error("Error starting delegation token secret manager.", e);
throw new RuntimeException(e);
}
}
}
/**
* For testing purpose only.
* */
public void setCertClient(CertificateClient certClient) {
// TODO: Initialize it in contructor with implementation for certClient.
// TODO: Initialize it in constructor with implementation for certClient.
this.certClient = certClient;
}
@ -524,7 +585,7 @@ private static RPC.Server startRpcServer(OzoneConfiguration conf,
.setPort(addr.getPort())
.setNumHandlers(handlerCount)
.setVerbose(false)
.setSecretManager(secretManager)
.setSecretManager(delegationTokenMgr)
.build();
DFSUtil.addPBProtocol(conf, protocol, instance, rpcServer);
@ -844,12 +905,15 @@ public void join() {
}
private void startSecretManagerIfNecessary() {
boolean shouldRun = shouldUseDelegationTokens() && isOzoneSecurityEnabled();
boolean running = secretManager.isRunning();
if (shouldRun && !running) {
boolean shouldRun = isOzoneSecurityEnabled();
if (shouldRun) {
boolean running = delegationTokenMgr.isRunning()
&& blockTokenMgr.isRunning();
if(!running){
startSecretManager();
}
}
}
private boolean shouldUseDelegationTokens() {
return UserGroupInformation.isSecurityEnabled();
@ -910,7 +974,7 @@ public Token<OzoneTokenIdentifier> getDelegationToken(Text renewer)
throw new IOException("Delegation Token can be issued only with "
+ "kerberos or web authentication");
}
if (secretManager == null || !secretManager.isRunning()) {
if (delegationTokenMgr == null || !delegationTokenMgr.isRunning()) {
LOG.warn("trying to get DT with no secret manager running in OM.");
return null;
}
@ -923,7 +987,7 @@ public Token<OzoneTokenIdentifier> getDelegationToken(Text renewer)
realUser = new Text(ugi.getRealUser().getUserName());
}
token = secretManager.createToken(owner, renewer, realUser);
token = delegationTokenMgr.createToken(owner, renewer, realUser);
return token;
}
@ -946,7 +1010,7 @@ public long renewDelegationToken(Token<OzoneTokenIdentifier> token)
+ "kerberos or web authentication");
}
String renewer = getRemoteUser().getShortUserName();
expiryTime = secretManager.renewToken(token, renewer);
expiryTime = delegationTokenMgr.renewToken(token, renewer);
} catch (AccessControlException ace) {
final OzoneTokenIdentifier id = OzoneTokenIdentifier.readProtoBuf(
@ -969,7 +1033,7 @@ public void cancelDelegationToken(Token<OzoneTokenIdentifier> token)
OzoneTokenIdentifier id = null;
try {
String canceller = getRemoteUser().getUserName();
id = secretManager.cancelToken(token, canceller);
id = delegationTokenMgr.cancelToken(token, canceller);
LOG.trace("Delegation token renewed for dt: {}", id);
} catch (AccessControlException ace) {
LOG.error("Delegation token renewal failed for dt: {}, cause: {}", id,

View File

@ -90,7 +90,7 @@ public void checkIfDeleteServiceisDeletingKeys()
KeyManager keyManager =
new KeyManagerImpl(
new ScmBlockLocationTestIngClient(null, null, 0),
metaMgr, conf, UUID.randomUUID().toString());
metaMgr, conf, UUID.randomUUID().toString(), null);
final int keyCount = 100;
createAndDeleteKeys(keyManager, keyCount, 1);
KeyDeletingService keyDeletingService =
@ -112,7 +112,7 @@ public void checkIfDeleteServiceWithFailingSCM()
KeyManager keyManager =
new KeyManagerImpl(
new ScmBlockLocationTestIngClient(null, null, 1),
metaMgr, conf, UUID.randomUUID().toString());
metaMgr, conf, UUID.randomUUID().toString(), null);
final int keyCount = 100;
createAndDeleteKeys(keyManager, keyCount, 1);
KeyDeletingService keyDeletingService =
@ -139,7 +139,7 @@ public void checkDeletionForEmptyKey()
KeyManager keyManager =
new KeyManagerImpl(
new ScmBlockLocationTestIngClient(null, null, 1),
metaMgr, conf, UUID.randomUUID().toString());
metaMgr, conf, UUID.randomUUID().toString(), null);
final int keyCount = 100;
createAndDeleteKeys(keyManager, keyCount, 0);
KeyDeletingService keyDeletingService =

View File

@ -84,7 +84,7 @@ public void setUp() throws Exception {
scmBlockLocationProtocol = Mockito.mock(ScmBlockLocationProtocol.class);
metadataManager = Mockito.mock(OMMetadataManager.class);
keyManager = new KeyManagerImpl(scmBlockLocationProtocol, metadataManager,
conf, "om1");
conf, "om1", null);
setupMocks();
}