YARN-7250. Update Shared cache client api to use URLs.

This commit is contained in:
Chris Trezzo 2017-09-28 15:28:06 -07:00
parent 6f789fe057
commit c114da5e64
3 changed files with 23 additions and 66 deletions

View File

@ -27,6 +27,7 @@
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.service.AbstractService; import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.URL;
import org.apache.hadoop.yarn.client.api.impl.SharedCacheClientImpl; import org.apache.hadoop.yarn.client.api.impl.SharedCacheClientImpl;
import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.exceptions.YarnException;
@ -58,34 +59,25 @@ public SharedCacheClient(String name) {
* *
* <p> * <p>
* The <code>SharedCacheManager</code> responds with whether or not the * The <code>SharedCacheManager</code> responds with whether or not the
* resource exists in the cache. If the resource exists, a <code>Path</code> * resource exists in the cache. If the resource exists, a <code>URL</code> to
* to the resource in the shared cache is returned. If the resource does not * the resource in the shared cache is returned. If the resource does not
* exist, null is returned instead. * exist, null is returned instead.
* </p> * </p>
* *
* <p> * <p>
* Once a path has been returned for a resource, that path is safe to use for * Once a URL has been returned for a resource, that URL is safe to use for
* the lifetime of the application that corresponds to the provided * the lifetime of the application that corresponds to the provided
* ApplicationId. * ApplicationId.
* </p> * </p>
* *
* <p>
* Additionally, a name for the resource should be specified. A fragment will
* be added to the path with the desired name if the desired name is different
* than the name of the provided path from the shared cache. This ensures that
* if the returned path is used to create a LocalResource, then the symlink
* created during YARN localization will match the name specified.
* </p>
*
* @param applicationId ApplicationId of the application using the resource * @param applicationId ApplicationId of the application using the resource
* @param resourceKey the key (i.e. checksum) that identifies the resource * @param resourceKey the key (i.e. checksum) that identifies the resource
* @param resourceName the desired name of the resource * @return URL to the resource, or null if it does not exist
* @return Path to the resource, or null if it does not exist
*/ */
@Public @Public
@Unstable @Unstable
public abstract Path use(ApplicationId applicationId, String resourceKey, public abstract URL use(ApplicationId applicationId, String resourceKey)
String resourceName) throws YarnException; throws YarnException;
/** /**
* <p> * <p>

View File

@ -21,8 +21,6 @@
import java.io.IOException; import java.io.IOException;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.net.URI;
import java.net.URISyntaxException;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
@ -38,6 +36,7 @@
import org.apache.hadoop.yarn.api.protocolrecords.UseSharedCacheResourceRequest; import org.apache.hadoop.yarn.api.protocolrecords.UseSharedCacheResourceRequest;
import org.apache.hadoop.yarn.api.protocolrecords.UseSharedCacheResourceResponse; import org.apache.hadoop.yarn.api.protocolrecords.UseSharedCacheResourceResponse;
import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.URL;
import org.apache.hadoop.yarn.client.api.SharedCacheClient; import org.apache.hadoop.yarn.client.api.SharedCacheClient;
import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.exceptions.YarnException;
@ -113,8 +112,8 @@ protected void stopClientProxy() {
} }
@Override @Override
public Path use(ApplicationId applicationId, String resourceKey, public URL use(ApplicationId applicationId, String resourceKey)
String resourceName) throws YarnException { throws YarnException {
Path resourcePath = null; Path resourcePath = null;
UseSharedCacheResourceRequest request = Records.newRecord( UseSharedCacheResourceRequest request = Records.newRecord(
UseSharedCacheResourceRequest.class); UseSharedCacheResourceRequest.class);
@ -132,32 +131,13 @@ public Path use(ApplicationId applicationId, String resourceKey,
throw new YarnException(e); throw new YarnException(e);
} }
if (resourcePath != null) { if (resourcePath != null) {
if (resourcePath.getName().equals(resourceName)) { URL pathURL = URL.fromPath(resourcePath);
// The preferred name is the same as the name of the item in the cache, return pathURL;
// so we skip generating the fragment to save space in the MRconfig.
return resourcePath;
} else { } else {
// We are using the shared cache, and a preferred name has been // The resource was not in the cache.
// specified that is different than the name of the resource in the return null;
// shared cache. We need to set the fragment portion of the URI to
// preserve the desired name.
URI pathURI = resourcePath.toUri();
try {
// We assume that there is no existing fragment in the URI since the
// shared cache manager does not use fragments.
pathURI =
new URI(pathURI.getScheme(), pathURI.getSchemeSpecificPart(),
resourceName);
resourcePath = new Path(pathURI);
} catch (URISyntaxException e) {
throw new YarnException(
"Could not create a new URI due to syntax errors: "
+ pathURI.toString(), e);
} }
} }
}
return resourcePath;
}
@Override @Override
public void release(ApplicationId applicationId, String resourceKey) public void release(ApplicationId applicationId, String resourceKey)

View File

@ -27,7 +27,6 @@
import java.io.DataOutputStream; import java.io.DataOutputStream;
import java.io.FileNotFoundException; import java.io.FileNotFoundException;
import java.io.IOException; import java.io.IOException;
import java.net.URI;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
@ -40,6 +39,7 @@
import org.apache.hadoop.yarn.api.protocolrecords.UseSharedCacheResourceResponse; import org.apache.hadoop.yarn.api.protocolrecords.UseSharedCacheResourceResponse;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.UseSharedCacheResourceResponsePBImpl; import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.UseSharedCacheResourceResponsePBImpl;
import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.URL;
import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.exceptions.YarnException;
import org.junit.After; import org.junit.After;
import org.junit.AfterClass; import org.junit.AfterClass;
@ -114,36 +114,21 @@ public void testUseCacheMiss() throws Exception {
response.setPath(null); response.setPath(null);
when(cProtocol.use(isA(UseSharedCacheResourceRequest.class))).thenReturn( when(cProtocol.use(isA(UseSharedCacheResourceRequest.class))).thenReturn(
response); response);
Path newPath = client.use(mock(ApplicationId.class), "key", null); URL newURL = client.use(mock(ApplicationId.class), "key");
assertNull("The path is not null!", newPath); assertNull("The path is not null!", newURL);
} }
@Test @Test
public void testUseWithResourceName() throws Exception { public void testUseCacheHit() throws Exception {
Path file = new Path("viewfs://test/path"); Path file = new Path("viewfs://test/path");
URI useUri = new URI("viewfs://test/path#linkName"); URL useUrl = URL.fromPath(new Path("viewfs://test/path"));
Path usePath = new Path(useUri);
UseSharedCacheResourceResponse response = UseSharedCacheResourceResponse response =
new UseSharedCacheResourceResponsePBImpl(); new UseSharedCacheResourceResponsePBImpl();
response.setPath(file.toString()); response.setPath(file.toString());
when(cProtocol.use(isA(UseSharedCacheResourceRequest.class))).thenReturn( when(cProtocol.use(isA(UseSharedCacheResourceRequest.class))).thenReturn(
response); response);
Path newPath = client.use(mock(ApplicationId.class), "key", "linkName"); URL newURL = client.use(mock(ApplicationId.class), "key");
assertEquals("The paths are not equal!", usePath, newPath); assertEquals("The paths are not equal!", useUrl, newURL);
}
@Test
public void testUseWithSameResourceName() throws Exception {
Path file = new Path("viewfs://test/path");
URI useUri = new URI("viewfs://test/path");
Path usePath = new Path(useUri);
UseSharedCacheResourceResponse response =
new UseSharedCacheResourceResponsePBImpl();
response.setPath(file.toString());
when(cProtocol.use(isA(UseSharedCacheResourceRequest.class))).thenReturn(
response);
Path newPath = client.use(mock(ApplicationId.class), "key", "path");
assertEquals("The paths are not equal!", usePath, newPath);
} }
@Test(expected = YarnException.class) @Test(expected = YarnException.class)
@ -151,7 +136,7 @@ public void testUseError() throws Exception {
String message = "Mock IOExcepiton!"; String message = "Mock IOExcepiton!";
when(cProtocol.use(isA(UseSharedCacheResourceRequest.class))).thenThrow( when(cProtocol.use(isA(UseSharedCacheResourceRequest.class))).thenThrow(
new IOException(message)); new IOException(message));
client.use(mock(ApplicationId.class), "key", null); client.use(mock(ApplicationId.class), "key");
} }
@Test @Test