HDFS-13108. Ozone: OzoneFileSystem: Simplified url schema for Ozone File System. Contributed by Elek, Marton.

This commit is contained in:
Anu Engineer 2018-03-13 17:02:53 -07:00 committed by Owen O'Malley
parent 9a914126a7
commit 61651dcf5c
4 changed files with 154 additions and 68 deletions

View File

@ -33,6 +33,7 @@
import org.apache.hadoop.ozone.web.response.ListVolumes;
import org.apache.hadoop.ozone.web.response.VolumeInfo;
import java.io.Closeable;
import java.io.IOException;
import java.io.OutputStream;
@ -45,7 +46,7 @@
* and another which will point to the HDFS backend.
*/
@InterfaceAudience.Private
public interface StorageHandler {
public interface StorageHandler extends Closeable{
/**
* Creates a Storage Volume.

View File

@ -27,6 +27,12 @@
import java.util.List;
import java.util.Objects;
import java.util.Iterator;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import com.google.common.base.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.fs.CreateFlag;
import org.apache.hadoop.fs.FSDataInputStream;
@ -47,9 +53,6 @@
import org.apache.hadoop.ozone.client.ReplicationFactor;
import org.apache.hadoop.ozone.client.ReplicationType;
import org.apache.http.client.utils.URIBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
@ -90,6 +93,9 @@ public class OzoneFileSystem extends FileSystem {
private ReplicationType replicationType;
private ReplicationFactor replicationFactor;
private static final Pattern URL_SCHEMA_PATTERN =
Pattern.compile("(.+)\\.([^\\.]+)");
@Override
public void initialize(URI name, Configuration conf) throws IOException {
super.initialize(name, conf);
@ -97,29 +103,20 @@ public void initialize(URI name, Configuration conf) throws IOException {
Objects.requireNonNull(name.getScheme(), "No scheme provided in " + name);
assert getScheme().equals(name.getScheme());
Path path = new Path(name.getPath());
String hostStr = name.getAuthority();
String volumeStr = null;
String bucketStr = null;
String authority = name.getAuthority();
while (path != null && !path.isRoot()) {
bucketStr = volumeStr;
volumeStr = path.getName();
path = path.getParent();
}
Matcher matcher = URL_SCHEMA_PATTERN.matcher(authority);
if (hostStr == null) {
throw new IllegalArgumentException("No host provided in " + name);
} else if (volumeStr == null) {
throw new IllegalArgumentException("No volume provided in " + name);
} else if (bucketStr == null) {
throw new IllegalArgumentException("No bucket provided in " + name);
if (!matcher.matches()) {
throw new IllegalArgumentException("Ozone file system url should be "
+ "in the form o3://bucket.volume");
}
String bucketStr = matcher.group(1);
String volumeStr = matcher.group(2);
try {
uri = new URIBuilder().setScheme(OZONE_URI_SCHEME).setHost(hostStr)
.setPath(OZONE_URI_DELIMITER + volumeStr + OZONE_URI_DELIMITER
+ bucketStr + OZONE_URI_DELIMITER).build();
uri = new URIBuilder().setScheme(OZONE_URI_SCHEME)
.setHost(authority).build();
LOG.trace("Ozone URI for ozfs initialization is " + uri);
this.ozoneClient = OzoneClientFactory.getRpcClient(conf);
objectStore = ozoneClient.getObjectStore();
@ -302,14 +299,12 @@ public boolean rename(Path src, Path dst) throws IOException {
}
// Cannot rename a directory to its own subdirectory
Path parent = dst.getParent();
while (parent != null && !src.equals(parent)) {
parent = parent.getParent();
Path dstParent = dst.getParent();
while (dstParent != null && !src.equals(dstParent)) {
dstParent = dstParent.getParent();
}
if (parent != null) {
return false;
}
Preconditions.checkArgument(dstParent == null,
"Cannot rename a directory to its own subdirectory");
// Check if the source exists
FileStatus srcStatus;
try {
@ -435,7 +430,7 @@ boolean processKey(String key) throws IOException {
}
}
// left with only subkeys now
if (keyPath.getParent().getName().equals(f.getName())) {
if (pathToKey(keyPath.getParent()).equals(pathToKey(f))) {
// skip keys which are for subdirectories of the directory
statuses.add(getFileStatus(keyPath));
}

View File

@ -18,31 +18,42 @@
package org.apache.hadoop.fs.ozone;
import java.io.IOException;
import java.net.URI;
import java.util.Arrays;
import java.util.Collection;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameters;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang.RandomStringUtils;
import org.junit.After;
import org.apache.hadoop.conf.OzoneConfiguration;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.datanode.ObjectStoreHandler;
import org.apache.hadoop.ozone.MiniOzoneClassicCluster;
import org.apache.hadoop.conf.OzoneConfiguration;
import org.apache.hadoop.ozone.OzoneConsts;
import org.apache.hadoop.ozone.client.rest.OzoneException;
import org.apache.hadoop.ozone.web.handlers.BucketArgs;
import org.apache.hadoop.ozone.web.handlers.UserArgs;
import org.apache.hadoop.ozone.web.handlers.VolumeArgs;
import org.apache.hadoop.ozone.web.interfaces.StorageHandler;
import org.apache.hadoop.ozone.web.utils.OzoneUtils;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.Time;
import org.junit.BeforeClass;
import org.junit.AfterClass;
import org.junit.Test;
import org.junit.Assert;
import java.io.IOException;
import static org.apache.hadoop.fs.ozone.Constants.OZONE_DEFAULT_USER;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
/**
* Test OzoneFileSystem Interfaces.
@ -50,13 +61,42 @@
* This test will test the various interfaces i.e.
* create, read, write, getFileStatus
*/
@RunWith(Parameterized.class)
public class TestOzoneFileInterfaces {
private String rootPath;
private String userName;
/**
* Parameter class to set absolute url/defaultFS handling.
* <p>
* Hadoop file systems could be used in multiple ways: Using the defaultfs
* and file path without the schema, or use absolute url-s even with
* different defaultFS. This parameter matrix would test both the use cases.
*/
@Parameters
public static Collection<Object[]> data() {
return Arrays.asList(new Object[][] {{false, true}, {true, false}});
}
private boolean setDefaultFs;
private boolean useAbsolutePath;
private static MiniOzoneClassicCluster cluster = null;
private static FileSystem fs;
private static StorageHandler storageHandler;
@BeforeClass
public static void init() throws IOException, OzoneException {
public TestOzoneFileInterfaces(boolean setDefaultFs,
boolean useAbsolutePath) {
this.setDefaultFs = setDefaultFs;
this.useAbsolutePath = useAbsolutePath;
}
@Before
public void init() throws Exception {
OzoneConfiguration conf = new OzoneConfiguration();
cluster = new MiniOzoneClassicCluster.Builder(conf)
.setHandlerType(OzoneConsts.OZONE_HANDLER_DISTRIBUTED).build();
@ -64,7 +104,7 @@ public static void init() throws IOException, OzoneException {
new ObjectStoreHandler(conf).getStorageHandler();
// create a volume and a bucket to be used by OzoneFileSystem
String userName = "user" + RandomStringUtils.randomNumeric(5);
userName = "user" + RandomStringUtils.randomNumeric(5);
String adminName = "admin" + RandomStringUtils.randomNumeric(5);
String volumeName = "volume" + RandomStringUtils.randomNumeric(5);
String bucketName = "bucket" + RandomStringUtils.randomNumeric(5);
@ -82,24 +122,34 @@ public static void init() throws IOException, OzoneException {
int port = dataNode.getInfoPort();
String host = dataNode.getDatanodeHostname();
// Set the fs.defaultFS and start the filesystem
String uri = String.format("%s://%s:%d/%s/%s",
Constants.OZONE_URI_SCHEME, host, port, volumeName, bucketName);
conf.set(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY, uri);
fs = FileSystem.get(conf);
rootPath = String
.format("%s://%s.%s/", Constants.OZONE_URI_SCHEME, bucketName,
volumeName);
if (setDefaultFs) {
// Set the fs.defaultFS and start the filesystem
conf.set(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY, rootPath);
fs = FileSystem.get(conf);
} else {
fs = FileSystem.get(new URI(rootPath + "/test.txt"), conf);
}
}
@AfterClass
public static void teardown() throws IOException {
fs.close();
storageHandler.close();
cluster.shutdown();
@After
public void teardown() throws IOException {
IOUtils.closeQuietly(fs);
IOUtils.closeQuietly(storageHandler);
IOUtils.closeQuietly(cluster);
}
@Test
public void testFileSystemInit() throws IOException {
Assert.assertTrue(fs instanceof OzoneFileSystem);
Assert.assertEquals(Constants.OZONE_URI_SCHEME, fs.getUri().getScheme());
if (setDefaultFs) {
assertTrue(
"The initialized file system is not OzoneFileSysetem but " +
fs.getClass(),
fs instanceof OzoneFileSystem);
assertEquals(Constants.OZONE_URI_SCHEME, fs.getUri().getScheme());
}
}
@Test
@ -108,7 +158,7 @@ public void testOzFsReadWrite() throws IOException {
int stringLen = 20;
String data = RandomStringUtils.randomAlphanumeric(stringLen);
String filePath = RandomStringUtils.randomAlphanumeric(5);
Path path = new Path("/" + filePath);
Path path = createPath("/" + filePath);
try (FSDataOutputStream stream = fs.create(path)) {
stream.writeBytes(data);
}
@ -116,28 +166,68 @@ public void testOzFsReadWrite() throws IOException {
FileStatus status = fs.getFileStatus(path);
// The timestamp of the newly created file should always be greater than
// the time when the test was started
Assert.assertTrue(status.getModificationTime() > currentTime);
assertTrue("Modification time has not been recorded: " + status,
status.getModificationTime() > currentTime);
try (FSDataInputStream inputStream = fs.open(path)) {
byte[] buffer = new byte[stringLen];
inputStream.readFully(0, buffer);
String out = new String(buffer, 0, buffer.length);
Assert.assertEquals(data, out);
assertEquals(data, out);
}
}
@Test
public void testDirectory() throws IOException {
String dirPath = RandomStringUtils.randomAlphanumeric(5);
Path path = new Path("/" + dirPath);
Assert.assertTrue(fs.mkdirs(path));
Path path = createPath("/" + dirPath);
assertTrue("Makedirs returned with false for the path " + path,
fs.mkdirs(path));
FileStatus status = fs.getFileStatus(path);
Assert.assertTrue(status.isDirectory());
Assert.assertEquals(0, status.getLen());
assertTrue("The created path is not directory.", status.isDirectory());
FileStatus[] statusList = fs.listStatus(new Path("/"));
Assert.assertEquals(1, statusList.length);
Assert.assertEquals(status, statusList[0]);
assertEquals(0, status.getLen());
FileStatus[] statusList = fs.listStatus(createPath("/"));
assertEquals(1, statusList.length);
assertEquals(status, statusList[0]);
FileStatus statusRoot = fs.getFileStatus(createPath("/"));
assertTrue("Root dir (/) is not a directory.", status.isDirectory());
assertEquals(0, status.getLen());
}
@Test
public void testPathToKey() throws Exception {
OzoneFileSystem ozoneFs = (OzoneFileSystem) TestOzoneFileInterfaces.fs;
assertEquals("a/b/1", ozoneFs.pathToKey(new Path("/a/b/1")));
assertEquals("user/" + getCurrentUser() + "/key1/key2",
ozoneFs.pathToKey(new Path("key1/key2")));
assertEquals("key1/key2",
ozoneFs.pathToKey(new Path("o3://test1/key1/key2")));
}
private String getCurrentUser() {
try {
return UserGroupInformation.getCurrentUser().getShortUserName();
} catch (IOException e) {
return OZONE_DEFAULT_USER;
}
}
private Path createPath(String relativePath) {
if (useAbsolutePath) {
return new Path(
rootPath + (relativePath.startsWith("/") ? "" : "/") + relativePath);
} else {
return new Path(relativePath);
}
}
}

View File

@ -108,8 +108,8 @@ public FileSystem getTestFileSystem() throws IOException {
DataNode dataNode = cluster.getDataNodes().get(0);
final int port = dataNode.getInfoPort();
String uri = String.format("%s://localhost:%d/%s/%s",
Constants.OZONE_URI_SCHEME, port, volumeName, bucketName);
String uri = String.format("%s://%s.%s/",
Constants.OZONE_URI_SCHEME, bucketName, volumeName);
getConf().set("fs.defaultFS", uri);
copyClusterConfigs(KSMConfigKeys.OZONE_KSM_ADDRESS_KEY);
copyClusterConfigs(ScmConfigKeys.OZONE_SCM_CLIENT_ADDRESS_KEY);