HDFS-13137. Ozone: Ozonefs read fails because ChunkGroupInputStream#read does not iterate through all the blocks in the key.

Contributed by Mukul Kumar Singh.
Authored by Anu Engineer 2018-03-13 17:27:57 -07:00, committed by Owen O'Malley
parent 61651dcf5c
commit 5e7164c614
12 changed files with 204 additions and 18 deletions

View File

@@ -27,6 +27,7 @@
import java.io.IOException;
import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Proxy;
import static org.apache.hadoop.ozone.OzoneConfigKeys
@@ -285,15 +286,19 @@ private static ClientProtocol getClientProtocol(
Class<? extends ClientProtocol> protocolClass, Configuration config)
throws IOException {
try {
LOG.info("Using {} as client protocol.",
LOG.debug("Using {} as client protocol.",
protocolClass.getCanonicalName());
Constructor<? extends ClientProtocol> ctor =
protocolClass.getConstructor(Configuration.class);
return ctor.newInstance(config);
} catch (Exception e) {
final String message = "Couldn't create protocol " + protocolClass;
LOG.error(message + " exception:" + e);
if (e.getCause() instanceof IOException) {
throw (IOException) e.getCause();
} else if (e instanceof InvocationTargetException) {
throw new IOException(message,
((InvocationTargetException) e).getTargetException());
} else {
throw new IOException(message, e);
}
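
The rewritten catch block unwraps reflective construction failures so callers see the real cause instead of a generic InvocationTargetException. The same pattern in isolation, as a minimal self-contained sketch (the factory and method names here are illustrative, not part of the patch):

import java.io.IOException;
import java.lang.reflect.InvocationTargetException;

final class ReflectiveFactory {
  // Instantiate reflectively, rethrowing the most specific exception:
  // an IOException cause is unwrapped, a constructor failure keeps its
  // target exception as the cause, anything else is wrapped once.
  static <T> T newInstance(Class<? extends T> impl) throws IOException {
    try {
      return impl.getDeclaredConstructor().newInstance();
    } catch (Exception e) {
      final String message = "Couldn't create " + impl;
      if (e.getCause() instanceof IOException) {
        throw (IOException) e.getCause();
      } else if (e instanceof InvocationTargetException) {
        throw new IOException(message,
            ((InvocationTargetException) e).getTargetException());
      } else {
        throw new IOException(message, e);
      }
    }
  }
}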

View File

@@ -89,13 +89,11 @@ public synchronized void addStream(ChunkInputStream stream,
@Override
public synchronized int read() throws IOException {
checkNotClosed();
if (streamEntries.size() <= currentStreamIndex) {
byte[] buf = new byte[1];
if (read(buf, 0, 1) == EOF) {
return EOF;
}
ChunkInputStreamEntry entry = streamEntries.get(currentStreamIndex);
int data = entry.read();
return data;
return Byte.toUnsignedInt(buf[0]);
}
@Override
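
Byte.toUnsignedInt matters here because InputStream#read() must return the byte as an int in the range 0-255, while Java bytes are signed; returning the raw byte would turn 0xFF into -1 and make valid data look like end of stream. A worked example:

byte b = (byte) 0xFF;              // signed byte value: -1
int wrong = b;                     // widens to -1, indistinguishable from EOF
int right = Byte.toUnsignedInt(b); // 255, as the read() contract requires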
@@ -120,7 +118,7 @@ public synchronized int read(byte[] b, int off, int len) throws IOException {
int actualLen = current.read(b, off, readLen);
// this means the underlying stream has nothing at all, return
if (actualLen == EOF) {
return totalReadLen > 0? totalReadLen : EOF;
return totalReadLen > 0 ? totalReadLen : EOF;
}
totalReadLen += actualLen;
// this means there is no more data to read beyond this point, return
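
For context, the loop around these lines is what fixes the reported bug: read(byte[], int, int) keeps advancing currentStreamIndex as each block's stream is drained, so a key stored across several blocks is read end to end. A simplified sketch of that loop, using the field names visible above and assuming a getRemaining() helper on the entry:

public synchronized int read(byte[] b, int off, int len) throws IOException {
  checkNotClosed();
  int totalReadLen = 0;
  while (len > 0) {
    if (streamEntries.size() <= currentStreamIndex) {
      return totalReadLen == 0 ? EOF : totalReadLen;  // past the last block
    }
    ChunkInputStreamEntry current = streamEntries.get(currentStreamIndex);
    int readLen = Math.min(len, (int) current.getRemaining());
    int actualLen = current.read(b, off, readLen);
    // the underlying stream has nothing at all, return
    if (actualLen == EOF) {
      return totalReadLen > 0 ? totalReadLen : EOF;
    }
    totalReadLen += actualLen;
    // a short read means no more data is available right now
    if (actualLen != readLen) {
      return totalReadLen;
    }
    off += actualLen;
    len -= actualLen;
    if (current.getRemaining() <= 0) {
      currentStreamIndex += 1;      // move on to the next block's stream
    }
  }
  return totalReadLen;
}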

View File

@@ -20,6 +20,7 @@
import com.google.common.base.Preconditions;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.OzoneConfiguration;
import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.ipc.Client;
@@ -87,7 +88,7 @@ public class RpcClient implements ClientProtocol {
private static final Logger LOG =
LoggerFactory.getLogger(RpcClient.class);
private final Configuration conf;
private final OzoneConfiguration conf;
private final StorageContainerLocationProtocolClientSideTranslatorPB
storageContainerLocationClient;
private final KeySpaceManagerProtocolClientSideTranslatorPB
@@ -105,7 +106,7 @@ public class RpcClient implements ClientProtocol {
*/
public RpcClient(Configuration conf) throws IOException {
Preconditions.checkNotNull(conf);
this.conf = conf;
this.conf = new OzoneConfiguration(conf);
this.ugi = UserGroupInformation.getCurrentUser();
this.userRights = conf.getEnum(KSMConfigKeys.OZONE_KSM_USER_RIGHTS,
KSMConfigKeys.OZONE_KSM_USER_RIGHTS_DEFAULT);
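
Copying the caller's Configuration into an OzoneConfiguration is what makes the client see ozone-site.xml and ozone-default.xml, which OzoneConfiguration registers as default resources; a plain Configuration carries none of the ozone keys. A short sketch of the effect, using the ozone.trace.enabled property shown later in this diff as the example:

Configuration plain = new Configuration();          // no ozone resources loaded
OzoneConfiguration ozoneConf = new OzoneConfiguration(plain);
// Every setting from 'plain' is preserved, and ozone defaults such as
// ozone.trace.enabled = false now resolve through ozoneConf as well.
boolean tracing = ozoneConf.getBoolean("ozone.trace.enabled", false);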

View File

@@ -360,10 +360,8 @@ public void commitKey(KsmKeyArgs args, int clientID) throws IOException {
} catch (KSMException e) {
throw e;
} catch (IOException ex) {
if (!(ex instanceof KSMException)) {
LOG.error("Key commit failed for volume:{} bucket:{} key:{}",
volumeName, bucketName, keyName, ex);
}
LOG.error("Key commit failed for volume:{} bucket:{} key:{}",
volumeName, bucketName, keyName, ex);
throw new KSMException(ex.getMessage(),
KSMException.ResultCodes.FAILED_KEY_ALLOCATION);
} finally {
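
The deleted instanceof guard was dead code: the catch (KSMException e) clause above it already rethrows typed exceptions, so the IOException handler can log and wrap unconditionally. The resulting rethrow-or-wrap idiom, sketched with a placeholder for the commit logic:

try {
  doCommit(args, clientID);        // placeholder for the real commit path
} catch (KSMException e) {
  throw e;                         // already domain-typed; pass through as-is
} catch (IOException ex) {
  LOG.error("Key commit failed for volume:{} bucket:{} key:{}",
      volumeName, bucketName, keyName, ex);          // log exactly once
  throw new KSMException(ex.getMessage(),
      KSMException.ResultCodes.FAILED_KEY_ALLOCATION);
}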

View File

@@ -473,11 +473,17 @@ public CommitKeyResponse commitKey(RpcController controller,
CommitKeyResponse.newBuilder();
try {
KeyArgs keyArgs = request.getKeyArgs();
OzoneProtos.ReplicationType type =
keyArgs.hasType()? keyArgs.getType() : null;
OzoneProtos.ReplicationFactor factor =
keyArgs.hasFactor()? keyArgs.getFactor() : null;
KsmKeyArgs ksmKeyArgs = new KsmKeyArgs.Builder()
.setVolumeName(keyArgs.getVolumeName())
.setBucketName(keyArgs.getBucketName())
.setKeyName(keyArgs.getKeyName())
.setDataSize(keyArgs.getDataSize())
.setType(type)
.setFactor(factor)
.build();
int id = request.getClientID();
impl.commitKey(ksmKeyArgs, id);
@@ -495,10 +501,16 @@ public AllocateBlockResponse allocateBlock(RpcController controller,
AllocateBlockResponse.newBuilder();
try {
KeyArgs keyArgs = request.getKeyArgs();
OzoneProtos.ReplicationType type =
keyArgs.hasType()? keyArgs.getType() : null;
OzoneProtos.ReplicationFactor factor =
keyArgs.hasFactor()? keyArgs.getFactor() : null;
KsmKeyArgs ksmKeyArgs = new KsmKeyArgs.Builder()
.setVolumeName(keyArgs.getVolumeName())
.setBucketName(keyArgs.getBucketName())
.setKeyName(keyArgs.getKeyName())
.setType(type)
.setFactor(factor)
.build();
int id = request.getClientID();
KsmKeyLocationInfo newLocation = impl.allocateBlock(ksmKeyArgs, id);
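
Both translator methods need the hasType()/hasFactor() guards because these are optional proto2 fields: calling getType() on an unset field silently returns the field's declared default rather than null, which would smuggle a replication setting the client never asked for into KsmKeyArgs. The guard pattern on its own:

// Distinguish "field never set" from "field set to the default value",
// passing null through to the builder when the client omitted it.
OzoneProtos.ReplicationType type =
    keyArgs.hasType() ? keyArgs.getType() : null;
OzoneProtos.ReplicationFactor factor =
    keyArgs.hasFactor() ? keyArgs.getFactor() : null;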

View File

@@ -87,7 +87,7 @@ public PipelineChannel allocatePipelineChannel(ReplicationFactor factor) {
// once a datanode has been added to a pipeline, exclude it from
// further allocations
ratisMembers.addAll(newNodesList);
LOG.info("Allocating a new pipelineChannel of size: {}", count);
LOG.info("Allocating a new ratis pipelineChannel of size: {}", count);
// Start all channel names with "Ratis", easy to grep the logs.
String conduitName = PREFIX +
UUID.randomUUID().toString().substring(PREFIX.length());

View File

@@ -83,7 +83,8 @@ public PipelineChannel allocatePipelineChannel(ReplicationFactor factor) {
// once a datanode has been added to a pipeline, exclude it from
// further allocations
standAloneMembers.addAll(newNodesList);
LOG.info("Allocating a new pipeline channel of size: {}", count);
LOG.info("Allocating a new standalone pipeline channel of size: {}",
count);
String channelName =
"SA-" + UUID.randomUUID().toString().substring(3);
return PipelineSelector.newPipelineFromNodes(newNodesList,

View File

@@ -1135,6 +1135,15 @@
see ozone.scm.heartbeat.thread.interval before changing this value.
</description>
</property>
<property>
<name>ozone.scm.max.nodepool.processing.threads</name>
<value>1</value>
<tag>OZONE, SCM</tag>
<description>
Controls the number of node pools that can be processed in parallel by
Container Supervisor.
</description>
</property>
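
On the SCM side this value would be read through the standard Configuration API; a hedged sketch (the patch does not show the Java constant for this key, so the literal property name stands in for it):

// Hypothetical read of the new property; the default mirrors <value> above.
int nodePoolThreads = conf.getInt(
    "ozone.scm.max.nodepool.processing.threads", 1);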
<property>
<name>ozone.trace.enabled</name>
<value>false</value>

View File

@@ -47,6 +47,11 @@ public int read() throws IOException {
return inputStream.read();
}
@Override
public int read(byte[] b, int off, int len) throws IOException {
return inputStream.read(b, off, len);
}
@Override
public synchronized void close() throws IOException {
inputStream.close();
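
The new read(byte[], int, int) override is a performance fix as much as a functional one: without it, OzoneFSInputStream inherits java.io.InputStream's fallback, which satisfies a bulk read by calling the single-byte read() once per byte. Delegating keeps a bulk read as a single call into the wrapped Ozone stream. Roughly:

byte[] buf = new byte[4096];
// with the override: one call into the underlying Ozone input stream
int n = in.read(buf, 0, buf.length);
// without it: InputStream's default would issue up to 4096 read() calls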

View File

@@ -582,7 +582,7 @@ private boolean isDirectory(OzoneKey key) {
*/
private boolean createDirectory(String keyName) {
try {
LOG.info("creating dir for key:{}", keyName);
LOG.trace("creating dir for key:{}", keyName);
bucket.createKey(keyName, 0, replicationType, replicationFactor).close();
return true;
} catch (IOException ioe) {
@@ -673,7 +673,7 @@ boolean iterate() throws IOException {
LOG.trace("Iterating directory:{}", pathKey);
while (keyIterator.hasNext()) {
OzoneKey key = keyIterator.next();
LOG.info("iterating key:{}", key.getName());
LOG.trace("iterating key:{}", key.getName());
if (!processKey(key.getName())) {
return false;
}

View File

@@ -0,0 +1,155 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.ozone;
import org.apache.commons.lang.RandomStringUtils;
import org.apache.hadoop.conf.OzoneConfiguration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.datanode.ObjectStoreHandler;
import org.apache.hadoop.ozone.MiniOzoneClassicCluster;
import org.apache.hadoop.ozone.OzoneConfigKeys;
import org.apache.hadoop.ozone.OzoneConsts;
import org.apache.hadoop.ozone.web.handlers.BucketArgs;
import org.apache.hadoop.ozone.web.handlers.UserArgs;
import org.apache.hadoop.ozone.web.interfaces.StorageHandler;
import org.apache.hadoop.ozone.web.utils.OzoneUtils;
import org.apache.hadoop.ozone.web.handlers.VolumeArgs;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
import java.io.IOException;
import java.util.Arrays;
/**
* Test OzoneFSInputStream by reading through multiple interfaces.
*/
public class TestOzoneFSInputStream {
private static MiniOzoneClassicCluster cluster = null;
private static FileSystem fs;
private static StorageHandler storageHandler;
private static Path filePath = null;
private static byte[] data = null;
/**
* Create a MiniDFSCluster for testing.
* <p>
* Ozone is made active by setting OZONE_ENABLED = true and
* OZONE_HANDLER_TYPE_KEY = "distributed"
*
* @throws IOException
*/
@BeforeClass
public static void init() throws Exception {
OzoneConfiguration conf = new OzoneConfiguration();
conf.setLong(OzoneConfigKeys.OZONE_SCM_BLOCK_SIZE_IN_MB, 10);
cluster = new MiniOzoneClassicCluster.Builder(conf)
.numDataNodes(10)
.setHandlerType(OzoneConsts.OZONE_HANDLER_DISTRIBUTED)
.build();
storageHandler =
new ObjectStoreHandler(conf).getStorageHandler();
// create a volume and a bucket to be used by OzoneFileSystem
String userName = "user" + RandomStringUtils.randomNumeric(5);
String adminName = "admin" + RandomStringUtils.randomNumeric(5);
String volumeName = "volume" + RandomStringUtils.randomNumeric(5);
String bucketName = "bucket" + RandomStringUtils.randomNumeric(5);
UserArgs userArgs = new UserArgs(null, OzoneUtils.getRequestID(),
null, null, null, null);
VolumeArgs volumeArgs = new VolumeArgs(volumeName, userArgs);
volumeArgs.setUserName(userName);
volumeArgs.setAdminName(adminName);
storageHandler.createVolume(volumeArgs);
BucketArgs bucketArgs = new BucketArgs(volumeName, bucketName, userArgs);
storageHandler.createBucket(bucketArgs);
// Fetch the host and port for File System init
DataNode dataNode = cluster.getDataNodes().get(0);
int port = dataNode.getInfoPort();
String host = dataNode.getDatanodeHostname();
// Set the fs.defaultFS and start the filesystem
String uri = String.format("%s://%s:%d/%s/%s",
Constants.OZONE_URI_SCHEME, host, port, volumeName, bucketName);
conf.set(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY, uri);
fs = FileSystem.get(conf);
int fileLen = 100 * 1024 * 1024;
data = DFSUtil.string2Bytes(RandomStringUtils.randomAlphanumeric(fileLen));
filePath = new Path("/" + RandomStringUtils.randomAlphanumeric(5));
try (FSDataOutputStream stream = fs.create(filePath)) {
stream.write(data);
}
}
/**
* Shutdown MiniDFSCluster.
*/
@AfterClass
public static void shutdown() throws IOException {
fs.close();
storageHandler.close();
cluster.shutdown();
}
@Test
public void testO3FSSingleByteRead() throws IOException {
FSDataInputStream inputStream = fs.open(filePath);
byte[] value = new byte[data.length];
int i = 0;
while(true) {
int val = inputStream.read();
if (val == -1) {
break;
}
value[i] = (byte)val;
Assert.assertEquals("value mismatch at:" + i, value[i], data[i]);
i++;
}
Assert.assertEquals(i, data.length);
Assert.assertTrue(Arrays.equals(value, data));
inputStream.close();
}
@Test
public void testO3FSMultiByteRead() throws IOException {
FSDataInputStream inputStream = fs.open(filePath);
byte[] value = new byte[data.length];
byte[] tmp = new byte[1 * 1024 * 1024];
int i = 0;
while(true) {
int val = inputStream.read(tmp);
if (val == -1) {
break;
}
System.arraycopy(tmp, 0, value, i * tmp.length, tmp.length);
i++;
}
Assert.assertEquals(i * tmp.length, data.length);
Assert.assertTrue(Arrays.equals(value, data));
inputStream.close();
}
}
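
The test sizes are chosen so both read paths must cross block boundaries: with the block size forced down to 10 MB, the 100 MB file cannot fit in a single block, which is exactly the case the fixed ChunkGroupInputStream#read had been mishandling. The arithmetic:

long blockSize = 10L * 1024 * 1024;   // OZONE_SCM_BLOCK_SIZE_IN_MB = 10
long fileLen   = 100L * 1024 * 1024;  // data.length written in init()
long blocks    = (fileLen + blockSize - 1) / blockSize;  // = 10 blocks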

View File

@@ -99,7 +99,9 @@ public TestOzoneFileInterfaces(boolean setDefaultFs,
public void init() throws Exception {
OzoneConfiguration conf = new OzoneConfiguration();
cluster = new MiniOzoneClassicCluster.Builder(conf)
.setHandlerType(OzoneConsts.OZONE_HANDLER_DISTRIBUTED).build();
.numDataNodes(10)
.setHandlerType(OzoneConsts.OZONE_HANDLER_DISTRIBUTED)
.build();
storageHandler =
new ObjectStoreHandler(conf).getStorageHandler();