HDDS-1138. Ozone Client should avoid talking to SCM directly. Contributed by Xiaoyu Yao and Mukul Kumar Singh.

Closes #585
This commit is contained in:
Xiaoyu Yao 2019-03-15 10:41:06 -07:00 committed by GitHub
parent dc21655f2a
commit d1afa03804
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 154 additions and 38 deletions

View File

@ -23,7 +23,6 @@
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
import org.apache.hadoop.hdds.client.BlockID;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
import org.apache.hadoop.hdds.scm.protocol.StorageContainerLocationProtocol;
import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
@ -277,9 +276,7 @@ public static LengthInputStream getFromOmKeyInfo(
OmKeyLocationInfo omKeyLocationInfo = keyLocationInfos.get(i);
BlockID blockID = omKeyLocationInfo.getBlockID();
long containerID = blockID.getContainerID();
ContainerWithPipeline containerWithPipeline =
storageContainerLocationClient.getContainerWithPipeline(containerID);
Pipeline pipeline = containerWithPipeline.getPipeline();
Pipeline pipeline = omKeyLocationInfo.getPipeline();
// irrespective of the container state, we will always read via Standalone
// protocol.

View File

@ -26,8 +26,6 @@
.ChecksumType;
import org.apache.hadoop.hdds.scm.container.ContainerID;
import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerNotOpenException;
import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline;
import org.apache.hadoop.hdds.scm.protocol.StorageContainerLocationProtocol;
import org.apache.hadoop.hdds.scm.storage.BufferPool;
import org.apache.hadoop.ozone.OzoneConfigKeys;
import org.apache.hadoop.hdds.scm.container.common.helpers.ExcludeList;
@ -73,7 +71,6 @@ public class KeyOutputStream extends OutputStream {
private final ArrayList<BlockOutputStreamEntry> streamEntries;
private int currentStreamIndex;
private final OzoneManagerProtocol omClient;
private final StorageContainerLocationProtocol scmClient;
private final OmKeyArgs keyArgs;
private final long openID;
private final XceiverClientManager xceiverClientManager;
@ -100,7 +97,6 @@ public class KeyOutputStream extends OutputStream {
public KeyOutputStream() {
streamEntries = new ArrayList<>();
omClient = null;
scmClient = null;
keyArgs = null;
openID = -1;
xceiverClientManager = null;
@ -136,6 +132,7 @@ public List<OmKeyLocationInfo> getLocationInfoList() throws IOException {
new OmKeyLocationInfo.Builder().setBlockID(streamEntry.getBlockID())
.setLength(streamEntry.getCurrentPosition()).setOffset(0)
.setToken(streamEntry.getToken())
.setPipeline(streamEntry.getPipeline())
.build();
LOG.debug("block written " + streamEntry.getBlockID() + ", length "
+ streamEntry.getCurrentPosition() + " bcsID "
@ -149,7 +146,6 @@ public List<OmKeyLocationInfo> getLocationInfoList() throws IOException {
@SuppressWarnings("parameternumber")
public KeyOutputStream(OpenKeySession handler,
XceiverClientManager xceiverClientManager,
StorageContainerLocationProtocol scmClient,
OzoneManagerProtocol omClient, int chunkSize,
String requestId, ReplicationFactor factor, ReplicationType type,
long bufferFlushSize, long bufferMaxSize, long size, long watchTimeout,
@ -158,7 +154,6 @@ public KeyOutputStream(OpenKeySession handler,
this.streamEntries = new ArrayList<>();
this.currentStreamIndex = 0;
this.omClient = omClient;
this.scmClient = scmClient;
OmKeyInfo info = handler.getKeyInfo();
// Retrieve the file encryption key info, null if file is not in
// encrypted bucket.
@ -221,15 +216,14 @@ public void addPreallocateBlocks(OmKeyLocationInfoGroup version,
private void addKeyLocationInfo(OmKeyLocationInfo subKeyInfo)
throws IOException {
ContainerWithPipeline containerWithPipeline = scmClient
.getContainerWithPipeline(subKeyInfo.getContainerID());
Preconditions.checkNotNull(subKeyInfo.getPipeline());
UserGroupInformation.getCurrentUser().addToken(subKeyInfo.getToken());
BlockOutputStreamEntry.Builder builder =
new BlockOutputStreamEntry.Builder()
.setBlockID(subKeyInfo.getBlockID())
.setKey(keyArgs.getKeyName())
.setXceiverClientManager(xceiverClientManager)
.setPipeline(containerWithPipeline.getPipeline())
.setPipeline(subKeyInfo.getPipeline())
.setRequestId(requestID)
.setChunkSize(chunkSize)
.setLength(subKeyInfo.getLength())
@ -637,7 +631,6 @@ public ExcludeList getExcludeList() {
public static class Builder {
private OpenKeySession openHandler;
private XceiverClientManager xceiverManager;
private StorageContainerLocationProtocol scmClient;
private OzoneManagerProtocol omClient;
private int chunkSize;
private String requestID;
@ -675,11 +668,6 @@ public Builder setXceiverClientManager(XceiverClientManager manager) {
return this;
}
public Builder setScmClient(StorageContainerLocationProtocol client) {
this.scmClient = client;
return this;
}
public Builder setOmClient(
OzoneManagerProtocol client) {
this.omClient = client;
@ -747,7 +735,7 @@ public Builder setMaxRetryCount(int maxCount) {
}
public KeyOutputStream build() throws IOException {
return new KeyOutputStream(openHandler, xceiverManager, scmClient,
return new KeyOutputStream(openHandler, xceiverManager,
omClient, chunkSize, requestID, factor, type, streamBufferFlushSize,
streamBufferMaxSize, blockSize, watchTimeout, checksumType,
bytesPerChecksum, multipartUploadID, multipartNumber, isMultipartKey,

View File

@ -602,7 +602,6 @@ public OzoneOutputStream createKey(
new KeyOutputStream.Builder()
.setHandler(openKey)
.setXceiverClientManager(xceiverClientManager)
.setScmClient(storageContainerLocationClient)
.setOmClient(ozoneManagerClient)
.setChunkSize(chunkSize)
.setRequestID(requestId)
@ -865,7 +864,6 @@ public OzoneOutputStream createMultipartKey(String volumeName,
new KeyOutputStream.Builder()
.setHandler(openKey)
.setXceiverClientManager(xceiverClientManager)
.setScmClient(storageContainerLocationClient)
.setOmClient(ozoneManagerClient)
.setChunkSize(chunkSize)
.setRequestID(requestId)

View File

@ -44,13 +44,14 @@ public final class OmKeyArgs implements Auditable {
private final String multipartUploadID;
private final int multipartUploadPartNumber;
private Map<String, String> metadata;
private boolean refreshPipeline;
@SuppressWarnings("parameternumber")
private OmKeyArgs(String volumeName, String bucketName, String keyName,
long dataSize, ReplicationType type, ReplicationFactor factor,
List<OmKeyLocationInfo> locationInfoList, boolean isMultipart,
String uploadID, int partNumber,
Map<String, String> metadataMap) {
Map<String, String> metadataMap, boolean refreshPipeline) {
this.volumeName = volumeName;
this.bucketName = bucketName;
this.keyName = keyName;
@ -62,6 +63,7 @@ private OmKeyArgs(String volumeName, String bucketName, String keyName,
this.multipartUploadID = uploadID;
this.multipartUploadPartNumber = partNumber;
this.metadata = metadataMap;
this.refreshPipeline = refreshPipeline;
}
public boolean getIsMultipartKey() {
@ -120,6 +122,10 @@ public List<OmKeyLocationInfo> getLocationInfoList() {
return locationInfoList;
}
public boolean getRefreshPipeline() {
return refreshPipeline;
}
@Override
public Map<String, String> toAuditMap() {
Map<String, String> auditMap = new LinkedHashMap<>();
@ -159,6 +165,7 @@ public static class Builder {
private String multipartUploadID;
private int multipartUploadPartNumber;
private Map<String, String> metadata = new HashMap<>();
private boolean refreshPipeline;
public Builder setVolumeName(String volume) {
this.volumeName = volume;
@ -220,10 +227,15 @@ public Builder addAllMetadata(Map<String, String> metadatamap) {
return this;
}
public Builder setRefreshPipeline(boolean refresh) {
this.refreshPipeline = refresh;
return this;
}
public OmKeyArgs build() {
return new OmKeyArgs(volumeName, bucketName, keyName, dataSize, type,
factor, locationInfoList, isMultipartKey, multipartUploadID,
multipartUploadPartNumber, metadata);
multipartUploadPartNumber, metadata, refreshPipeline);
}
}

View File

@ -17,6 +17,8 @@
package org.apache.hadoop.ozone.om.helpers;
import org.apache.hadoop.hdds.client.BlockID;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
import org.apache.hadoop.hdds.scm.pipeline.UnknownPipelineStateException;
import org.apache.hadoop.hdds.security.token.OzoneBlockTokenIdentifier;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.KeyLocation;
import org.apache.hadoop.security.token.Token;
@ -35,15 +37,20 @@ public final class OmKeyLocationInfo {
// the version number indicating when this block was added
private long createVersion;
private OmKeyLocationInfo(BlockID blockID, long length, long offset) {
private Pipeline pipeline;
private OmKeyLocationInfo(BlockID blockID, Pipeline pipeline, long length,
long offset) {
this.blockID = blockID;
this.pipeline = pipeline;
this.length = length;
this.offset = offset;
}
private OmKeyLocationInfo(BlockID blockID, long length, long offset,
Token<OzoneBlockTokenIdentifier> token) {
private OmKeyLocationInfo(BlockID blockID, Pipeline pipeline, long length,
long offset, Token<OzoneBlockTokenIdentifier> token) {
this.blockID = blockID;
this.pipeline = pipeline;
this.length = length;
this.offset = offset;
this.token = token;
@ -69,6 +76,10 @@ public long getLocalID() {
return blockID.getLocalID();
}
public Pipeline getPipeline() {
return pipeline;
}
public long getLength() {
return length;
}
@ -92,6 +103,11 @@ public Token<OzoneBlockTokenIdentifier> getToken() {
public void setToken(Token<OzoneBlockTokenIdentifier> token) {
this.token = token;
}
public void setPipeline(Pipeline pipeline) {
this.pipeline = pipeline;
}
/**
* Builder of OmKeyLocationInfo.
*/
@ -100,12 +116,18 @@ public static class Builder {
private long length;
private long offset;
private Token<OzoneBlockTokenIdentifier> token;
private Pipeline pipeline;
public Builder setBlockID(BlockID blockId) {
this.blockID = blockId;
return this;
}
public Builder setPipeline(Pipeline pipeline) {
this.pipeline = pipeline;
return this;
}
public Builder setLength(long len) {
this.length = len;
return this;
@ -123,9 +145,9 @@ public Builder setToken(Token<OzoneBlockTokenIdentifier> bToken) {
public OmKeyLocationInfo build() {
if (token == null) {
return new OmKeyLocationInfo(blockID, length, offset);
return new OmKeyLocationInfo(blockID, pipeline, length, offset);
} else {
return new OmKeyLocationInfo(blockID, length, offset, token);
return new OmKeyLocationInfo(blockID, pipeline, length, offset, token);
}
}
}
@ -139,12 +161,27 @@ public KeyLocation getProtobuf() {
if (this.token != null) {
builder.setToken(this.token.toTokenProto());
}
try {
builder.setPipeline(pipeline.getProtobufMessage());
} catch (UnknownPipelineStateException e) {
//TODO: fix me: we should not return KeyLocation without pipeline.
}
return builder.build();
}
private static Pipeline getPipeline(KeyLocation keyLocation) {
try {
return keyLocation.hasPipeline() ?
Pipeline.getFromProtobuf(keyLocation.getPipeline()) : null;
} catch (UnknownPipelineStateException e) {
return null;
}
}
public static OmKeyLocationInfo getFromProtobuf(KeyLocation keyLocation) {
OmKeyLocationInfo info = new OmKeyLocationInfo(
BlockID.getFromProtobuf(keyLocation.getBlockID()),
getPipeline(keyLocation),
keyLocation.getLength(),
keyLocation.getOffset());
if(keyLocation.hasToken()) {
@ -161,6 +198,7 @@ public String toString() {
", length=" + length +
", offset=" + offset +
", token=" + token +
", pipeline=" + pipeline +
", createVersion=" + createVersion + '}';
}
}

View File

@ -495,6 +495,13 @@ message KeyLocation {
// indicated at which version this block gets created.
optional uint64 createVersion = 5;
optional hadoop.common.TokenProto token = 6;
// Walk around to include pipeline info for client read/write
// without talking to scm.
// NOTE: the pipeline info may change after pipeline close.
// So eventually, we will have to change back to call scm to
// get the up to date pipeline information. This will need o3fs
// provide not only a OM delegation token but also a SCM delegation token
optional hadoop.hdds.Pipeline pipeline = 7;
}
message KeyLocationList {

View File

@ -446,7 +446,6 @@ public OutputStream newKeyWriter(KeyArgs args) throws IOException,
new KeyOutputStream.Builder()
.setHandler(openKey)
.setXceiverClientManager(xceiverClientManager)
.setScmClient(storageContainerLocationClient)
.setOmClient(ozoneManagerClient)
.setChunkSize(chunkSize)
.setRequestID(args.getRequestID())

View File

@ -42,6 +42,7 @@
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType;
import org.apache.hadoop.hdds.scm.container.common.helpers.AllocatedBlock;
import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline;
import org.apache.hadoop.hdds.scm.container.common.helpers.ExcludeList;
import org.apache.hadoop.hdds.scm.exceptions.SCMException;
import org.apache.hadoop.hdds.scm.protocol.ScmBlockLocationProtocol;
@ -95,6 +96,8 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.ws.rs.HEAD;
/**
* Implementation of keyManager.
*/
@ -105,7 +108,7 @@ public class KeyManagerImpl implements KeyManager {
/**
* A SCM block client, used to talk to SCM to allocate block during putKey.
*/
private final ScmBlockLocationProtocol scmBlockClient;
private final ScmClient scmClient;
private final OMMetadataManager metadataManager;
private final long scmBlockSize;
private final boolean useRatis;
@ -122,14 +125,15 @@ public class KeyManagerImpl implements KeyManager {
public KeyManagerImpl(ScmBlockLocationProtocol scmBlockClient,
OMMetadataManager metadataManager, OzoneConfiguration conf, String omId,
OzoneBlockTokenSecretManager secretManager) {
this(scmBlockClient, metadataManager, conf, omId, secretManager, null);
this(new ScmClient(scmBlockClient, null), metadataManager,
conf, omId, secretManager, null);
}
public KeyManagerImpl(ScmBlockLocationProtocol scmBlockClient,
public KeyManagerImpl(ScmClient scmClient,
OMMetadataManager metadataManager, OzoneConfiguration conf, String omId,
OzoneBlockTokenSecretManager secretManager,
KeyProviderCryptoExtension kmsProvider) {
this.scmBlockClient = scmBlockClient;
this.scmClient = scmClient;
this.metadataManager = metadataManager;
this.scmBlockSize = (long) conf
.getStorageSize(OZONE_SCM_BLOCK_SIZE, OZONE_SCM_BLOCK_SIZE_DEFAULT,
@ -159,7 +163,7 @@ public void start(OzoneConfiguration configuration) {
OZONE_BLOCK_DELETING_SERVICE_TIMEOUT,
OZONE_BLOCK_DELETING_SERVICE_TIMEOUT_DEFAULT,
TimeUnit.MILLISECONDS);
keyDeletingService = new KeyDeletingService(scmBlockClient, this,
keyDeletingService = new KeyDeletingService(scmClient.getBlockClient(), this,
blockDeleteInterval, serviceTimeout, configuration);
keyDeletingService.start();
}
@ -269,7 +273,7 @@ private List<OmKeyLocationInfo> allocateBlock(OmKeyInfo keyInfo,
String remoteUser = getRemoteUser().getShortUserName();
List<AllocatedBlock> allocatedBlocks;
try {
allocatedBlocks = scmBlockClient
allocatedBlocks = scmClient.getBlockClient()
.allocateBlock(scmBlockSize, numBlocks, keyInfo.getType(),
keyInfo.getFactor(), omId, excludeList);
} catch (SCMException ex) {
@ -283,7 +287,8 @@ private List<OmKeyLocationInfo> allocateBlock(OmKeyInfo keyInfo,
OmKeyLocationInfo.Builder builder = new OmKeyLocationInfo.Builder()
.setBlockID(new BlockID(allocatedBlock.getBlockID()))
.setLength(scmBlockSize)
.setOffset(0);
.setOffset(0)
.setPipeline(allocatedBlock.getPipeline());
if (grpcBlockTokenEnabled) {
builder.setToken(secretManager
.generateToken(remoteUser, allocatedBlock.getBlockID().toString(),
@ -575,6 +580,33 @@ public OmKeyInfo lookupKey(OmKeyArgs args) throws IOException {
});
}
}
// Refresh container pipeline info from SCM
// based on OmKeyArgs.refreshPipeline flag
// 1. Client send initial read request OmKeyArgs.refreshPipeline = false
// and uses the pipeline cached in OM to access datanode
// 2. If succeeded, done.
// 3. If failed due to pipeline does not exist or invalid pipeline state
// exception, client should retry lookupKey with
// OmKeyArgs.refreshPipeline = true
if (args.getRefreshPipeline()) {
for (OmKeyLocationInfoGroup key : value.getKeyLocationVersions()) {
key.getLocationList().forEach(k -> {
// TODO: fix Some tests that may not initialize container client
// The production should always have containerClient initialized.
if (scmClient.getContainerClient() != null) {
try {
ContainerWithPipeline cp = scmClient.getContainerClient()
.getContainerWithPipeline(k.getContainerID());
if (!cp.getPipeline().equals(k.getPipeline())) {
k.setPipeline(cp.getPipeline());
}
} catch (IOException e) {
LOG.debug("Unable to update pipeline for container");
}
}
});
}
}
return value;
} catch (IOException ex) {
LOG.debug("Get key failed for volume:{} bucket:{} key:{}",

View File

@ -334,7 +334,8 @@ private OzoneManager(OzoneConfiguration conf) throws IOException,
omRpcServer = getRpcServer(conf);
omRpcAddress = updateRPCListenAddress(configuration,
OZONE_OM_ADDRESS_KEY, omNodeRpcAddr, omRpcServer);
keyManager = new KeyManagerImpl(scmBlockClient, metadataManager,
keyManager = new KeyManagerImpl(
new ScmClient(scmBlockClient, scmContainerClient), metadataManager,
configuration, omStorage.getOmId(), blockTokenMgr, getKmsProvider());
shutdownHook = () -> {

View File

@ -0,0 +1,44 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.hadoop.ozone.om;
import org.apache.hadoop.hdds.scm.protocol.ScmBlockLocationProtocol;
import org.apache.hadoop.hdds.scm.protocol.StorageContainerLocationProtocol;
/**
* Wrapper class for Scm protocol clients.
*/
public class ScmClient {
private ScmBlockLocationProtocol blockClient;
private StorageContainerLocationProtocol containerClient;
ScmClient(ScmBlockLocationProtocol blockClient,
StorageContainerLocationProtocol containerClient) {
this.containerClient = containerClient;
this.blockClient = blockClient;
}
ScmBlockLocationProtocol getBlockClient() {
return this.blockClient;
}
StorageContainerLocationProtocol getContainerClient() {
return this.containerClient;
}
}