diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyInputStream.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyInputStream.java index d8bfdbd127..3a92e0186e 100644 --- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyInputStream.java +++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyInputStream.java @@ -23,7 +23,6 @@ import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos; import org.apache.hadoop.hdds.client.BlockID; import org.apache.hadoop.hdds.protocol.proto.HddsProtos; -import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline; import org.apache.hadoop.hdds.scm.pipeline.Pipeline; import org.apache.hadoop.hdds.scm.protocol.StorageContainerLocationProtocol; import org.apache.hadoop.ozone.om.helpers.OmKeyInfo; @@ -277,9 +276,7 @@ public static LengthInputStream getFromOmKeyInfo( OmKeyLocationInfo omKeyLocationInfo = keyLocationInfos.get(i); BlockID blockID = omKeyLocationInfo.getBlockID(); long containerID = blockID.getContainerID(); - ContainerWithPipeline containerWithPipeline = - storageContainerLocationClient.getContainerWithPipeline(containerID); - Pipeline pipeline = containerWithPipeline.getPipeline(); + Pipeline pipeline = omKeyLocationInfo.getPipeline(); // irrespective of the container state, we will always read via Standalone // protocol. diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java index a379889e5b..3bd572dcef 100644 --- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java +++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java @@ -26,8 +26,6 @@ .ChecksumType; import org.apache.hadoop.hdds.scm.container.ContainerID; import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerNotOpenException; -import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline; -import org.apache.hadoop.hdds.scm.protocol.StorageContainerLocationProtocol; import org.apache.hadoop.hdds.scm.storage.BufferPool; import org.apache.hadoop.ozone.OzoneConfigKeys; import org.apache.hadoop.hdds.scm.container.common.helpers.ExcludeList; @@ -73,7 +71,6 @@ public class KeyOutputStream extends OutputStream { private final ArrayList streamEntries; private int currentStreamIndex; private final OzoneManagerProtocol omClient; - private final StorageContainerLocationProtocol scmClient; private final OmKeyArgs keyArgs; private final long openID; private final XceiverClientManager xceiverClientManager; @@ -100,7 +97,6 @@ public class KeyOutputStream extends OutputStream { public KeyOutputStream() { streamEntries = new ArrayList<>(); omClient = null; - scmClient = null; keyArgs = null; openID = -1; xceiverClientManager = null; @@ -136,6 +132,7 @@ public List getLocationInfoList() throws IOException { new OmKeyLocationInfo.Builder().setBlockID(streamEntry.getBlockID()) .setLength(streamEntry.getCurrentPosition()).setOffset(0) .setToken(streamEntry.getToken()) + .setPipeline(streamEntry.getPipeline()) .build(); LOG.debug("block written " + streamEntry.getBlockID() + ", length " + streamEntry.getCurrentPosition() + " bcsID " @@ -149,7 +146,6 @@ public List getLocationInfoList() throws IOException { @SuppressWarnings("parameternumber") public KeyOutputStream(OpenKeySession handler, XceiverClientManager xceiverClientManager, - StorageContainerLocationProtocol scmClient, OzoneManagerProtocol omClient, int chunkSize, String requestId, ReplicationFactor factor, ReplicationType type, long bufferFlushSize, long bufferMaxSize, long size, long watchTimeout, @@ -158,7 +154,6 @@ public KeyOutputStream(OpenKeySession handler, this.streamEntries = new ArrayList<>(); this.currentStreamIndex = 0; this.omClient = omClient; - this.scmClient = scmClient; OmKeyInfo info = handler.getKeyInfo(); // Retrieve the file encryption key info, null if file is not in // encrypted bucket. @@ -221,15 +216,14 @@ public void addPreallocateBlocks(OmKeyLocationInfoGroup version, private void addKeyLocationInfo(OmKeyLocationInfo subKeyInfo) throws IOException { - ContainerWithPipeline containerWithPipeline = scmClient - .getContainerWithPipeline(subKeyInfo.getContainerID()); + Preconditions.checkNotNull(subKeyInfo.getPipeline()); UserGroupInformation.getCurrentUser().addToken(subKeyInfo.getToken()); BlockOutputStreamEntry.Builder builder = new BlockOutputStreamEntry.Builder() .setBlockID(subKeyInfo.getBlockID()) .setKey(keyArgs.getKeyName()) .setXceiverClientManager(xceiverClientManager) - .setPipeline(containerWithPipeline.getPipeline()) + .setPipeline(subKeyInfo.getPipeline()) .setRequestId(requestID) .setChunkSize(chunkSize) .setLength(subKeyInfo.getLength()) @@ -637,7 +631,6 @@ public ExcludeList getExcludeList() { public static class Builder { private OpenKeySession openHandler; private XceiverClientManager xceiverManager; - private StorageContainerLocationProtocol scmClient; private OzoneManagerProtocol omClient; private int chunkSize; private String requestID; @@ -675,11 +668,6 @@ public Builder setXceiverClientManager(XceiverClientManager manager) { return this; } - public Builder setScmClient(StorageContainerLocationProtocol client) { - this.scmClient = client; - return this; - } - public Builder setOmClient( OzoneManagerProtocol client) { this.omClient = client; @@ -747,7 +735,7 @@ public Builder setMaxRetryCount(int maxCount) { } public KeyOutputStream build() throws IOException { - return new KeyOutputStream(openHandler, xceiverManager, scmClient, + return new KeyOutputStream(openHandler, xceiverManager, omClient, chunkSize, requestID, factor, type, streamBufferFlushSize, streamBufferMaxSize, blockSize, watchTimeout, checksumType, bytesPerChecksum, multipartUploadID, multipartNumber, isMultipartKey, diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java index d0595822a5..83d9ec2d9c 100644 --- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java +++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java @@ -602,7 +602,6 @@ public OzoneOutputStream createKey( new KeyOutputStream.Builder() .setHandler(openKey) .setXceiverClientManager(xceiverClientManager) - .setScmClient(storageContainerLocationClient) .setOmClient(ozoneManagerClient) .setChunkSize(chunkSize) .setRequestID(requestId) @@ -865,7 +864,6 @@ public OzoneOutputStream createMultipartKey(String volumeName, new KeyOutputStream.Builder() .setHandler(openKey) .setXceiverClientManager(xceiverClientManager) - .setScmClient(storageContainerLocationClient) .setOmClient(ozoneManagerClient) .setChunkSize(chunkSize) .setRequestID(requestId) diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmKeyArgs.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmKeyArgs.java index 9620600b4e..d90345cc00 100644 --- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmKeyArgs.java +++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmKeyArgs.java @@ -44,13 +44,14 @@ public final class OmKeyArgs implements Auditable { private final String multipartUploadID; private final int multipartUploadPartNumber; private Map metadata; + private boolean refreshPipeline; @SuppressWarnings("parameternumber") private OmKeyArgs(String volumeName, String bucketName, String keyName, long dataSize, ReplicationType type, ReplicationFactor factor, List locationInfoList, boolean isMultipart, String uploadID, int partNumber, - Map metadataMap) { + Map metadataMap, boolean refreshPipeline) { this.volumeName = volumeName; this.bucketName = bucketName; this.keyName = keyName; @@ -62,6 +63,7 @@ private OmKeyArgs(String volumeName, String bucketName, String keyName, this.multipartUploadID = uploadID; this.multipartUploadPartNumber = partNumber; this.metadata = metadataMap; + this.refreshPipeline = refreshPipeline; } public boolean getIsMultipartKey() { @@ -120,6 +122,10 @@ public List getLocationInfoList() { return locationInfoList; } + public boolean getRefreshPipeline() { + return refreshPipeline; + } + @Override public Map toAuditMap() { Map auditMap = new LinkedHashMap<>(); @@ -159,6 +165,7 @@ public static class Builder { private String multipartUploadID; private int multipartUploadPartNumber; private Map metadata = new HashMap<>(); + private boolean refreshPipeline; public Builder setVolumeName(String volume) { this.volumeName = volume; @@ -220,10 +227,15 @@ public Builder addAllMetadata(Map metadatamap) { return this; } + public Builder setRefreshPipeline(boolean refresh) { + this.refreshPipeline = refresh; + return this; + } + public OmKeyArgs build() { return new OmKeyArgs(volumeName, bucketName, keyName, dataSize, type, factor, locationInfoList, isMultipartKey, multipartUploadID, - multipartUploadPartNumber, metadata); + multipartUploadPartNumber, metadata, refreshPipeline); } } diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmKeyLocationInfo.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmKeyLocationInfo.java index e4eb0f5f0e..bbd1157a00 100644 --- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmKeyLocationInfo.java +++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmKeyLocationInfo.java @@ -17,6 +17,8 @@ package org.apache.hadoop.ozone.om.helpers; import org.apache.hadoop.hdds.client.BlockID; +import org.apache.hadoop.hdds.scm.pipeline.Pipeline; +import org.apache.hadoop.hdds.scm.pipeline.UnknownPipelineStateException; import org.apache.hadoop.hdds.security.token.OzoneBlockTokenIdentifier; import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.KeyLocation; import org.apache.hadoop.security.token.Token; @@ -35,15 +37,20 @@ public final class OmKeyLocationInfo { // the version number indicating when this block was added private long createVersion; - private OmKeyLocationInfo(BlockID blockID, long length, long offset) { + private Pipeline pipeline; + + private OmKeyLocationInfo(BlockID blockID, Pipeline pipeline, long length, + long offset) { this.blockID = blockID; + this.pipeline = pipeline; this.length = length; this.offset = offset; } - private OmKeyLocationInfo(BlockID blockID, long length, long offset, - Token token) { + private OmKeyLocationInfo(BlockID blockID, Pipeline pipeline, long length, + long offset, Token token) { this.blockID = blockID; + this.pipeline = pipeline; this.length = length; this.offset = offset; this.token = token; @@ -69,6 +76,10 @@ public long getLocalID() { return blockID.getLocalID(); } + public Pipeline getPipeline() { + return pipeline; + } + public long getLength() { return length; } @@ -92,6 +103,11 @@ public Token getToken() { public void setToken(Token token) { this.token = token; } + + public void setPipeline(Pipeline pipeline) { + this.pipeline = pipeline; + } + /** * Builder of OmKeyLocationInfo. */ @@ -100,12 +116,18 @@ public static class Builder { private long length; private long offset; private Token token; + private Pipeline pipeline; public Builder setBlockID(BlockID blockId) { this.blockID = blockId; return this; } + public Builder setPipeline(Pipeline pipeline) { + this.pipeline = pipeline; + return this; + } + public Builder setLength(long len) { this.length = len; return this; @@ -123,9 +145,9 @@ public Builder setToken(Token bToken) { public OmKeyLocationInfo build() { if (token == null) { - return new OmKeyLocationInfo(blockID, length, offset); + return new OmKeyLocationInfo(blockID, pipeline, length, offset); } else { - return new OmKeyLocationInfo(blockID, length, offset, token); + return new OmKeyLocationInfo(blockID, pipeline, length, offset, token); } } } @@ -139,12 +161,27 @@ public KeyLocation getProtobuf() { if (this.token != null) { builder.setToken(this.token.toTokenProto()); } + try { + builder.setPipeline(pipeline.getProtobufMessage()); + } catch (UnknownPipelineStateException e) { + //TODO: fix me: we should not return KeyLocation without pipeline. + } return builder.build(); } + private static Pipeline getPipeline(KeyLocation keyLocation) { + try { + return keyLocation.hasPipeline() ? + Pipeline.getFromProtobuf(keyLocation.getPipeline()) : null; + } catch (UnknownPipelineStateException e) { + return null; + } + } + public static OmKeyLocationInfo getFromProtobuf(KeyLocation keyLocation) { OmKeyLocationInfo info = new OmKeyLocationInfo( BlockID.getFromProtobuf(keyLocation.getBlockID()), + getPipeline(keyLocation), keyLocation.getLength(), keyLocation.getOffset()); if(keyLocation.hasToken()) { @@ -161,6 +198,7 @@ public String toString() { ", length=" + length + ", offset=" + offset + ", token=" + token + + ", pipeline=" + pipeline + ", createVersion=" + createVersion + '}'; } } diff --git a/hadoop-ozone/common/src/main/proto/OzoneManagerProtocol.proto b/hadoop-ozone/common/src/main/proto/OzoneManagerProtocol.proto index 0e94aa63ba..2f9410bb1e 100644 --- a/hadoop-ozone/common/src/main/proto/OzoneManagerProtocol.proto +++ b/hadoop-ozone/common/src/main/proto/OzoneManagerProtocol.proto @@ -495,6 +495,13 @@ message KeyLocation { // indicated at which version this block gets created. optional uint64 createVersion = 5; optional hadoop.common.TokenProto token = 6; + // Walk around to include pipeline info for client read/write + // without talking to scm. + // NOTE: the pipeline info may change after pipeline close. + // So eventually, we will have to change back to call scm to + // get the up to date pipeline information. This will need o3fs + // provide not only a OM delegation token but also a SCM delegation token + optional hadoop.hdds.Pipeline pipeline = 7; } message KeyLocationList { diff --git a/hadoop-ozone/objectstore-service/src/main/java/org/apache/hadoop/ozone/web/storage/DistributedStorageHandler.java b/hadoop-ozone/objectstore-service/src/main/java/org/apache/hadoop/ozone/web/storage/DistributedStorageHandler.java index 6fff3e4fd0..07016eb1af 100644 --- a/hadoop-ozone/objectstore-service/src/main/java/org/apache/hadoop/ozone/web/storage/DistributedStorageHandler.java +++ b/hadoop-ozone/objectstore-service/src/main/java/org/apache/hadoop/ozone/web/storage/DistributedStorageHandler.java @@ -446,7 +446,6 @@ public OutputStream newKeyWriter(KeyArgs args) throws IOException, new KeyOutputStream.Builder() .setHandler(openKey) .setXceiverClientManager(xceiverClientManager) - .setScmClient(storageContainerLocationClient) .setOmClient(ozoneManagerClient) .setChunkSize(chunkSize) .setRequestID(args.getRequestID()) diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java index fbbb10490c..38d0922551 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java @@ -42,6 +42,7 @@ import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor; import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType; import org.apache.hadoop.hdds.scm.container.common.helpers.AllocatedBlock; +import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline; import org.apache.hadoop.hdds.scm.container.common.helpers.ExcludeList; import org.apache.hadoop.hdds.scm.exceptions.SCMException; import org.apache.hadoop.hdds.scm.protocol.ScmBlockLocationProtocol; @@ -95,6 +96,8 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import javax.ws.rs.HEAD; + /** * Implementation of keyManager. */ @@ -105,7 +108,7 @@ public class KeyManagerImpl implements KeyManager { /** * A SCM block client, used to talk to SCM to allocate block during putKey. */ - private final ScmBlockLocationProtocol scmBlockClient; + private final ScmClient scmClient; private final OMMetadataManager metadataManager; private final long scmBlockSize; private final boolean useRatis; @@ -122,14 +125,15 @@ public class KeyManagerImpl implements KeyManager { public KeyManagerImpl(ScmBlockLocationProtocol scmBlockClient, OMMetadataManager metadataManager, OzoneConfiguration conf, String omId, OzoneBlockTokenSecretManager secretManager) { - this(scmBlockClient, metadataManager, conf, omId, secretManager, null); + this(new ScmClient(scmBlockClient, null), metadataManager, + conf, omId, secretManager, null); } - public KeyManagerImpl(ScmBlockLocationProtocol scmBlockClient, + public KeyManagerImpl(ScmClient scmClient, OMMetadataManager metadataManager, OzoneConfiguration conf, String omId, OzoneBlockTokenSecretManager secretManager, KeyProviderCryptoExtension kmsProvider) { - this.scmBlockClient = scmBlockClient; + this.scmClient = scmClient; this.metadataManager = metadataManager; this.scmBlockSize = (long) conf .getStorageSize(OZONE_SCM_BLOCK_SIZE, OZONE_SCM_BLOCK_SIZE_DEFAULT, @@ -159,7 +163,7 @@ public void start(OzoneConfiguration configuration) { OZONE_BLOCK_DELETING_SERVICE_TIMEOUT, OZONE_BLOCK_DELETING_SERVICE_TIMEOUT_DEFAULT, TimeUnit.MILLISECONDS); - keyDeletingService = new KeyDeletingService(scmBlockClient, this, + keyDeletingService = new KeyDeletingService(scmClient.getBlockClient(), this, blockDeleteInterval, serviceTimeout, configuration); keyDeletingService.start(); } @@ -269,7 +273,7 @@ private List allocateBlock(OmKeyInfo keyInfo, String remoteUser = getRemoteUser().getShortUserName(); List allocatedBlocks; try { - allocatedBlocks = scmBlockClient + allocatedBlocks = scmClient.getBlockClient() .allocateBlock(scmBlockSize, numBlocks, keyInfo.getType(), keyInfo.getFactor(), omId, excludeList); } catch (SCMException ex) { @@ -283,7 +287,8 @@ private List allocateBlock(OmKeyInfo keyInfo, OmKeyLocationInfo.Builder builder = new OmKeyLocationInfo.Builder() .setBlockID(new BlockID(allocatedBlock.getBlockID())) .setLength(scmBlockSize) - .setOffset(0); + .setOffset(0) + .setPipeline(allocatedBlock.getPipeline()); if (grpcBlockTokenEnabled) { builder.setToken(secretManager .generateToken(remoteUser, allocatedBlock.getBlockID().toString(), @@ -575,6 +580,33 @@ public OmKeyInfo lookupKey(OmKeyArgs args) throws IOException { }); } } + // Refresh container pipeline info from SCM + // based on OmKeyArgs.refreshPipeline flag + // 1. Client send initial read request OmKeyArgs.refreshPipeline = false + // and uses the pipeline cached in OM to access datanode + // 2. If succeeded, done. + // 3. If failed due to pipeline does not exist or invalid pipeline state + // exception, client should retry lookupKey with + // OmKeyArgs.refreshPipeline = true + if (args.getRefreshPipeline()) { + for (OmKeyLocationInfoGroup key : value.getKeyLocationVersions()) { + key.getLocationList().forEach(k -> { + // TODO: fix Some tests that may not initialize container client + // The production should always have containerClient initialized. + if (scmClient.getContainerClient() != null) { + try { + ContainerWithPipeline cp = scmClient.getContainerClient() + .getContainerWithPipeline(k.getContainerID()); + if (!cp.getPipeline().equals(k.getPipeline())) { + k.setPipeline(cp.getPipeline()); + } + } catch (IOException e) { + LOG.debug("Unable to update pipeline for container"); + } + } + }); + } + } return value; } catch (IOException ex) { LOG.debug("Get key failed for volume:{} bucket:{} key:{}", diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java index 0b77307c0c..43ad16a3e2 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java @@ -334,7 +334,8 @@ private OzoneManager(OzoneConfiguration conf) throws IOException, omRpcServer = getRpcServer(conf); omRpcAddress = updateRPCListenAddress(configuration, OZONE_OM_ADDRESS_KEY, omNodeRpcAddr, omRpcServer); - keyManager = new KeyManagerImpl(scmBlockClient, metadataManager, + keyManager = new KeyManagerImpl( + new ScmClient(scmBlockClient, scmContainerClient), metadataManager, configuration, omStorage.getOmId(), blockTokenMgr, getKmsProvider()); shutdownHook = () -> { diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ScmClient.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ScmClient.java new file mode 100644 index 0000000000..73a0cf9ca0 --- /dev/null +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ScmClient.java @@ -0,0 +1,44 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS,WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package org.apache.hadoop.ozone.om; + +import org.apache.hadoop.hdds.scm.protocol.ScmBlockLocationProtocol; +import org.apache.hadoop.hdds.scm.protocol.StorageContainerLocationProtocol; + +/** + * Wrapper class for Scm protocol clients. + */ +public class ScmClient { + + private ScmBlockLocationProtocol blockClient; + private StorageContainerLocationProtocol containerClient; + + ScmClient(ScmBlockLocationProtocol blockClient, + StorageContainerLocationProtocol containerClient) { + this.containerClient = containerClient; + this.blockClient = blockClient; + } + + ScmBlockLocationProtocol getBlockClient() { + return this.blockClient; + } + + StorageContainerLocationProtocol getContainerClient() { + return this.containerClient; + } +}