diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt
index 05bb8f6122..ec3a3311c5 100644
--- a/hadoop-yarn-project/CHANGES.txt
+++ b/hadoop-yarn-project/CHANGES.txt
@@ -45,6 +45,9 @@ Release 2.7.0 - UNRELEASED
YARN-2186. [YARN-1492] Node Manager uploader service for cache manager.
(Chris Trezzo and Sangjin Lee via kasha)
+ YARN-2236. [YARN-1492] Shared Cache uploader service on the Node
+ Manager. (Chris Trezzo and Sanjin Lee via kasha)
+
IMPROVEMENTS
YARN-1979. TestDirectoryCollection fails when the umask is unusual.
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/LocalResource.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/LocalResource.java
index f14a136d30..726d9699c2 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/LocalResource.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/LocalResource.java
@@ -20,6 +20,7 @@
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Stable;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.yarn.api.ContainerManagementProtocol;
import org.apache.hadoop.yarn.util.Records;
@@ -48,6 +49,14 @@ public abstract class LocalResource {
public static LocalResource newInstance(URL url, LocalResourceType type,
LocalResourceVisibility visibility, long size, long timestamp,
String pattern) {
+ return newInstance(url, type, visibility, size, timestamp, pattern, false);
+ }
+
+ @Public
+ @Unstable
+ public static LocalResource newInstance(URL url, LocalResourceType type,
+ LocalResourceVisibility visibility, long size, long timestamp,
+ String pattern, boolean shouldBeUploadedToSharedCache) {
LocalResource resource = Records.newRecord(LocalResource.class);
resource.setResource(url);
resource.setType(type);
@@ -55,6 +64,7 @@ public static LocalResource newInstance(URL url, LocalResourceType type,
resource.setSize(size);
resource.setTimestamp(timestamp);
resource.setPattern(pattern);
+ resource.setShouldBeUploadedToSharedCache(shouldBeUploadedToSharedCache);
return resource;
}
@@ -65,6 +75,15 @@ public static LocalResource newInstance(URL url, LocalResourceType type,
return newInstance(url, type, visibility, size, timestamp, null);
}
+ @Public
+ @Unstable
+ public static LocalResource newInstance(URL url, LocalResourceType type,
+ LocalResourceVisibility visibility, long size, long timestamp,
+ boolean shouldBeUploadedToSharedCache) {
+ return newInstance(url, type, visibility, size, timestamp, null,
+ shouldBeUploadedToSharedCache);
+ }
+
/**
* Get the location of the resource to be localized.
* @return location of the resource to be localized
@@ -170,4 +189,23 @@ public static LocalResource newInstance(URL url, LocalResourceType type,
@Public
@Stable
public abstract void setPattern(String pattern);
+
+ /**
+ * NM uses it to decide whether if it is necessary to upload the resource to
+ * the shared cache
+ */
+ @Public
+ @Unstable
+ public abstract boolean getShouldBeUploadedToSharedCache();
+
+ /**
+ * Inform NM whether upload to SCM is needed.
+ *
+ * @param shouldBeUploadedToSharedCache shouldBeUploadedToSharedCache
+ * of this request
+ */
+ @Public
+ @Unstable
+ public abstract void setShouldBeUploadedToSharedCache(
+ boolean shouldBeUploadedToSharedCache);
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
index 9b57a4243c..4b4f58126c 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
@@ -1472,6 +1472,25 @@ private static void addDeprecatedKeys() {
SHARED_CACHE_PREFIX + "uploader.server.thread-count";
public static final int DEFAULT_SCM_UPLOADER_SERVER_THREAD_COUNT = 50;
+ /** the checksum algorithm implementation **/
+ public static final String SHARED_CACHE_CHECKSUM_ALGO_IMPL =
+ SHARED_CACHE_PREFIX + "checksum.algo.impl";
+ public static final String DEFAULT_SHARED_CACHE_CHECKSUM_ALGO_IMPL =
+ "org.apache.hadoop.yarn.sharedcache.ChecksumSHA256Impl";
+
+ // node manager (uploader) configs
+ /**
+ * The replication factor for the node manager uploader for the shared cache.
+ */
+ public static final String SHARED_CACHE_NM_UPLOADER_REPLICATION_FACTOR =
+ SHARED_CACHE_PREFIX + "nm.uploader.replication.factor";
+ public static final int DEFAULT_SHARED_CACHE_NM_UPLOADER_REPLICATION_FACTOR =
+ 10;
+
+ public static final String SHARED_CACHE_NM_UPLOADER_THREAD_COUNT =
+ SHARED_CACHE_PREFIX + "nm.uploader.thread-count";
+ public static final int DEFAULT_SHARED_CACHE_NM_UPLOADER_THREAD_COUNT = 20;
+
////////////////////////////////
// Other Configs
////////////////////////////////
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto
index 5c86c2dfca..c4e756d81c 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto
@@ -159,6 +159,7 @@ message LocalResourceProto {
optional LocalResourceTypeProto type = 4;
optional LocalResourceVisibilityProto visibility = 5;
optional string pattern = 6;
+ optional bool should_be_uploaded_to_shared_cache = 7;
}
message ApplicationResourceUsageReportProto {
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/LocalResourcePBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/LocalResourcePBImpl.java
index 16bd59740d..560b081c01 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/LocalResourcePBImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/LocalResourcePBImpl.java
@@ -192,6 +192,26 @@ public synchronized void setPattern(String pattern) {
builder.setPattern(pattern);
}
+ @Override
+ public synchronized boolean getShouldBeUploadedToSharedCache() {
+ LocalResourceProtoOrBuilder p = viaProto ? proto : builder;
+ if (!p.hasShouldBeUploadedToSharedCache()) {
+ return false;
+ }
+ return p.getShouldBeUploadedToSharedCache();
+ }
+
+ @Override
+ public synchronized void setShouldBeUploadedToSharedCache(
+ boolean shouldBeUploadedToSharedCache) {
+ maybeInitBuilder();
+ if (!shouldBeUploadedToSharedCache) {
+ builder.clearShouldBeUploadedToSharedCache();
+ return;
+ }
+ builder.setShouldBeUploadedToSharedCache(shouldBeUploadedToSharedCache);
+ }
+
private LocalResourceTypeProto convertToProtoFormat(LocalResourceType e) {
return ProtoUtils.convertToProtoFormat(e);
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/sharedcache/ChecksumSHA256Impl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/sharedcache/ChecksumSHA256Impl.java
new file mode 100644
index 0000000000..24ceeae248
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/sharedcache/ChecksumSHA256Impl.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.sharedcache;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+import org.apache.commons.codec.digest.DigestUtils;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Evolving;
+
+@Private
+@Evolving
+/**
+ * The SHA-256 implementation of the shared cache checksum interface.
+ */
+public class ChecksumSHA256Impl implements SharedCacheChecksum {
+ public String computeChecksum(InputStream in) throws IOException {
+ return DigestUtils.sha256Hex(in);
+ }
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/sharedcache/SharedCacheChecksum.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/sharedcache/SharedCacheChecksum.java
new file mode 100644
index 0000000000..7e6fddaa26
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/sharedcache/SharedCacheChecksum.java
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.sharedcache;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+import org.apache.hadoop.classification.InterfaceAudience.Public;
+import org.apache.hadoop.classification.InterfaceStability.Evolving;
+
+@Public
+@Evolving
+/**
+ * An interface to calculate a checksum for a resource in the shared cache. The
+ * checksum implementation should be thread safe.
+ */
+public interface SharedCacheChecksum {
+
+ /**
+ * Calculate the checksum of the passed input stream.
+ *
+ * @param in InputStream
to be checksumed
+ * @return the message digest of the input stream
+ * @throws IOException
+ */
+ public String computeChecksum(InputStream in) throws IOException;
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/sharedcache/SharedCacheChecksumFactory.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/sharedcache/SharedCacheChecksumFactory.java
new file mode 100644
index 0000000000..cbfd95db5b
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/sharedcache/SharedCacheChecksumFactory.java
@@ -0,0 +1,84 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.sharedcache;
+
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+import org.apache.hadoop.classification.InterfaceAudience.Public;
+import org.apache.hadoop.classification.InterfaceStability.Evolving;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
+
+@SuppressWarnings("unchecked")
+@Public
+@Evolving
+/**
+ * A factory class for creating checksum objects based on a configurable
+ * algorithm implementation
+ */
+public class SharedCacheChecksumFactory {
+ private static final
+ ConcurrentMap,SharedCacheChecksum>
+ instances =
+ new ConcurrentHashMap,
+ SharedCacheChecksum>();
+
+ private static final Class extends SharedCacheChecksum> defaultAlgorithm;
+
+ static {
+ try {
+ defaultAlgorithm = (Class extends SharedCacheChecksum>)
+ Class.forName(
+ YarnConfiguration.DEFAULT_SHARED_CACHE_CHECKSUM_ALGO_IMPL);
+ } catch (Exception e) {
+ // cannot happen
+ throw new ExceptionInInitializerError(e);
+ }
+ }
+
+ /**
+ * Get a new SharedCacheChecksum
object based on the configurable
+ * algorithm implementation
+ * (see yarn.sharedcache.checksum.algo.impl
)
+ *
+ * @return SharedCacheChecksum
object
+ */
+ public static SharedCacheChecksum getChecksum(Configuration conf) {
+ Class extends SharedCacheChecksum> clazz =
+ conf.getClass(YarnConfiguration.SHARED_CACHE_CHECKSUM_ALGO_IMPL,
+ defaultAlgorithm, SharedCacheChecksum.class);
+ SharedCacheChecksum checksum = instances.get(clazz);
+ if (checksum == null) {
+ try {
+ checksum = ReflectionUtils.newInstance(clazz, conf);
+ SharedCacheChecksum old = instances.putIfAbsent(clazz, checksum);
+ if (old != null) {
+ checksum = old;
+ }
+ } catch (Exception e) {
+ throw new YarnRuntimeException(e);
+ }
+ }
+
+ return checksum;
+ }
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/FSDownload.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/FSDownload.java
index 2737cce196..436cb31293 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/FSDownload.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/FSDownload.java
@@ -32,6 +32,7 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.LimitedPrivate;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.FileStatus;
@@ -134,8 +135,8 @@ public Future load(Path path) {
* @return true if the path in the current path is visible to all, false
* otherwise
*/
- @VisibleForTesting
- static boolean isPublic(FileSystem fs, Path current, FileStatus sStat,
+ @Private
+ public static boolean isPublic(FileSystem fs, Path current, FileStatus sStat,
LoadingCache> statCache) throws IOException {
current = fs.makeQualified(current);
//the leaf level file should be readable by others
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
index af4a5eb02e..af3b5aa0b2 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
@@ -1458,6 +1458,24 @@
50
+
+ The algorithm used to compute checksums of files (SHA-256 by default)
+ yarn.sharedcache.checksum.algo.impl
+ org.apache.hadoop.yarn.sharedcache.ChecksumSHA256Impl
+
+
+
+ The replication factor for the node manager uploader for the shared cache (10 by default)
+ yarn.sharedcache.nm.uploader.replication.factor
+ 10
+
+
+
+ The number of threads used to upload files from a node manager instance (20 by default)
+ yarn.sharedcache.nm.uploader.thread-count
+ 20
+
+
The interval that the yarn client library uses to poll the
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/utils/BuilderUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/utils/BuilderUtils.java
index a7e5d9cd82..1b326716af 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/utils/BuilderUtils.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/utils/BuilderUtils.java
@@ -70,6 +70,7 @@
* Builder utilities to construct various objects.
*
*/
+@Private
public class BuilderUtils {
private static final RecordFactory recordFactory = RecordFactoryProvider
@@ -94,7 +95,8 @@ public int compare(ContainerId c1,
}
public static LocalResource newLocalResource(URL url, LocalResourceType type,
- LocalResourceVisibility visibility, long size, long timestamp) {
+ LocalResourceVisibility visibility, long size, long timestamp,
+ boolean shouldBeUploadedToSharedCache) {
LocalResource resource =
recordFactory.newRecordInstance(LocalResource.class);
resource.setResource(url);
@@ -102,14 +104,15 @@ public static LocalResource newLocalResource(URL url, LocalResourceType type,
resource.setVisibility(visibility);
resource.setSize(size);
resource.setTimestamp(timestamp);
+ resource.setShouldBeUploadedToSharedCache(shouldBeUploadedToSharedCache);
return resource;
}
public static LocalResource newLocalResource(URI uri,
LocalResourceType type, LocalResourceVisibility visibility, long size,
- long timestamp) {
+ long timestamp, boolean shouldBeUploadedToSharedCache) {
return newLocalResource(ConverterUtils.getYarnUrlFromURI(uri), type,
- visibility, size, timestamp);
+ visibility, size, timestamp, shouldBeUploadedToSharedCache);
}
public static ApplicationId newApplicationId(RecordFactory recordFactory,
@@ -245,7 +248,6 @@ public static Token newAMRMToken(byte[] identifier, String kind,
return newToken(Token.class, identifier, kind, password, service);
}
- @Private
@VisibleForTesting
public static Token newContainerToken(NodeId nodeId,
byte[] password, ContainerTokenIdentifier tokenIdentifier) {
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
index 35b232fea3..bb277d94b8 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
@@ -119,6 +119,8 @@
import org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainersLauncherEventType;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ResourceLocalizationService;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizationEventType;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.sharedcache.SharedCacheUploadEventType;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.sharedcache.SharedCacheUploadService;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation.LogAggregationService;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.LogHandler;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.NonAggregatingLogHandler;
@@ -227,6 +229,13 @@ public void serviceInit(Configuration conf) throws Exception {
addIfService(logHandler);
dispatcher.register(LogHandlerEventType.class, logHandler);
+ // add the shared cache upload service (it will do nothing if the shared
+ // cache is disabled)
+ SharedCacheUploadService sharedCacheUploader =
+ createSharedCacheUploaderService();
+ addService(sharedCacheUploader);
+ dispatcher.register(SharedCacheUploadEventType.class, sharedCacheUploader);
+
waitForContainersOnShutdownMillis =
conf.getLong(YarnConfiguration.NM_SLEEP_DELAY_BEFORE_SIGKILL_MS,
YarnConfiguration.DEFAULT_NM_SLEEP_DELAY_BEFORE_SIGKILL_MS) +
@@ -367,6 +376,10 @@ protected ResourceLocalizationService createResourceLocalizationService(
deletionContext, dirsHandler, context);
}
+ protected SharedCacheUploadService createSharedCacheUploaderService() {
+ return new SharedCacheUploadService();
+ }
+
protected ContainersLauncher createContainersLauncher(Context context,
ContainerExecutor exec) {
return new ContainersLauncher(context, this.dispatcher, exec, dirsHandler, this);
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java
index fa54ee19b9..6b65a544bb 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java
@@ -27,6 +27,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
@@ -59,6 +60,8 @@
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.LocalResourceRequest;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ContainerLocalizationCleanupEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ContainerLocalizationRequestEvent;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.sharedcache.SharedCacheUploadEvent;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.sharedcache.SharedCacheUploadEventType;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerContainerFinishedEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainerStartMonitoringEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainerStopMonitoringEvent;
@@ -104,6 +107,10 @@ public class ContainerImpl implements Container {
new ArrayList();
private final List appRsrcs =
new ArrayList();
+ private final Map resourcesToBeUploaded =
+ new ConcurrentHashMap();
+ private final Map resourcesUploadPolicies =
+ new ConcurrentHashMap();
// whether container has been recovered after a restart
private RecoveredContainerStatus recoveredStatus =
@@ -637,6 +644,8 @@ public ContainerState transition(ContainerImpl container,
container.pendingResources.put(req, links);
}
links.add(rsrc.getKey());
+ storeSharedCacheUploadPolicy(container, req, rsrc.getValue()
+ .getShouldBeUploadedToSharedCache());
switch (rsrc.getValue().getVisibility()) {
case PUBLIC:
container.publicRsrcs.add(req);
@@ -685,31 +694,77 @@ public ContainerState transition(ContainerImpl container,
}
}
+ /**
+ * Store the resource's shared cache upload policies
+ * Given LocalResourceRequest can be shared across containers in
+ * LocalResourcesTrackerImpl, we preserve the upload policies here.
+ * In addition, it is possible for the application to create several
+ * "identical" LocalResources as part of
+ * ContainerLaunchContext.setLocalResources with different symlinks.
+ * There is a corner case where these "identical" local resources have
+ * different upload policies. For that scenario, upload policy will be set to
+ * true as long as there is at least one LocalResource entry with
+ * upload policy set to true.
+ */
+ private static void storeSharedCacheUploadPolicy(ContainerImpl container,
+ LocalResourceRequest resourceRequest, Boolean uploadPolicy) {
+ Boolean storedUploadPolicy =
+ container.resourcesUploadPolicies.get(resourceRequest);
+ if (storedUploadPolicy == null || (!storedUploadPolicy && uploadPolicy)) {
+ container.resourcesUploadPolicies.put(resourceRequest, uploadPolicy);
+ }
+ }
+
/**
* Transition when one of the requested resources for this container
* has been successfully localized.
*/
static class LocalizedTransition implements
MultipleArcTransition {
+ @SuppressWarnings("unchecked")
@Override
public ContainerState transition(ContainerImpl container,
ContainerEvent event) {
ContainerResourceLocalizedEvent rsrcEvent = (ContainerResourceLocalizedEvent) event;
- List syms =
- container.pendingResources.remove(rsrcEvent.getResource());
+ LocalResourceRequest resourceRequest = rsrcEvent.getResource();
+ Path location = rsrcEvent.getLocation();
+ List syms = container.pendingResources.remove(resourceRequest);
if (null == syms) {
- LOG.warn("Localized unknown resource " + rsrcEvent.getResource() +
+ LOG.warn("Localized unknown resource " + resourceRequest +
" for container " + container.containerId);
assert false;
// fail container?
return ContainerState.LOCALIZING;
}
- container.localizedResources.put(rsrcEvent.getLocation(), syms);
+ container.localizedResources.put(location, syms);
+
+ // check to see if this resource should be uploaded to the shared cache
+ // as well
+ if (shouldBeUploadedToSharedCache(container, resourceRequest)) {
+ container.resourcesToBeUploaded.put(resourceRequest, location);
+ }
if (!container.pendingResources.isEmpty()) {
return ContainerState.LOCALIZING;
}
container.sendLaunchEvent();
+
+ // If this is a recovered container that has already launched, skip
+ // uploading resources to the shared cache. We do this to avoid uploading
+ // the same resources multiple times. The tradeoff is that in the case of
+ // a recovered container, there is a chance that resources don't get
+ // uploaded into the shared cache. This is OK because resources are not
+ // acknowledged by the SCM until they have been uploaded by the node
+ // manager.
+ if (container.recoveredStatus != RecoveredContainerStatus.LAUNCHED
+ && container.recoveredStatus != RecoveredContainerStatus.COMPLETED) {
+ // kick off uploads to the shared cache
+ container.dispatcher.getEventHandler().handle(
+ new SharedCacheUploadEvent(container.resourcesToBeUploaded, container
+ .getLaunchContext(), container.getUser(),
+ SharedCacheUploadEventType.UPLOAD));
+ }
+
container.metrics.endInitingContainer();
return ContainerState.LOCALIZED;
}
@@ -1018,4 +1073,13 @@ public String toString() {
private boolean hasDefaultExitCode() {
return (this.exitCode == ContainerExitStatus.INVALID);
}
+
+ /**
+ * Returns whether the specific resource should be uploaded to the shared
+ * cache.
+ */
+ private static boolean shouldBeUploadedToSharedCache(ContainerImpl container,
+ LocalResourceRequest resource) {
+ return container.resourcesUploadPolicies.get(resource);
+ }
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourceRequest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourceRequest.java
index 70bead7320..607d0b4086 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourceRequest.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourceRequest.java
@@ -151,6 +151,17 @@ public String getPattern() {
return pattern;
}
+ @Override
+ public boolean getShouldBeUploadedToSharedCache() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void setShouldBeUploadedToSharedCache(
+ boolean shouldBeUploadedToSharedCache) {
+ throw new UnsupportedOperationException();
+ }
+
@Override
public void setResource(URL resource) {
throw new UnsupportedOperationException();
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/sharedcache/SharedCacheUploadEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/sharedcache/SharedCacheUploadEvent.java
new file mode 100644
index 0000000000..2be080e846
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/sharedcache/SharedCacheUploadEvent.java
@@ -0,0 +1,58 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.sharedcache;
+
+import java.util.Map;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.event.AbstractEvent;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.LocalResourceRequest;
+
+@Private
+@Unstable
+public class SharedCacheUploadEvent extends
+ AbstractEvent {
+ private final Map resources;
+ private final ContainerLaunchContext context;
+ private final String user;
+
+ public SharedCacheUploadEvent(Map resources,
+ ContainerLaunchContext context, String user,
+ SharedCacheUploadEventType eventType) {
+ super(eventType);
+ this.resources = resources;
+ this.context = context;
+ this.user = user;
+ }
+
+ public Map getResources() {
+ return resources;
+ }
+
+ public ContainerLaunchContext getContainerLaunchContext() {
+ return context;
+ }
+
+ public String getUser() {
+ return user;
+ }
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/sharedcache/SharedCacheUploadEventType.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/sharedcache/SharedCacheUploadEventType.java
new file mode 100644
index 0000000000..5ba7e1b6b1
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/sharedcache/SharedCacheUploadEventType.java
@@ -0,0 +1,28 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.sharedcache;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+
+@Private
+@Unstable
+public enum SharedCacheUploadEventType {
+ UPLOAD
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/sharedcache/SharedCacheUploadService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/sharedcache/SharedCacheUploadService.java
new file mode 100644
index 0000000000..cb11f99c55
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/sharedcache/SharedCacheUploadService.java
@@ -0,0 +1,126 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.sharedcache;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.service.AbstractService;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.ipc.YarnRPC;
+import org.apache.hadoop.yarn.server.api.SCMUploaderProtocol;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.LocalResourceRequest;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
+@Private
+@Unstable
+/**
+ * Service that uploads localized files to the shared cache. The upload is
+ * considered not critical, and is done on a best-effort basis. Failure to
+ * upload is not fatal.
+ */
+public class SharedCacheUploadService extends AbstractService implements
+ EventHandler {
+ private static final Log LOG =
+ LogFactory.getLog(SharedCacheUploadService.class);
+
+ private boolean enabled;
+ private FileSystem fs;
+ private FileSystem localFs;
+ private ExecutorService uploaderPool;
+ private SCMUploaderProtocol scmClient;
+
+ public SharedCacheUploadService() {
+ super(SharedCacheUploadService.class.getName());
+ }
+
+ @Override
+ protected void serviceInit(Configuration conf) throws Exception {
+ enabled = conf.getBoolean(YarnConfiguration.SHARED_CACHE_ENABLED,
+ YarnConfiguration.DEFAULT_SHARED_CACHE_ENABLED);
+ if (enabled) {
+ int threadCount =
+ conf.getInt(YarnConfiguration.SHARED_CACHE_NM_UPLOADER_THREAD_COUNT,
+ YarnConfiguration.DEFAULT_SHARED_CACHE_NM_UPLOADER_THREAD_COUNT);
+ uploaderPool = Executors.newFixedThreadPool(threadCount,
+ new ThreadFactoryBuilder().
+ setNameFormat("Shared cache uploader #%d").
+ build());
+ scmClient = createSCMClient(conf);
+ try {
+ fs = FileSystem.get(conf);
+ localFs = FileSystem.getLocal(conf);
+ } catch (IOException e) {
+ LOG.error("Unexpected exception in getting the filesystem", e);
+ throw new RuntimeException(e);
+ }
+ }
+ super.serviceInit(conf);
+ }
+
+ private SCMUploaderProtocol createSCMClient(Configuration conf) {
+ YarnRPC rpc = YarnRPC.create(conf);
+ InetSocketAddress scmAddress =
+ conf.getSocketAddr(YarnConfiguration.SCM_UPLOADER_SERVER_ADDRESS,
+ YarnConfiguration.DEFAULT_SCM_UPLOADER_SERVER_ADDRESS,
+ YarnConfiguration.DEFAULT_SCM_UPLOADER_SERVER_PORT);
+ return (SCMUploaderProtocol)rpc.getProxy(
+ SCMUploaderProtocol.class, scmAddress, conf);
+ }
+
+ @Override
+ protected void serviceStop() throws Exception {
+ if (enabled) {
+ uploaderPool.shutdown();
+ RPC.stopProxy(scmClient);
+ }
+ super.serviceStop();
+ }
+
+ @Override
+ public void handle(SharedCacheUploadEvent event) {
+ if (enabled) {
+ Map resources = event.getResources();
+ for (Map.Entry e: resources.entrySet()) {
+ SharedCacheUploader uploader =
+ new SharedCacheUploader(e.getKey(), e.getValue(), event.getUser(),
+ getConfig(), scmClient, fs, localFs);
+ // fire off an upload task
+ uploaderPool.submit(uploader);
+ }
+ }
+ }
+
+ public boolean isEnabled() {
+ return enabled;
+ }
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/sharedcache/SharedCacheUploader.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/sharedcache/SharedCacheUploader.java
new file mode 100644
index 0000000000..050d5315b4
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/sharedcache/SharedCacheUploader.java
@@ -0,0 +1,289 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.sharedcache;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.lang.reflect.UndeclaredThrowableException;
+import java.net.URISyntaxException;
+import java.util.Random;
+import java.util.concurrent.Callable;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.factories.RecordFactory;
+import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
+import org.apache.hadoop.yarn.server.api.SCMUploaderProtocol;
+import org.apache.hadoop.yarn.server.api.protocolrecords.SCMUploaderNotifyRequest;
+import org.apache.hadoop.yarn.server.sharedcache.SharedCacheUtil;
+import org.apache.hadoop.yarn.sharedcache.SharedCacheChecksum;
+import org.apache.hadoop.yarn.sharedcache.SharedCacheChecksumFactory;
+import org.apache.hadoop.yarn.util.ConverterUtils;
+import org.apache.hadoop.yarn.util.FSDownload;
+
+import com.google.common.annotations.VisibleForTesting;
+
+/**
+ * The callable class that handles the actual upload to the shared cache.
+ */
+class SharedCacheUploader implements Callable {
+ // rwxr-xr-x
+ static final FsPermission DIRECTORY_PERMISSION =
+ new FsPermission((short)00755);
+ // r-xr-xr-x
+ static final FsPermission FILE_PERMISSION =
+ new FsPermission((short)00555);
+
+ private static final Log LOG = LogFactory.getLog(SharedCacheUploader.class);
+ private static final ThreadLocal randomTl =
+ new ThreadLocal() {
+ @Override
+ protected Random initialValue() {
+ return new Random(System.nanoTime());
+ }
+ };
+
+ private final LocalResource resource;
+ private final Path localPath;
+ private final String user;
+ private final Configuration conf;
+ private final SCMUploaderProtocol scmClient;
+ private final FileSystem fs;
+ private final FileSystem localFs;
+ private final String sharedCacheRootDir;
+ private final int nestedLevel;
+ private final SharedCacheChecksum checksum;
+ private final RecordFactory recordFactory;
+
+ public SharedCacheUploader(LocalResource resource, Path localPath,
+ String user, Configuration conf, SCMUploaderProtocol scmClient)
+ throws IOException {
+ this(resource, localPath, user, conf, scmClient,
+ FileSystem.get(conf), localPath.getFileSystem(conf));
+ }
+
+ /**
+ * @param resource the local resource that contains the original remote path
+ * @param localPath the path in the local filesystem where the resource is
+ * localized
+ * @param fs the filesystem of the shared cache
+ * @param localFs the local filesystem
+ */
+ public SharedCacheUploader(LocalResource resource, Path localPath,
+ String user, Configuration conf, SCMUploaderProtocol scmClient,
+ FileSystem fs, FileSystem localFs) {
+ this.resource = resource;
+ this.localPath = localPath;
+ this.user = user;
+ this.conf = conf;
+ this.scmClient = scmClient;
+ this.fs = fs;
+ this.sharedCacheRootDir =
+ conf.get(YarnConfiguration.SHARED_CACHE_ROOT,
+ YarnConfiguration.DEFAULT_SHARED_CACHE_ROOT);
+ this.nestedLevel = SharedCacheUtil.getCacheDepth(conf);
+ this.checksum = SharedCacheChecksumFactory.getChecksum(conf);
+ this.localFs = localFs;
+ this.recordFactory = RecordFactoryProvider.getRecordFactory(null);
+ }
+
+ /**
+ * Uploads the file under the shared cache, and notifies the shared cache
+ * manager. If it is unable to upload the file because it already exists, it
+ * returns false.
+ */
+ @Override
+ public Boolean call() throws Exception {
+ Path tempPath = null;
+ try {
+ if (!verifyAccess()) {
+ LOG.warn("User " + user + " is not authorized to upload file " +
+ localPath.getName());
+ return false;
+ }
+
+ // first determine the actual local path that will be used for upload
+ Path actualPath = getActualPath();
+ // compute the checksum
+ String checksumVal = computeChecksum(actualPath);
+ // create the directory (if it doesn't exist)
+ Path directoryPath =
+ new Path(SharedCacheUtil.getCacheEntryPath(nestedLevel,
+ sharedCacheRootDir, checksumVal));
+ // let's not check if the directory already exists: in the vast majority
+ // of the cases, the directory does not exist; as long as mkdirs does not
+ // error out if it exists, we should be fine
+ fs.mkdirs(directoryPath, DIRECTORY_PERMISSION);
+ // create the temporary file
+ tempPath = new Path(directoryPath, getTemporaryFileName(actualPath));
+ if (!uploadFile(actualPath, tempPath)) {
+ LOG.warn("Could not copy the file to the shared cache at " + tempPath);
+ return false;
+ }
+
+ // set the permission so that it is readable but not writable
+ fs.setPermission(tempPath, FILE_PERMISSION);
+ // rename it to the final filename
+ Path finalPath = new Path(directoryPath, actualPath.getName());
+ if (!fs.rename(tempPath, finalPath)) {
+ LOG.warn("The file already exists under " + finalPath +
+ ". Ignoring this attempt.");
+ deleteTempFile(tempPath);
+ return false;
+ }
+
+ // notify the SCM
+ if (!notifySharedCacheManager(checksumVal, actualPath.getName())) {
+ // the shared cache manager rejected the upload (as it is likely
+ // uploaded under a different name
+ // clean up this file and exit
+ fs.delete(finalPath, false);
+ return false;
+ }
+
+ // set the replication factor
+ short replication =
+ (short)conf.getInt(YarnConfiguration.SHARED_CACHE_NM_UPLOADER_REPLICATION_FACTOR,
+ YarnConfiguration.DEFAULT_SHARED_CACHE_NM_UPLOADER_REPLICATION_FACTOR);
+ fs.setReplication(finalPath, replication);
+ LOG.info("File " + actualPath.getName() +
+ " was uploaded to the shared cache at " + finalPath);
+ return true;
+ } catch (IOException e) {
+ LOG.warn("Exception while uploading the file " + localPath.getName(), e);
+ // in case an exception is thrown, delete the temp file
+ deleteTempFile(tempPath);
+ throw e;
+ }
+ }
+
+ @VisibleForTesting
+ Path getActualPath() throws IOException {
+ Path path = localPath;
+ FileStatus status = localFs.getFileStatus(path);
+ if (status != null && status.isDirectory()) {
+ // for certain types of resources that get unpacked, the original file may
+ // be found under the directory with the same name (see
+ // FSDownload.unpack); check if the path is a directory and if so look
+ // under it
+ path = new Path(path, path.getName());
+ }
+ return path;
+ }
+
+ private void deleteTempFile(Path tempPath) {
+ try {
+ if (tempPath != null && fs.exists(tempPath)) {
+ fs.delete(tempPath, false);
+ }
+ } catch (IOException ignore) {}
+ }
+
+ /**
+ * Checks that the (original) remote file is either owned by the user who
+ * started the app or public.
+ */
+ @VisibleForTesting
+ boolean verifyAccess() throws IOException {
+ // if it is in the public cache, it's trivially OK
+ if (resource.getVisibility() == LocalResourceVisibility.PUBLIC) {
+ return true;
+ }
+
+ final Path remotePath;
+ try {
+ remotePath = ConverterUtils.getPathFromYarnURL(resource.getResource());
+ } catch (URISyntaxException e) {
+ throw new IOException("Invalid resource", e);
+ }
+
+ // get the file status of the HDFS file
+ FileSystem remoteFs = remotePath.getFileSystem(conf);
+ FileStatus status = remoteFs.getFileStatus(remotePath);
+ // check to see if the file has been modified in any way
+ if (status.getModificationTime() != resource.getTimestamp()) {
+ LOG.warn("The remote file " + remotePath +
+ " has changed since it's localized; will not consider it for upload");
+ return false;
+ }
+
+ // check for the user ownership
+ if (status.getOwner().equals(user)) {
+ return true; // the user owns the file
+ }
+ // check if the file is publicly readable otherwise
+ return fileIsPublic(remotePath, remoteFs, status);
+ }
+
+ @VisibleForTesting
+ boolean fileIsPublic(final Path remotePath, FileSystem remoteFs,
+ FileStatus status) throws IOException {
+ return FSDownload.isPublic(remoteFs, remotePath, status, null);
+ }
+
+ /**
+ * Uploads the file to the shared cache under a temporary name, and returns
+ * the result.
+ */
+ @VisibleForTesting
+ boolean uploadFile(Path sourcePath, Path tempPath) throws IOException {
+ return FileUtil.copy(localFs, sourcePath, fs, tempPath, false, conf);
+ }
+
+ @VisibleForTesting
+ String computeChecksum(Path path) throws IOException {
+ InputStream is = localFs.open(path);
+ try {
+ return checksum.computeChecksum(is);
+ } finally {
+ try { is.close(); } catch (IOException ignore) {}
+ }
+ }
+
+ private String getTemporaryFileName(Path path) {
+ return path.getName() + "-" + randomTl.get().nextLong();
+ }
+
+ @VisibleForTesting
+ boolean notifySharedCacheManager(String checksumVal, String fileName)
+ throws IOException {
+ try {
+ SCMUploaderNotifyRequest request =
+ recordFactory.newRecordInstance(SCMUploaderNotifyRequest.class);
+ request.setResourceKey(checksumVal);
+ request.setFilename(fileName);
+ return scmClient.notify(request).getAccepted();
+ } catch (YarnException e) {
+ throw new IOException(e);
+ } catch (UndeclaredThrowableException e) {
+ // retrieve the cause of the exception and throw it as an IOException
+ throw new IOException(e.getCause() == null ? e : e.getCause());
+ }
+ }
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/TestContainer.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/TestContainer.java
index 8f7fa78299..c28d691a99 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/TestContainer.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/TestContainer.java
@@ -642,7 +642,7 @@ private static Entry getMockRsrc(Random r,
URL url = BuilderUtils.newURL("file", null, 0, "/local" + vis + "/" + name);
LocalResource rsrc =
BuilderUtils.newLocalResource(url, LocalResourceType.FILE, vis,
- r.nextInt(1024) + 1024L, r.nextInt(1024) + 2048L);
+ r.nextInt(1024) + 1024L, r.nextInt(1024) + 2048L, false);
return new SimpleEntry(name, rsrc);
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java
index bf36651e94..1051e7acfc 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java
@@ -1760,7 +1760,7 @@ private static LocalResource getMockedResource(Random r,
URL url = getPath("/local/PRIVATE/" + name);
LocalResource rsrc =
BuilderUtils.newLocalResource(url, LocalResourceType.FILE, vis,
- r.nextInt(1024) + 1024L, r.nextInt(1024) + 2048L);
+ r.nextInt(1024) + 1024L, r.nextInt(1024) + 2048L, false);
return rsrc;
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/sharedcache/TestSharedCacheUploadService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/sharedcache/TestSharedCacheUploadService.java
new file mode 100644
index 0000000000..1b2b2f0715
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/sharedcache/TestSharedCacheUploadService.java
@@ -0,0 +1,50 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.sharedcache;
+
+import static org.junit.Assert.assertSame;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.junit.Test;
+
+public class TestSharedCacheUploadService {
+
+ @Test
+ public void testInitDisabled() {
+ testInit(false);
+ }
+
+ @Test
+ public void testInitEnabled() {
+ testInit(true);
+ }
+
+ public void testInit(boolean enabled) {
+ Configuration conf = new Configuration();
+ conf.setBoolean(YarnConfiguration.SHARED_CACHE_ENABLED, enabled);
+
+ SharedCacheUploadService service = new SharedCacheUploadService();
+ service.init(conf);
+ assertSame(enabled, service.isEnabled());
+
+ service.stop();
+ }
+
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/sharedcache/TestSharedCacheUploader.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/sharedcache/TestSharedCacheUploader.java
new file mode 100644
index 0000000000..9234c62fd5
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/sharedcache/TestSharedCacheUploader.java
@@ -0,0 +1,241 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.sharedcache;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.anyBoolean;
+import static org.mockito.Matchers.isA;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.api.SCMUploaderProtocol;
+import org.apache.hadoop.yarn.server.api.protocolrecords.SCMUploaderNotifyRequest;
+import org.apache.hadoop.yarn.server.api.protocolrecords.SCMUploaderNotifyResponse;
+import org.junit.Test;
+
+public class TestSharedCacheUploader {
+
+ /**
+ * If verifyAccess fails, the upload should fail
+ */
+ @Test
+ public void testFailVerifyAccess() throws Exception {
+ SharedCacheUploader spied = createSpiedUploader();
+ doReturn(false).when(spied).verifyAccess();
+
+ assertFalse(spied.call());
+ }
+
+ /**
+ * If rename fails, the upload should fail
+ */
+ @Test
+ public void testRenameFail() throws Exception {
+ Configuration conf = new Configuration();
+ conf.setBoolean(YarnConfiguration.SHARED_CACHE_ENABLED, true);
+ LocalResource resource = mock(LocalResource.class);
+ Path localPath = mock(Path.class);
+ when(localPath.getName()).thenReturn("foo.jar");
+ String user = "joe";
+ SCMUploaderProtocol scmClient = mock(SCMUploaderProtocol.class);
+ SCMUploaderNotifyResponse response = mock(SCMUploaderNotifyResponse.class);
+ when(response.getAccepted()).thenReturn(true);
+ when(scmClient.notify(isA(SCMUploaderNotifyRequest.class))).
+ thenReturn(response);
+ FileSystem fs = mock(FileSystem.class);
+ // return false when rename is called
+ when(fs.rename(isA(Path.class), isA(Path.class))).thenReturn(false);
+ FileSystem localFs = FileSystem.getLocal(conf);
+ SharedCacheUploader spied =
+ createSpiedUploader(resource, localPath, user, conf, scmClient, fs,
+ localFs);
+ // stub verifyAccess() to return true
+ doReturn(true).when(spied).verifyAccess();
+ // stub getActualPath()
+ doReturn(localPath).when(spied).getActualPath();
+ // stub computeChecksum()
+ doReturn("abcdef0123456789").when(spied).computeChecksum(isA(Path.class));
+ // stub uploadFile() to return true
+ doReturn(true).when(spied).uploadFile(isA(Path.class), isA(Path.class));
+
+ assertFalse(spied.call());
+ }
+
+ /**
+ * If verifyAccess, uploadFile, rename, and notification succeed, the upload
+ * should succeed
+ */
+ @Test
+ public void testSuccess() throws Exception {
+ Configuration conf = new Configuration();
+ conf.setBoolean(YarnConfiguration.SHARED_CACHE_ENABLED, true);
+ LocalResource resource = mock(LocalResource.class);
+ Path localPath = mock(Path.class);
+ when(localPath.getName()).thenReturn("foo.jar");
+ String user = "joe";
+ SCMUploaderProtocol scmClient = mock(SCMUploaderProtocol.class);
+ SCMUploaderNotifyResponse response = mock(SCMUploaderNotifyResponse.class);
+ when(response.getAccepted()).thenReturn(true);
+ when(scmClient.notify(isA(SCMUploaderNotifyRequest.class))).
+ thenReturn(response);
+ FileSystem fs = mock(FileSystem.class);
+ // return false when rename is called
+ when(fs.rename(isA(Path.class), isA(Path.class))).thenReturn(true);
+ FileSystem localFs = FileSystem.getLocal(conf);
+ SharedCacheUploader spied =
+ createSpiedUploader(resource, localPath, user, conf, scmClient, fs,
+ localFs);
+ // stub verifyAccess() to return true
+ doReturn(true).when(spied).verifyAccess();
+ // stub getActualPath()
+ doReturn(localPath).when(spied).getActualPath();
+ // stub computeChecksum()
+ doReturn("abcdef0123456789").when(spied).computeChecksum(isA(Path.class));
+ // stub uploadFile() to return true
+ doReturn(true).when(spied).uploadFile(isA(Path.class), isA(Path.class));
+ // stub notifySharedCacheManager to return true
+ doReturn(true).when(spied).notifySharedCacheManager(isA(String.class),
+ isA(String.class));
+
+ assertTrue(spied.call());
+ }
+
+ /**
+ * If verifyAccess, uploadFile, and rename succed, but it receives a nay from
+ * SCM, the file should be deleted
+ */
+ @Test
+ public void testNotifySCMFail() throws Exception {
+ Configuration conf = new Configuration();
+ conf.setBoolean(YarnConfiguration.SHARED_CACHE_ENABLED, true);
+ LocalResource resource = mock(LocalResource.class);
+ Path localPath = mock(Path.class);
+ when(localPath.getName()).thenReturn("foo.jar");
+ String user = "joe";
+ FileSystem fs = mock(FileSystem.class);
+ // return false when rename is called
+ when(fs.rename(isA(Path.class), isA(Path.class))).thenReturn(true);
+ FileSystem localFs = FileSystem.getLocal(conf);
+ SharedCacheUploader spied =
+ createSpiedUploader(resource, localPath, user, conf, null, fs,
+ localFs);
+ // stub verifyAccess() to return true
+ doReturn(true).when(spied).verifyAccess();
+ // stub getActualPath()
+ doReturn(localPath).when(spied).getActualPath();
+ // stub computeChecksum()
+ doReturn("abcdef0123456789").when(spied).computeChecksum(isA(Path.class));
+ // stub uploadFile() to return true
+ doReturn(true).when(spied).uploadFile(isA(Path.class), isA(Path.class));
+ // stub notifySharedCacheManager to return true
+ doReturn(false).when(spied).notifySharedCacheManager(isA(String.class),
+ isA(String.class));
+
+ assertFalse(spied.call());
+ verify(fs).delete(isA(Path.class), anyBoolean());
+ }
+
+ /**
+ * If resource is public, verifyAccess should succeed
+ */
+ @Test
+ public void testVerifyAccessPublicResource() throws Exception {
+ Configuration conf = new Configuration();
+ conf.setBoolean(YarnConfiguration.SHARED_CACHE_ENABLED, true);
+ LocalResource resource = mock(LocalResource.class);
+ // give public visibility
+ when(resource.getVisibility()).thenReturn(LocalResourceVisibility.PUBLIC);
+ Path localPath = mock(Path.class);
+ when(localPath.getName()).thenReturn("foo.jar");
+ String user = "joe";
+ SCMUploaderProtocol scmClient = mock(SCMUploaderProtocol.class);
+ FileSystem fs = mock(FileSystem.class);
+ FileSystem localFs = FileSystem.getLocal(conf);
+ SharedCacheUploader spied =
+ createSpiedUploader(resource, localPath, user, conf, scmClient, fs,
+ localFs);
+
+ assertTrue(spied.verifyAccess());
+ }
+
+ /**
+ * If the localPath does not exists, getActualPath should get to one level
+ * down
+ */
+ @Test
+ public void testGetActualPath() throws Exception {
+ Configuration conf = new Configuration();
+ conf.setBoolean(YarnConfiguration.SHARED_CACHE_ENABLED, true);
+ LocalResource resource = mock(LocalResource.class);
+ // give public visibility
+ when(resource.getVisibility()).thenReturn(LocalResourceVisibility.PUBLIC);
+ Path localPath = new Path("foo.jar");
+ String user = "joe";
+ SCMUploaderProtocol scmClient = mock(SCMUploaderProtocol.class);
+ FileSystem fs = mock(FileSystem.class);
+ FileSystem localFs = mock(FileSystem.class);
+ // stub it to return a status that indicates a directory
+ FileStatus status = mock(FileStatus.class);
+ when(status.isDirectory()).thenReturn(true);
+ when(localFs.getFileStatus(localPath)).thenReturn(status);
+ SharedCacheUploader spied =
+ createSpiedUploader(resource, localPath, user, conf, scmClient, fs,
+ localFs);
+
+ Path actualPath = spied.getActualPath();
+ assertEquals(actualPath.getName(), localPath.getName());
+ assertEquals(actualPath.getParent().getName(), localPath.getName());
+ }
+
+ private SharedCacheUploader createSpiedUploader() throws IOException {
+ Configuration conf = new Configuration();
+ conf.setBoolean(YarnConfiguration.SHARED_CACHE_ENABLED, true);
+ LocalResource resource = mock(LocalResource.class);
+ Path localPath = mock(Path.class);
+ String user = "foo";
+ SCMUploaderProtocol scmClient = mock(SCMUploaderProtocol.class);
+ FileSystem fs = FileSystem.get(conf);
+ FileSystem localFs = FileSystem.getLocal(conf);
+ return createSpiedUploader(resource, localPath, user, conf, scmClient, fs,
+ localFs);
+ }
+
+ private SharedCacheUploader createSpiedUploader(LocalResource resource, Path localPath,
+ String user, Configuration conf, SCMUploaderProtocol scmClient,
+ FileSystem fs, FileSystem localFs)
+ throws IOException {
+ SharedCacheUploader uploader = new SharedCacheUploader(resource, localPath, user, conf, scmClient,
+ fs, localFs);
+ return spy(uploader);
+ }
+}