diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/client/OzoneClientFactory.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/client/OzoneClientFactory.java
index 193d80dcaf..5069220b8b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/client/OzoneClientFactory.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/client/OzoneClientFactory.java
@@ -27,6 +27,7 @@
 
 import java.io.IOException;
 import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Proxy;
 
 import static org.apache.hadoop.ozone.OzoneConfigKeys
@@ -285,15 +286,19 @@ private static ClientProtocol getClientProtocol(
       Class<? extends ClientProtocol> protocolClass, Configuration config)
       throws IOException {
     try {
-      LOG.info("Using {} as client protocol.",
+      LOG.debug("Using {} as client protocol.",
           protocolClass.getCanonicalName());
       Constructor<? extends ClientProtocol> ctor =
           protocolClass.getConstructor(Configuration.class);
       return ctor.newInstance(config);
     } catch (Exception e) {
       final String message = "Couldn't create protocol " + protocolClass;
+      LOG.error(message + " exception:" + e);
       if (e.getCause() instanceof IOException) {
         throw (IOException) e.getCause();
+      } else if (e instanceof InvocationTargetException) {
+        throw new IOException(message,
+            ((InvocationTargetException) e).getTargetException());
       } else {
         throw new IOException(message, e);
       }
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/client/io/ChunkGroupInputStream.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/client/io/ChunkGroupInputStream.java
index 0e3bc47cc6..afe5e455a5 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/client/io/ChunkGroupInputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/client/io/ChunkGroupInputStream.java
@@ -89,13 +89,11 @@ public synchronized void addStream(ChunkInputStream stream,
 
   @Override
   public synchronized int read() throws IOException {
-    checkNotClosed();
-    if (streamEntries.size() <= currentStreamIndex) {
+    byte[] buf = new byte[1];
+    if (read(buf, 0, 1) == EOF) {
       return EOF;
     }
-    ChunkInputStreamEntry entry = streamEntries.get(currentStreamIndex);
-    int data = entry.read();
-    return data;
+    return Byte.toUnsignedInt(buf[0]);
   }
 
   @Override
@@ -120,7 +118,7 @@ public synchronized int read(byte[] b, int off, int len) throws IOException {
       int actualLen = current.read(b, off, readLen);
       // this means the underlying stream has nothing at all, return
       if (actualLen == EOF) {
-        return totalReadLen > 0? totalReadLen : EOF;
+        return totalReadLen > 0 ? totalReadLen : EOF;
       }
       totalReadLen += actualLen;
       // this means there is no more data to read beyond this point, return
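The read() rewrite above funnels the single-byte path through read(byte[], int, int), so the bounds and close checks live in one place, and Byte.toUnsignedInt keeps the return value in the 0-255 range the InputStream contract requires. A minimal, self-contained sketch (not part of this patch) of the sign-extension pitfall that conversion avoids:

    // Java bytes are signed: values >= 0x80 widen to negative ints, and a
    // negative return from InputStream.read() is indistinguishable from EOF.
    public class UnsignedReadDemo {
      public static void main(String[] args) {
        byte b = (byte) 0xFF;                        // stored as -1
        System.out.println((int) b);                 // prints -1, looks like EOF
        System.out.println(Byte.toUnsignedInt(b));   // prints 255, correct
      }
    }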
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java
index abcb7be917..7b48f49d0f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java
@@ -20,6 +20,7 @@
 
 import com.google.common.base.Preconditions;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.OzoneConfiguration;
 import org.apache.hadoop.fs.StorageType;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.ipc.Client;
@@ -87,7 +88,7 @@ public class RpcClient implements ClientProtocol {
   private static final Logger LOG =
       LoggerFactory.getLogger(RpcClient.class);
 
-  private final Configuration conf;
+  private final OzoneConfiguration conf;
   private final StorageContainerLocationProtocolClientSideTranslatorPB
       storageContainerLocationClient;
   private final KeySpaceManagerProtocolClientSideTranslatorPB
@@ -105,7 +106,7 @@ public class RpcClient implements ClientProtocol {
    */
  public RpcClient(Configuration conf) throws IOException {
     Preconditions.checkNotNull(conf);
-    this.conf = conf;
+    this.conf = new OzoneConfiguration(conf);
     this.ugi = UserGroupInformation.getCurrentUser();
     this.userRights = conf.getEnum(KSMConfigKeys.OZONE_KSM_USER_RIGHTS,
         KSMConfigKeys.OZONE_KSM_USER_RIGHTS_DEFAULT);
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/KeyManagerImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/KeyManagerImpl.java
index b99103b1a2..3acbcd6594 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/KeyManagerImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/KeyManagerImpl.java
@@ -360,10 +360,8 @@ public void commitKey(KsmKeyArgs args, int clientID) throws IOException {
     } catch (KSMException e) {
       throw e;
     } catch (IOException ex) {
-      if (!(ex instanceof KSMException)) {
-        LOG.error("Key commit failed for volume:{} bucket:{} key:{}",
-            volumeName, bucketName, keyName, ex);
-      }
+      LOG.error("Key commit failed for volume:{} bucket:{} key:{}",
+          volumeName, bucketName, keyName, ex);
       throw new KSMException(ex.getMessage(),
           KSMException.ResultCodes.FAILED_KEY_ALLOCATION);
     } finally {
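The KeyManagerImpl hunk above deletes a guard that could never fail: any KSMException is rethrown by the first catch clause, so an exception reaching the IOException clause is never a KSMException and the instanceof check was dead code. A self-contained sketch of that catch-ordering rule, substituting standard JDK exceptions for KSMException/IOException:

    import java.io.FileNotFoundException;
    import java.io.IOException;

    // Once the more specific exception has its own catch clause, the broader
    // clause can never observe an instance of it.
    public class CatchOrderDemo {
      static void fail() throws IOException {
        throw new FileNotFoundException("boom");
      }

      public static void main(String[] args) {
        try {
          fail();
        } catch (FileNotFoundException e) {
          System.out.println("specific clause: " + e.getMessage());
        } catch (IOException e) {
          // e instanceof FileNotFoundException is always false here
          System.out.println("broad clause: " + e.getMessage());
        }
      }
    }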
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocolPB/KeySpaceManagerProtocolServerSideTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocolPB/KeySpaceManagerProtocolServerSideTranslatorPB.java
index 88f66bc24e..6321923db2 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocolPB/KeySpaceManagerProtocolServerSideTranslatorPB.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocolPB/KeySpaceManagerProtocolServerSideTranslatorPB.java
@@ -473,11 +473,17 @@ public CommitKeyResponse commitKey(RpcController controller,
         CommitKeyResponse.newBuilder();
     try {
       KeyArgs keyArgs = request.getKeyArgs();
+      OzoneProtos.ReplicationType type =
+          keyArgs.hasType()? keyArgs.getType() : null;
+      OzoneProtos.ReplicationFactor factor =
+          keyArgs.hasFactor()? keyArgs.getFactor() : null;
       KsmKeyArgs ksmKeyArgs = new KsmKeyArgs.Builder()
           .setVolumeName(keyArgs.getVolumeName())
           .setBucketName(keyArgs.getBucketName())
           .setKeyName(keyArgs.getKeyName())
           .setDataSize(keyArgs.getDataSize())
+          .setType(type)
+          .setFactor(factor)
           .build();
       int id = request.getClientID();
       impl.commitKey(ksmKeyArgs, id);
@@ -495,10 +501,16 @@ public AllocateBlockResponse allocateBlock(RpcController controller,
         AllocateBlockResponse.newBuilder();
     try {
       KeyArgs keyArgs = request.getKeyArgs();
+      OzoneProtos.ReplicationType type =
+          keyArgs.hasType()? keyArgs.getType() : null;
+      OzoneProtos.ReplicationFactor factor =
+          keyArgs.hasFactor()? keyArgs.getFactor() : null;
       KsmKeyArgs ksmKeyArgs = new KsmKeyArgs.Builder()
           .setVolumeName(keyArgs.getVolumeName())
           .setBucketName(keyArgs.getBucketName())
           .setKeyName(keyArgs.getKeyName())
+          .setType(type)
+          .setFactor(factor)
           .build();
       int id = request.getClientID();
       KsmKeyLocationInfo newLocation = impl.allocateBlock(ksmKeyArgs, id);
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/pipelines/ratis/RatisManagerImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/pipelines/ratis/RatisManagerImpl.java
index 16659e0953..7903b0ba85 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/pipelines/ratis/RatisManagerImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/pipelines/ratis/RatisManagerImpl.java
@@ -87,7 +87,7 @@ public PipelineChannel allocatePipelineChannel(ReplicationFactor factor) {
         // once a datanode has been added to a pipeline, exclude it from
         // further allocations
         ratisMembers.addAll(newNodesList);
-        LOG.info("Allocating a new pipelineChannel of size: {}", count);
+        LOG.info("Allocating a new ratis pipelineChannel of size: {}", count);
         // Start all channel names with "Ratis", easy to grep the logs.
         String conduitName = PREFIX +
             UUID.randomUUID().toString().substring(PREFIX.length());
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/pipelines/standalone/StandaloneManagerImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/pipelines/standalone/StandaloneManagerImpl.java
index a2e6439b60..ef379267ef 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/pipelines/standalone/StandaloneManagerImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/pipelines/standalone/StandaloneManagerImpl.java
@@ -83,7 +83,8 @@ public PipelineChannel allocatePipelineChannel(ReplicationFactor factor) {
       // once a datanode has been added to a pipeline, exclude it from
       // further allocations
       standAloneMembers.addAll(newNodesList);
-      LOG.info("Allocating a new pipeline channel of size: {}", count);
+      LOG.info("Allocating a new standalone pipeline channel of size: {}",
+          count);
       String channelName = "SA-" + UUID.randomUUID().toString().substring(3);
       return PipelineSelector.newPipelineFromNodes(newNodesList,
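The two translator hunks above forward the optional replication settings only when the client set them, because proto2-style accessors return a field's default value, not null, when the field is unset; reading getType() without the hasType() guard would fabricate a replication type the client never sent. A self-contained sketch of that hasX()/getX() behavior, using a hand-written stub in place of the generated OzoneProtos message (the stub and its enum values are illustrative, not the real generated API):

    // Mimics a proto2-generated message with an optional enum field.
    public class OptionalFieldDemo {
      enum ReplicationType { RATIS, STAND_ALONE, CHAINED }

      static class KeyArgsStub {
        private ReplicationType type;          // null == unset on the wire
        boolean hasType() { return type != null; }
        ReplicationType getType() {
          // proto2 getters return the default value for unset fields
          return type != null ? type : ReplicationType.RATIS;
        }
      }

      public static void main(String[] args) {
        KeyArgsStub unset = new KeyArgsStub();
        // Without the guard we would fabricate RATIS; with it we keep null
        // and let the server apply its own configured default later.
        ReplicationType type = unset.hasType() ? unset.getType() : null;
        System.out.println(type);              // prints null
      }
    }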
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/ozone-default.xml b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/ozone-default.xml
index fde4819826..97eaeae2a6 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/ozone-default.xml
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/ozone-default.xml
@@ -1135,6 +1135,15 @@
       see ozone.scm.heartbeat.thread.interval before changing this value.
     </description>
   </property>
+  <property>
+    <name>ozone.scm.max.nodepool.processing.threads</name>
+    <value>1</value>
+    <tag>OZONE, SCM</tag>
+    <description>
+      Controls the number of node pools that can be processed in parallel by
+      Container Supervisor.
+    </description>
+  </property>
   <property>
     <name>ozone.trace.enabled</name>
     <value>false</value>
diff --git a/hadoop-tools/hadoop-ozone/src/main/java/org/apache/hadoop/fs/ozone/OzoneFSInputStream.java b/hadoop-tools/hadoop-ozone/src/main/java/org/apache/hadoop/fs/ozone/OzoneFSInputStream.java
index 3a0a8fe2ff..4c5c0c8318 100644
--- a/hadoop-tools/hadoop-ozone/src/main/java/org/apache/hadoop/fs/ozone/OzoneFSInputStream.java
+++ b/hadoop-tools/hadoop-ozone/src/main/java/org/apache/hadoop/fs/ozone/OzoneFSInputStream.java
@@ -47,6 +47,11 @@ public int read() throws IOException {
     return inputStream.read();
   }
 
+  @Override
+  public int read(byte[] b, int off, int len) throws IOException {
+    return inputStream.read(b, off, len);
+  }
+
   @Override
   public synchronized void close() throws IOException {
     inputStream.close();
diff --git a/hadoop-tools/hadoop-ozone/src/main/java/org/apache/hadoop/fs/ozone/OzoneFileSystem.java b/hadoop-tools/hadoop-ozone/src/main/java/org/apache/hadoop/fs/ozone/OzoneFileSystem.java
index 9f78f2df29..081d50d331 100644
--- a/hadoop-tools/hadoop-ozone/src/main/java/org/apache/hadoop/fs/ozone/OzoneFileSystem.java
+++ b/hadoop-tools/hadoop-ozone/src/main/java/org/apache/hadoop/fs/ozone/OzoneFileSystem.java
@@ -582,7 +582,7 @@ private boolean isDirectory(OzoneKey key) {
    */
   private boolean createDirectory(String keyName) {
     try {
-      LOG.info("creating dir for key:{}", keyName);
+      LOG.trace("creating dir for key:{}", keyName);
       bucket.createKey(keyName, 0, replicationType, replicationFactor).close();
       return true;
     } catch (IOException ioe) {
@@ -673,7 +673,7 @@ boolean iterate() throws IOException {
       LOG.trace("Iterating directory:{}", pathKey);
       while (keyIterator.hasNext()) {
         OzoneKey key = keyIterator.next();
-        LOG.info("iterating key:{}", key.getName());
+        LOG.trace("iterating key:{}", key.getName());
         if (!processKey(key.getName())) {
           return false;
         }
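The OzoneFSInputStream override above is a throughput fix, not just plumbing: java.io.InputStream's inherited read(byte[], int, int) services a bulk read by calling the single-byte read() once per byte, so a wrapper that forwards only read() forces byte-at-a-time I/O. A self-contained sketch (not from this patch) that makes the inherited behavior visible:

    import java.io.IOException;
    import java.io.InputStream;

    // A stream that only implements the mandatory single-byte read();
    // the bulk read is inherited from InputStream.
    public class BulkReadDemo extends InputStream {
      private int calls = 0;
      private int remaining = 1024;

      @Override
      public int read() throws IOException {
        calls++;
        return remaining-- > 0 ? 'x' : -1;
      }

      public static void main(String[] args) throws IOException {
        BulkReadDemo in = new BulkReadDemo();
        in.read(new byte[1024], 0, 1024);  // inherited default implementation
        System.out.println(in.calls);      // prints 1024: one call per byte
      }
    }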
+ */ + +package org.apache.hadoop.fs.ozone; + +import org.apache.commons.lang.RandomStringUtils; +import org.apache.hadoop.conf.OzoneConfiguration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.CommonConfigurationKeysPublic; +import org.apache.hadoop.hdfs.DFSUtil; +import org.apache.hadoop.hdfs.server.datanode.DataNode; +import org.apache.hadoop.hdfs.server.datanode.ObjectStoreHandler; +import org.apache.hadoop.ozone.MiniOzoneClassicCluster; +import org.apache.hadoop.ozone.OzoneConfigKeys; +import org.apache.hadoop.ozone.OzoneConsts; +import org.apache.hadoop.ozone.web.handlers.BucketArgs; +import org.apache.hadoop.ozone.web.handlers.UserArgs; +import org.apache.hadoop.ozone.web.interfaces.StorageHandler; +import org.apache.hadoop.ozone.web.utils.OzoneUtils; +import org.apache.hadoop.ozone.web.handlers.VolumeArgs; +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Test; + +import java.io.IOException; +import java.util.Arrays; + +/** + * Test OzoneFSInputStream by reading through multiple interfaces. + */ +public class TestOzoneFSInputStream { + private static MiniOzoneClassicCluster cluster = null; + private static FileSystem fs; + private static StorageHandler storageHandler; + private static Path filePath = null; + private static byte[] data = null; + + /** + * Create a MiniDFSCluster for testing. + *

+ * Ozone is made active by setting OZONE_ENABLED = true and + * OZONE_HANDLER_TYPE_KEY = "distributed" + * + * @throws IOException + */ + @BeforeClass + public static void init() throws Exception { + OzoneConfiguration conf = new OzoneConfiguration(); + conf.setLong(OzoneConfigKeys.OZONE_SCM_BLOCK_SIZE_IN_MB, 10); + cluster = new MiniOzoneClassicCluster.Builder(conf) + .numDataNodes(10) + .setHandlerType(OzoneConsts.OZONE_HANDLER_DISTRIBUTED) + .build(); + storageHandler = + new ObjectStoreHandler(conf).getStorageHandler(); + + // create a volume and a bucket to be used by OzoneFileSystem + String userName = "user" + RandomStringUtils.randomNumeric(5); + String adminName = "admin" + RandomStringUtils.randomNumeric(5); + String volumeName = "volume" + RandomStringUtils.randomNumeric(5); + String bucketName = "bucket" + RandomStringUtils.randomNumeric(5); + UserArgs userArgs = new UserArgs(null, OzoneUtils.getRequestID(), + null, null, null, null); + VolumeArgs volumeArgs = new VolumeArgs(volumeName, userArgs); + volumeArgs.setUserName(userName); + volumeArgs.setAdminName(adminName); + storageHandler.createVolume(volumeArgs); + BucketArgs bucketArgs = new BucketArgs(volumeName, bucketName, userArgs); + storageHandler.createBucket(bucketArgs); + + // Fetch the host and port for File System init + DataNode dataNode = cluster.getDataNodes().get(0); + int port = dataNode.getInfoPort(); + String host = dataNode.getDatanodeHostname(); + + // Set the fs.defaultFS and start the filesystem + String uri = String.format("%s://%s:%d/%s/%s", + Constants.OZONE_URI_SCHEME, host, port, volumeName, bucketName); + conf.set(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY, uri); + fs = FileSystem.get(conf); + int fileLen = 100 * 1024 * 1024; + data = DFSUtil.string2Bytes(RandomStringUtils.randomAlphanumeric(fileLen)); + filePath = new Path("/" + RandomStringUtils.randomAlphanumeric(5)); + try (FSDataOutputStream stream = fs.create(filePath)) { + stream.write(data); + } + } + + /** + * Shutdown MiniDFSCluster. 
+ */ + @AfterClass + public static void shutdown() throws IOException { + fs.close(); + storageHandler.close(); + cluster.shutdown(); + } + + @Test + public void testO3FSSingleByteRead() throws IOException { + FSDataInputStream inputStream = fs.open(filePath); + byte[] value = new byte[data.length]; + int i = 0; + while(true) { + int val = inputStream.read(); + if (val == -1) { + break; + } + value[i] = (byte)val; + Assert.assertEquals("value mismatch at:" + i, value[i], data[i]); + i++; + } + Assert.assertEquals(i, data.length); + Assert.assertTrue(Arrays.equals(value, data)); + inputStream.close(); + } + + @Test + public void testO3FSMultiByteRead() throws IOException { + FSDataInputStream inputStream = fs.open(filePath); + byte[] value = new byte[data.length]; + byte[] tmp = new byte[1* 1024 *1024]; + int i = 0; + while(true) { + int val = inputStream.read(tmp); + if (val == -1) { + break; + } + System.arraycopy(tmp, 0, value, i * tmp.length, tmp.length); + i++; + } + Assert.assertEquals(i * tmp.length, data.length); + Assert.assertTrue(Arrays.equals(value, data)); + inputStream.close(); + } +} diff --git a/hadoop-tools/hadoop-ozone/src/test/java/org/apache/hadoop/fs/ozone/TestOzoneFileInterfaces.java b/hadoop-tools/hadoop-ozone/src/test/java/org/apache/hadoop/fs/ozone/TestOzoneFileInterfaces.java index 88d9e44575..6b65dd76f8 100644 --- a/hadoop-tools/hadoop-ozone/src/test/java/org/apache/hadoop/fs/ozone/TestOzoneFileInterfaces.java +++ b/hadoop-tools/hadoop-ozone/src/test/java/org/apache/hadoop/fs/ozone/TestOzoneFileInterfaces.java @@ -99,7 +99,9 @@ public TestOzoneFileInterfaces(boolean setDefaultFs, public void init() throws Exception { OzoneConfiguration conf = new OzoneConfiguration(); cluster = new MiniOzoneClassicCluster.Builder(conf) - .setHandlerType(OzoneConsts.OZONE_HANDLER_DISTRIBUTED).build(); + .numDataNodes(10) + .setHandlerType(OzoneConsts.OZONE_HANDLER_DISTRIBUTED) + .build(); storageHandler = new ObjectStoreHandler(conf).getStorageHandler();