YARN-3637. Handle localization sym-linking correctly at the YARN level. Contributed by Chris Trezzo.
This commit is contained in:
parent
cd59b9ccab
commit
425a7e5028
@ -55,22 +55,37 @@ public SharedCacheClient(String name) {
|
||||
* {@link ApplicationId} to identify which application will be using the
|
||||
* resource.
|
||||
* </p>
|
||||
*
|
||||
*
|
||||
* <p>
|
||||
* The <code>SharedCacheManager</code> responds with whether or not the
|
||||
* resource exists in the cache. If the resource exists, a <code>Path</code>
|
||||
* to the resource in the shared cache is returned. If the resource does not
|
||||
* exist, null is returned instead.
|
||||
* </p>
|
||||
*
|
||||
*
|
||||
* <p>
|
||||
* Once a path has been returned for a resource, that path is safe to use for
|
||||
* the lifetime of the application that corresponds to the provided
|
||||
* ApplicationId.
|
||||
* </p>
|
||||
*
|
||||
* <p>
|
||||
* Additionally, a name for the resource should be specified. A fragment will
|
||||
* be added to the path with the desired name if the desired name is different
|
||||
* than the name of the provided path from the shared cache. This ensures that
|
||||
* if the returned path is used to create a LocalResource, then the symlink
|
||||
* created during YARN localization will match the name specified.
|
||||
* </p>
|
||||
*
|
||||
* @param applicationId ApplicationId of the application using the resource
|
||||
* @param resourceKey the key (i.e. checksum) that identifies the resource
|
||||
* @param resourceName the desired name of the resource
|
||||
* @return Path to the resource, or null if it does not exist
|
||||
*/
|
||||
@Public
|
||||
@Unstable
|
||||
public abstract Path use(ApplicationId applicationId, String resourceKey)
|
||||
throws YarnException;
|
||||
public abstract Path use(ApplicationId applicationId, String resourceKey,
|
||||
String resourceName) throws YarnException;
|
||||
|
||||
/**
|
||||
* <p>
|
||||
|
@ -21,6 +21,8 @@
|
||||
|
||||
import java.io.IOException;
|
||||
import java.net.InetSocketAddress;
|
||||
import java.net.URI;
|
||||
import java.net.URISyntaxException;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
@ -111,8 +113,8 @@ protected void stopClientProxy() {
|
||||
}
|
||||
|
||||
@Override
|
||||
public Path use(ApplicationId applicationId, String resourceKey)
|
||||
throws YarnException {
|
||||
public Path use(ApplicationId applicationId, String resourceKey,
|
||||
String resourceName) throws YarnException {
|
||||
Path resourcePath = null;
|
||||
UseSharedCacheResourceRequest request = Records.newRecord(
|
||||
UseSharedCacheResourceRequest.class);
|
||||
@ -129,6 +131,31 @@ public Path use(ApplicationId applicationId, String resourceKey)
|
||||
// We don't handle different exceptions separately at this point.
|
||||
throw new YarnException(e);
|
||||
}
|
||||
if (resourcePath != null) {
|
||||
if (resourcePath.getName().equals(resourceName)) {
|
||||
// The preferred name is the same as the name of the item in the cache,
|
||||
// so we skip generating the fragment to save space in the MRconfig.
|
||||
return resourcePath;
|
||||
} else {
|
||||
// We are using the shared cache, and a preferred name has been
|
||||
// specified that is different than the name of the resource in the
|
||||
// shared cache. We need to set the fragment portion of the URI to
|
||||
// preserve the desired name.
|
||||
URI pathURI = resourcePath.toUri();
|
||||
try {
|
||||
// We assume that there is no existing fragment in the URI since the
|
||||
// shared cache manager does not use fragments.
|
||||
pathURI =
|
||||
new URI(pathURI.getScheme(), pathURI.getSchemeSpecificPart(),
|
||||
resourceName);
|
||||
resourcePath = new Path(pathURI);
|
||||
} catch (URISyntaxException e) {
|
||||
throw new YarnException(
|
||||
"Could not create a new URI due to syntax errors: "
|
||||
+ pathURI.toString(), e);
|
||||
}
|
||||
}
|
||||
}
|
||||
return resourcePath;
|
||||
}
|
||||
|
||||
|
@ -19,6 +19,7 @@
|
||||
package org.apache.hadoop.yarn.client.api.impl;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertNull;
|
||||
import static org.mockito.Matchers.isA;
|
||||
import static org.mockito.Mockito.mock;
|
||||
import static org.mockito.Mockito.when;
|
||||
@ -26,6 +27,7 @@
|
||||
import java.io.DataOutputStream;
|
||||
import java.io.FileNotFoundException;
|
||||
import java.io.IOException;
|
||||
import java.net.URI;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
@ -106,15 +108,42 @@ public void cleanup() {
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testUse() throws Exception {
|
||||
public void testUseCacheMiss() throws Exception {
|
||||
UseSharedCacheResourceResponse response =
|
||||
new UseSharedCacheResourceResponsePBImpl();
|
||||
response.setPath(null);
|
||||
when(cProtocol.use(isA(UseSharedCacheResourceRequest.class))).thenReturn(
|
||||
response);
|
||||
Path newPath = client.use(mock(ApplicationId.class), "key", null);
|
||||
assertNull("The path is not null!", newPath);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testUseWithResourceName() throws Exception {
|
||||
Path file = new Path("viewfs://test/path");
|
||||
URI useUri = new URI("viewfs://test/path#linkName");
|
||||
Path usePath = new Path(useUri);
|
||||
UseSharedCacheResourceResponse response =
|
||||
new UseSharedCacheResourceResponsePBImpl();
|
||||
response.setPath(file.toString());
|
||||
when(cProtocol.use(isA(UseSharedCacheResourceRequest.class))).thenReturn(
|
||||
response);
|
||||
Path newPath = client.use(mock(ApplicationId.class), "key");
|
||||
assertEquals(file, newPath);
|
||||
Path newPath = client.use(mock(ApplicationId.class), "key", "linkName");
|
||||
assertEquals("The paths are not equal!", usePath, newPath);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testUseWithSameResourceName() throws Exception {
|
||||
Path file = new Path("viewfs://test/path");
|
||||
URI useUri = new URI("viewfs://test/path");
|
||||
Path usePath = new Path(useUri);
|
||||
UseSharedCacheResourceResponse response =
|
||||
new UseSharedCacheResourceResponsePBImpl();
|
||||
response.setPath(file.toString());
|
||||
when(cProtocol.use(isA(UseSharedCacheResourceRequest.class))).thenReturn(
|
||||
response);
|
||||
Path newPath = client.use(mock(ApplicationId.class), "key", "path");
|
||||
assertEquals("The paths are not equal!", usePath, newPath);
|
||||
}
|
||||
|
||||
@Test(expected = YarnException.class)
|
||||
@ -122,7 +151,7 @@ public void testUseError() throws Exception {
|
||||
String message = "Mock IOExcepiton!";
|
||||
when(cProtocol.use(isA(UseSharedCacheResourceRequest.class))).thenThrow(
|
||||
new IOException(message));
|
||||
client.use(mock(ApplicationId.class), "key");
|
||||
client.use(mock(ApplicationId.class), "key", null);
|
||||
}
|
||||
|
||||
@Test
|
||||
|
Loading…
Reference in New Issue
Block a user