diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/impl/FSRegistryOperationsService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/impl/FSRegistryOperationsService.java new file mode 100644 index 0000000000..cfff1bdcbd --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/impl/FSRegistryOperationsService.java @@ -0,0 +1,249 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.registry.client.impl; + +import java.io.File; +import java.io.FileNotFoundException; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +import org.apache.commons.lang.NotImplementedException; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileAlreadyExistsException; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.PathIsNotEmptyDirectoryException; +import org.apache.hadoop.fs.PathNotFoundException; +import org.apache.hadoop.registry.client.api.BindFlags; +import org.apache.hadoop.registry.client.api.RegistryOperations; +import org.apache.hadoop.registry.client.binding.RegistryTypeUtils; +import org.apache.hadoop.registry.client.binding.RegistryUtils; +import org.apache.hadoop.registry.client.exceptions.InvalidPathnameException; +import org.apache.hadoop.registry.client.exceptions.InvalidRecordException; +import org.apache.hadoop.registry.client.exceptions.NoRecordException; +import org.apache.hadoop.registry.client.types.RegistryPathStatus; +import org.apache.hadoop.registry.client.types.ServiceRecord; +import org.apache.hadoop.service.CompositeService; +import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; + +/** + * Filesystem-based implementation of RegistryOperations. This class relies + * entirely on the configured FS for security and does no extra checks. + */ +public class FSRegistryOperationsService extends CompositeService + implements RegistryOperations { + + private FileSystem fs; + private static final Logger LOG = + LoggerFactory.getLogger(FSRegistryOperationsService.class); + private final RegistryUtils.ServiceRecordMarshal serviceRecordMarshal = + new RegistryUtils.ServiceRecordMarshal(); + + public FSRegistryOperationsService() { + super(FSRegistryOperationsService.class.getName()); + } + + @VisibleForTesting + public FileSystem getFs() { + return this.fs; + } + + @Override + protected void serviceInit(Configuration conf) { + try { + this.fs = FileSystem.get(conf); + LOG.info("Initialized Yarn-registry with Filesystem " + + fs.getClass().getCanonicalName()); + } catch (IOException e) { + LOG.error("Failed to get FileSystem for registry", e); + throw new YarnRuntimeException(e); + } + } + + private Path makePath(String path) { + return new Path(path); + } + + private Path formatDataPath(String basePath) { + return Path.mergePaths(new Path(basePath), new Path("/_record")); + } + + private String relativize(String basePath, String childPath) { + String relative = new File(basePath).toURI() + .relativize(new File(childPath).toURI()).getPath(); + return relative; + } + + @Override + public boolean mknode(String path, boolean createParents) + throws PathNotFoundException, InvalidPathnameException, IOException { + Path registryPath = makePath(path); + + // getFileStatus throws FileNotFound if the path doesn't exist. If the + // file already exists, return. + try { + fs.getFileStatus(registryPath); + return false; + } catch (FileNotFoundException e) { + } + + if (createParents) { + // By default, mkdirs creates any parent dirs it needs + fs.mkdirs(registryPath); + } else { + FileStatus parentStatus = null; + + if (registryPath.getParent() != null) { + parentStatus = fs.getFileStatus(registryPath.getParent()); + } + + if (registryPath.getParent() == null || parentStatus.isDirectory()) { + fs.mkdirs(registryPath); + } else { + throw new PathNotFoundException("no parent for " + path); + } + } + return true; + } + + @Override + public void bind(String path, ServiceRecord record, int flags) + throws PathNotFoundException, FileAlreadyExistsException, + InvalidPathnameException, IOException { + + // Preserve same overwrite semantics as ZK implementation + Preconditions.checkArgument(record != null, "null record"); + RegistryTypeUtils.validateServiceRecord(path, record); + + Path dataPath = formatDataPath(path); + Boolean overwrite = ((flags & BindFlags.OVERWRITE) != 0); + if (fs.exists(dataPath) && !overwrite) { + throw new FileAlreadyExistsException(); + } else { + // Either the file doesn't exist, or it exists and we're + // overwriting. Create overwrites by default and creates parent dirs if + // needed. + FSDataOutputStream stream = fs.create(dataPath); + byte[] bytes = serviceRecordMarshal.toBytes(record); + stream.write(bytes); + stream.close(); + LOG.info("Bound record to path " + dataPath); + } + } + + @Override + public ServiceRecord resolve(String path) throws PathNotFoundException, + NoRecordException, InvalidRecordException, IOException { + // Read the entire file into byte array, should be small metadata + + Long size = fs.getFileStatus(formatDataPath(path)).getLen(); + byte[] bytes = new byte[size.intValue()]; + + FSDataInputStream instream = fs.open(formatDataPath(path)); + int bytesRead = instream.read(bytes); + instream.close(); + + if (bytesRead < size) { + throw new InvalidRecordException(path, + "Expected " + size + " bytes, but read " + bytesRead); + } + + // Unmarshal, check, and return + ServiceRecord record = serviceRecordMarshal.fromBytes(path, bytes); + RegistryTypeUtils.validateServiceRecord(path, record); + return record; + } + + @Override + public RegistryPathStatus stat(String path) + throws PathNotFoundException, InvalidPathnameException, IOException { + FileStatus fstat = fs.getFileStatus(formatDataPath(path)); + int numChildren = fs.listStatus(makePath(path)).length; + + RegistryPathStatus regstat = + new RegistryPathStatus(fstat.getPath().toString(), + fstat.getModificationTime(), fstat.getLen(), numChildren); + + return regstat; + } + + @Override + public boolean exists(String path) throws IOException { + return fs.exists(makePath(path)); + } + + @Override + public List list(String path) + throws PathNotFoundException, InvalidPathnameException, IOException { + FileStatus[] statArray = fs.listStatus(makePath(path)); + String basePath = fs.getFileStatus(makePath(path)).getPath().toString(); + + List paths = new ArrayList(); + + FileStatus stat; + // Only count dirs; the _record files are hidden. + for (int i = 0; i < statArray.length; i++) { + stat = statArray[i]; + if (stat.isDirectory()) { + String relativePath = relativize(basePath, stat.getPath().toString()); + paths.add(relativePath); + } + } + + return paths; + } + + @Override + public void delete(String path, boolean recursive) + throws PathNotFoundException, PathIsNotEmptyDirectoryException, + InvalidPathnameException, IOException { + Path dirPath = makePath(path); + if (!fs.exists(dirPath)) { + throw new PathNotFoundException(path); + } + + // If recursive == true, or dir is empty, delete. + if (recursive || list(path).isEmpty()) { + fs.delete(makePath(path), true); + return; + } + + throw new PathIsNotEmptyDirectoryException(path); + } + + @Override + public boolean addWriteAccessor(String id, String pass) throws IOException { + throw new NotImplementedException(); + } + + @Override + public void clearWriteAccessors() { + throw new NotImplementedException(); + } + +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/types/ServiceRecord.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/types/ServiceRecord.java index d40866a815..9bb02c3cc3 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/types/ServiceRecord.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/types/ServiceRecord.java @@ -244,5 +244,69 @@ protected Object clone() throws CloneNotSupportedException { return super.clone(); } + @Override + public int hashCode() { + // Generated by eclipse + final int prime = 31; + int result = 1; + result = + prime * result + ((attributes == null) ? 0 : attributes.hashCode()); + result = + prime * result + ((description == null) ? 0 : description.hashCode()); + result = prime * result + ((external == null) ? 0 : external.hashCode()); + result = prime * result + ((internal == null) ? 0 : internal.hashCode()); + result = prime * result + ((type == null) ? 0 : type.hashCode()); + return result; + } + + @Override + public boolean equals(Object obj) { + if (this == obj) { + return true; + } + if (obj == null) { + return false; + } + if (getClass() != obj.getClass()) { + return false; + } + ServiceRecord other = (ServiceRecord) obj; + if (attributes == null) { + if (other.attributes != null) { + return false; + } + } else if (!attributes.equals(other.attributes)) { + return false; + } + if (description == null) { + if (other.description != null) { + return false; + } + } else if (!description.equals(other.description)) { + return false; + } + if (external == null) { + if (other.external != null) { + return false; + } + } else if (!external.equals(other.external)) { + return false; + } + if (internal == null) { + if (other.internal != null) { + return false; + } + } else if (!internal.equals(other.internal)) { + return false; + } + if (type == null) { + if (other.type != null) { + return false; + } + } else if (!type.equals(other.type)) { + return false; + } + return true; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/test/java/org/apache/hadoop/registry/client/impl/TestFSRegistryOperationsService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/test/java/org/apache/hadoop/registry/client/impl/TestFSRegistryOperationsService.java new file mode 100644 index 0000000000..dffa4a7af1 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/test/java/org/apache/hadoop/registry/client/impl/TestFSRegistryOperationsService.java @@ -0,0 +1,298 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.registry.client.impl; + +import org.junit.Test; + +import java.io.IOException; +import java.util.List; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileAlreadyExistsException; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.PathIsNotEmptyDirectoryException; +import org.apache.hadoop.fs.PathNotFoundException; +import org.apache.hadoop.registry.client.exceptions.InvalidPathnameException; +import org.apache.hadoop.registry.client.types.ServiceRecord; +import org.apache.hadoop.registry.client.types.yarn.YarnRegistryAttributes; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.BeforeClass; + +/** + * FSRegistryOperationsService test, using the local filesystem. + */ +public class TestFSRegistryOperationsService { + private static FSRegistryOperationsService registry = + new FSRegistryOperationsService(); + private static FileSystem fs; + + @BeforeClass + public static void initRegistry() throws IOException { + Assert.assertNotNull(registry); + registry.init(new Configuration()); + fs = registry.getFs(); + fs.delete(new Path("test"), true); + } + + @Before + public void createTestDir() throws IOException { + fs.mkdirs(new Path("test")); + } + + @After + public void cleanTestDir() throws IOException { + fs.delete(new Path("test"), true); + } + + @Test + public void testMkNodeNonRecursive() + throws InvalidPathnameException, PathNotFoundException, IOException { + boolean result = false; + System.out.println("Make node with parent already made, nonrecursive"); + result = registry.mknode("test/registryTestNode", false); + Assert.assertTrue(result); + Assert.assertTrue(fs.exists(new Path("test/registryTestNode"))); + + // Expected to fail + try { + System.out.println("Try to make node with no parent, nonrecursive"); + registry.mknode("test/parent/registryTestNode", false); + Assert.fail("Should not have created node"); + } catch (IOException e) { + } + Assert.assertFalse(fs.exists(new Path("test/parent/registryTestNode"))); + } + + @Test + public void testMkNodeRecursive() throws IOException { + boolean result = false; + System.out.println("Make node with parent already made, recursive"); + result = registry.mknode("test/registryTestNode", true); + Assert.assertTrue(result); + Assert.assertTrue(fs.exists(new Path("test/registryTestNode"))); + + result = false; + System.out.println("Try to make node with no parent, recursive"); + result = registry.mknode("test/parent/registryTestNode", true); + Assert.assertTrue(result); + Assert.assertTrue(fs.exists(new Path("test/parent/registryTestNode"))); + + } + + @Test + public void testMkNodeAlreadyExists() throws IOException { + System.out.println("pre-create test path"); + fs.mkdirs(new Path("test/registryTestNode")); + + System.out.println( + "Try to mknode existing path -- should be noop and return false"); + Assert.assertFalse(registry.mknode("test/registryTestNode", true)); + Assert.assertFalse(registry.mknode("test/registryTestNode", false)); + } + + @Test + public void testBindParentPath() throws InvalidPathnameException, + PathNotFoundException, FileAlreadyExistsException, IOException { + ServiceRecord record = createRecord("0"); + + System.out.println("pre-create test path"); + fs.mkdirs(new Path("test/parent1/registryTestNode")); + + registry.bind("test/parent1/registryTestNode", record, 1); + Assert.assertTrue( + fs.exists(new Path("test/parent1/registryTestNode/_record"))); + + // Test without pre-creating path + registry.bind("test/parent2/registryTestNode", record, 1); + Assert.assertTrue(fs.exists(new Path("test/parent2/registryTestNode"))); + + } + + @Test + public void testBindAlreadyExists() throws IOException { + ServiceRecord record1 = createRecord("1"); + ServiceRecord record2 = createRecord("2"); + + System.out.println("Bind record1"); + registry.bind("test/registryTestNode", record1, 1); + Assert.assertTrue(fs.exists(new Path("test/registryTestNode/_record"))); + + System.out.println("Bind record2, overwrite = 1"); + registry.bind("test/registryTestNode", record2, 1); + Assert.assertTrue(fs.exists(new Path("test/registryTestNode/_record"))); + + // The record should have been overwritten + ServiceRecord readRecord = registry.resolve("test/registryTestNode"); + Assert.assertTrue(readRecord.equals(record2)); + + System.out.println("Bind record3, overwrite = 0"); + try { + registry.bind("test/registryTestNode", record1, 0); + Assert.fail("Should not overwrite record"); + } catch (IOException e) { + } + + // The record should not be overwritten + readRecord = registry.resolve("test/registryTestNode"); + Assert.assertTrue(readRecord.equals(record2)); + } + + @Test + public void testResolve() throws IOException { + ServiceRecord record = createRecord("0"); + registry.bind("test/registryTestNode", record, 1); + Assert.assertTrue(fs.exists(new Path("test/registryTestNode/_record"))); + + System.out.println("Read record that exists"); + ServiceRecord readRecord = registry.resolve("test/registryTestNode"); + Assert.assertNotNull(readRecord); + Assert.assertTrue(record.equals(readRecord)); + + System.out.println("Try to read record that does not exist"); + try { + readRecord = registry.resolve("test/nonExistentNode"); + Assert.fail("Should throw an error, record does not exist"); + } catch (IOException e) { + } + } + + @Test + public void testExists() throws IOException { + System.out.println("pre-create test path"); + fs.mkdirs(new Path("test/registryTestNode")); + + System.out.println("Check for existing node"); + boolean exists = registry.exists("test/registryTestNode"); + Assert.assertTrue(exists); + + System.out.println("Check for non-existing node"); + exists = registry.exists("test/nonExistentNode"); + Assert.assertFalse(exists); + } + + @Test + public void testDeleteDirsOnly() throws IOException { + System.out.println("pre-create test path with children"); + fs.mkdirs(new Path("test/registryTestNode")); + fs.mkdirs(new Path("test/registryTestNode/child1")); + fs.mkdirs(new Path("test/registryTestNode/child2")); + + try { + registry.delete("test/registryTestNode", false); + Assert.fail("Deleted dir wich children, nonrecursive flag set"); + } catch (IOException e) { + } + // Make sure nothing was deleted + Assert.assertTrue(fs.exists(new Path("test/registryTestNode"))); + Assert.assertTrue(fs.exists(new Path("test/registryTestNode/child1"))); + Assert.assertTrue(fs.exists(new Path("test/registryTestNode/child2"))); + + System.out.println("Delete leaf path 'test/registryTestNode/child2'"); + registry.delete("test/registryTestNode/child2", false); + Assert.assertTrue(fs.exists(new Path("test/registryTestNode"))); + Assert.assertTrue(fs.exists(new Path("test/registryTestNode/child1"))); + Assert.assertFalse(fs.exists(new Path("test/registryTestNode/child2"))); + + System.out + .println("Recursively delete non-leaf path 'test/registryTestNode'"); + registry.delete("test/registryTestNode", true); + Assert.assertFalse(fs.exists(new Path("test/registryTestNode"))); + } + + @Test + public void testDeleteWithRecords() throws IOException { + System.out.println("pre-create test path with children and mocked records"); + + fs.mkdirs(new Path("test/registryTestNode")); + fs.mkdirs(new Path("test/registryTestNode/child1")); + fs.mkdirs(new Path("test/registryTestNode/child2")); + + // Create and close stream immediately so they aren't blocking + fs.create(new Path("test/registryTestNode/_record")).close(); + fs.create(new Path("test/registryTestNode/child1/_record")).close(); + + System.out.println("Delete dir with child nodes and record file"); + try { + registry.delete("test/registryTestNode", false); + Assert.fail("Nonrecursive delete of non-empty dir"); + } catch (PathIsNotEmptyDirectoryException e) { + } + + Assert.assertTrue(fs.exists(new Path("test/registryTestNode/_record"))); + Assert.assertTrue( + fs.exists(new Path("test/registryTestNode/child1/_record"))); + Assert.assertTrue(fs.exists(new Path("test/registryTestNode/child2"))); + + System.out.println("Delete dir with record file and no child dirs"); + registry.delete("test/registryTestNode/child1", false); + Assert.assertFalse(fs.exists(new Path("test/registryTestNode/child1"))); + Assert.assertTrue(fs.exists(new Path("test/registryTestNode/child2"))); + + System.out.println("Delete dir with child dir and no record file"); + try { + registry.delete("test/registryTestNode", false); + Assert.fail("Nonrecursive delete of non-empty dir"); + } catch (PathIsNotEmptyDirectoryException e) { + } + Assert.assertTrue(fs.exists(new Path("test/registryTestNode/child2"))); + } + + @Test + public void testList() throws IOException { + System.out.println("pre-create test path with children and mocked records"); + + fs.mkdirs(new Path("test/registryTestNode")); + fs.mkdirs(new Path("test/registryTestNode/child1")); + fs.mkdirs(new Path("test/registryTestNode/child2")); + + // Create and close stream immediately so they aren't blocking + fs.create(new Path("test/registryTestNode/_record")).close(); + fs.create(new Path("test/registryTestNode/child1/_record")).close(); + + List ls = null; + + ls = registry.list("test/registryTestNode"); + Assert.assertNotNull(ls); + Assert.assertEquals(2, ls.size()); + System.out.println(ls); + Assert.assertTrue(ls.contains("child1")); + Assert.assertTrue(ls.contains("child2")); + + ls = null; + ls = registry.list("test/registryTestNode/child1"); + Assert.assertNotNull(ls); + Assert.assertTrue(ls.isEmpty()); + ls = null; + ls = registry.list("test/registryTestNode/child2"); + Assert.assertNotNull(ls); + Assert.assertTrue(ls.isEmpty()); + } + + private ServiceRecord createRecord(String id) { + System.out.println("Creating mock service record"); + + ServiceRecord record = new ServiceRecord(); + record.set(YarnRegistryAttributes.YARN_ID, id); + record.description = "testRecord"; + return record; + } +}