params,
}
}
}
-
+
/**
* Check if reaching to maximum of retries.
* @param retries
* @param e
*/
- private void checkRetryWithSleep(int retries, Exception e) throws
+ private void checkRetryWithSleep(int retries, Exception e) throws
YarnException, IOException {
if (retries > 0) {
try {
@@ -452,8 +451,8 @@ private void checkRetryWithSleep(int retries, Exception e) throws
}
} else {
LOG.error(
- "TimelineClient has reached to max retry times :" +
- this.maxServiceRetries + " for service address: " +
+ "TimelineClient has reached to max retry times :" +
+ this.maxServiceRetries + " for service address: " +
timelineServiceAddress);
if (e instanceof YarnException) {
throw (YarnException)e;
@@ -499,7 +498,7 @@ private void putObjects(
public void setTimelineServiceAddress(String address) {
this.timelineServiceAddress = address;
}
-
+
private String getTimelineServiceAddress() {
return this.timelineServiceAddress;
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
index a477f18554..2781da0c95 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
@@ -965,10 +965,10 @@
yarn.nodemanager.container-manager.thread-count
20
-
+
- Number of threads aggregator service uses.
- yarn.nodemanager.aggregator-service.thread-count
+ Number of threads collector service uses.
+ yarn.nodemanager.collector-service.thread-count
5
@@ -1046,11 +1046,11 @@
yarn.nodemanager.localizer.address
${yarn.nodemanager.hostname}:8040
-
-
+
+
- Address where the aggregator service IPC is.
- yarn.nodemanager.aggregator-service.address
+ Address where the collector service IPC is.
+ yarn.nodemanager.collector-service.address
${yarn.nodemanager.hostname}:8048
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/TestContainerLaunchRPC.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/TestContainerLaunchRPC.java
index 18719e6e79..31bea2d81d 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/TestContainerLaunchRPC.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/TestContainerLaunchRPC.java
@@ -135,7 +135,7 @@ private void testRPCTimeout(String rpcClass) throws Exception {
Assert.fail("timeout exception should have occurred!");
}
-
+
public static Token newContainerToken(NodeId nodeId, byte[] password,
ContainerTokenIdentifier tokenIdentifier) {
// RPC layer client expects ip:port as service for tokens
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/pom.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/pom.xml
index b8ef142c6f..95bb89b04f 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/pom.xml
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/pom.xml
@@ -150,7 +150,7 @@
yarn_server_common_service_protos.proto
ResourceTracker.proto
SCMUploader.proto
- aggregatornodemanager_protocol.proto
+ collectornodemanager_protocol.proto
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/AggregatorNodemanagerProtocol.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/CollectorNodemanagerProtocol.java
similarity index 61%
rename from hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/AggregatorNodemanagerProtocol.java
rename to hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/CollectorNodemanagerProtocol.java
index 53bdb4e8db..26c121a71b 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/AggregatorNodemanagerProtocol.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/CollectorNodemanagerProtocol.java
@@ -21,36 +21,37 @@
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.yarn.exceptions.YarnException;
-import org.apache.hadoop.yarn.server.api.protocolrecords.ReportNewAggregatorsInfoRequest;
-import org.apache.hadoop.yarn.server.api.protocolrecords.ReportNewAggregatorsInfoResponse;
+import org.apache.hadoop.yarn.server.api.protocolrecords.ReportNewCollectorInfoRequest;
+import org.apache.hadoop.yarn.server.api.protocolrecords.ReportNewCollectorInfoResponse;
/**
- * The protocol between an TimelineAggregatorsCollection
and a
- * NodeManager
to report a new application aggregator get launched.
+ *
The protocol between an TimelineCollectorManager
and a
+ * NodeManager
to report a new application collector get launched.
*
- *
+ *
*/
@Private
-public interface AggregatorNodemanagerProtocol {
+public interface CollectorNodemanagerProtocol {
/**
- *
+ *
*
- * The TimelineAggregatorsCollection
provides a list of mapping
- * between application and aggregator's address in
- * {@link ReportNewAggregatorsInfoRequest} to a NodeManager
to
- * register aggregator's info, include: applicationId and REST URI to
- * access aggregator. NodeManager will add them into registered aggregators
+ * The TimelineCollectorManager
provides a list of mapping
+ * between application and collector's address in
+ * {@link ReportNewCollectorInfoRequest} to a NodeManager
to
+ * register collector's info, include: applicationId and REST URI to
+ * access collector. NodeManager will add them into registered collectors
* and register them into ResourceManager
afterwards.
*
- *
- * @param request the request of registering a new aggregator or a list of aggregators
- * @return
+ *
+ * @param request the request of registering a new collector or a list of
+ * collectors
+ * @return
* @throws YarnException
* @throws IOException
*/
- ReportNewAggregatorsInfoResponse reportNewAggregatorInfo(
- ReportNewAggregatorsInfoRequest request)
+ ReportNewCollectorInfoResponse reportNewCollectorInfo(
+ ReportNewCollectorInfoRequest request)
throws YarnException, IOException;
-
+
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/AggregatorNodemanagerProtocolPB.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/CollectorNodemanagerProtocolPB.java
similarity index 77%
rename from hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/AggregatorNodemanagerProtocolPB.java
rename to hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/CollectorNodemanagerProtocolPB.java
index 4df80a5a2d..655e9890a2 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/AggregatorNodemanagerProtocolPB.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/CollectorNodemanagerProtocolPB.java
@@ -20,14 +20,14 @@
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.ipc.ProtocolInfo;
-import org.apache.hadoop.yarn.proto.AggregatorNodemanagerProtocol.AggregatorNodemanagerProtocolService;
+import org.apache.hadoop.yarn.proto.CollectorNodemanagerProtocol.CollectorNodemanagerProtocolService;
@Private
@Unstable
@ProtocolInfo(
- protocolName = "org.apache.hadoop.yarn.server.api.AggregatorNodemanagerProtocolPB",
+ protocolName = "org.apache.hadoop.yarn.server.api.CollectorNodemanagerProtocolPB",
protocolVersion = 1)
-public interface AggregatorNodemanagerProtocolPB extends
- AggregatorNodemanagerProtocolService.BlockingInterface {
+public interface CollectorNodemanagerProtocolPB extends
+ CollectorNodemanagerProtocolService.BlockingInterface {
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/impl/pb/client/AggregatorNodemanagerProtocolPBClientImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/impl/pb/client/CollectorNodemanagerProtocolPBClientImpl.java
similarity index 70%
rename from hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/impl/pb/client/AggregatorNodemanagerProtocolPBClientImpl.java
rename to hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/impl/pb/client/CollectorNodemanagerProtocolPBClientImpl.java
index 6e777e7898..276a540c77 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/impl/pb/client/AggregatorNodemanagerProtocolPBClientImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/impl/pb/client/CollectorNodemanagerProtocolPBClientImpl.java
@@ -30,18 +30,18 @@
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.ipc.RPCUtil;
-import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.ReportNewAggregatorsInfoRequestProto;
-import org.apache.hadoop.yarn.server.api.AggregatorNodemanagerProtocol;
-import org.apache.hadoop.yarn.server.api.AggregatorNodemanagerProtocolPB;
-import org.apache.hadoop.yarn.server.api.protocolrecords.ReportNewAggregatorsInfoRequest;
-import org.apache.hadoop.yarn.server.api.protocolrecords.ReportNewAggregatorsInfoResponse;
-import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.ReportNewAggregatorsInfoRequestPBImpl;
-import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.ReportNewAggregatorsInfoResponsePBImpl;
+import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.ReportNewCollectorInfoRequestProto;
+import org.apache.hadoop.yarn.server.api.CollectorNodemanagerProtocol;
+import org.apache.hadoop.yarn.server.api.CollectorNodemanagerProtocolPB;
+import org.apache.hadoop.yarn.server.api.protocolrecords.ReportNewCollectorInfoRequest;
+import org.apache.hadoop.yarn.server.api.protocolrecords.ReportNewCollectorInfoResponse;
+import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.ReportNewCollectorInfoRequestPBImpl;
+import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.ReportNewCollectorInfoResponsePBImpl;
import com.google.protobuf.ServiceException;
-public class AggregatorNodemanagerProtocolPBClientImpl implements
- AggregatorNodemanagerProtocol, Closeable {
+public class CollectorNodemanagerProtocolPBClientImpl implements
+ CollectorNodemanagerProtocol, Closeable {
// Not a documented config. Only used for tests internally
static final String NM_COMMAND_TIMEOUT = YarnConfiguration.YARN_PREFIX
@@ -51,39 +51,39 @@ public class AggregatorNodemanagerProtocolPBClientImpl implements
* Maximum of 1 minute timeout for a Node to react to the command
*/
static final int DEFAULT_COMMAND_TIMEOUT = 60000;
-
- private AggregatorNodemanagerProtocolPB proxy;
-
+
+ private CollectorNodemanagerProtocolPB proxy;
+
@Private
- public AggregatorNodemanagerProtocolPBClientImpl(long clientVersion,
+ public CollectorNodemanagerProtocolPBClientImpl(long clientVersion,
InetSocketAddress addr, Configuration conf) throws IOException {
- RPC.setProtocolEngine(conf, AggregatorNodemanagerProtocolPB.class,
+ RPC.setProtocolEngine(conf, CollectorNodemanagerProtocolPB.class,
ProtobufRpcEngine.class);
UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
int expireIntvl = conf.getInt(NM_COMMAND_TIMEOUT, DEFAULT_COMMAND_TIMEOUT);
proxy =
- (AggregatorNodemanagerProtocolPB) RPC.getProxy(
- AggregatorNodemanagerProtocolPB.class,
+ (CollectorNodemanagerProtocolPB) RPC.getProxy(
+ CollectorNodemanagerProtocolPB.class,
clientVersion, addr, ugi, conf,
NetUtils.getDefaultSocketFactory(conf), expireIntvl);
}
-
+
@Override
- public ReportNewAggregatorsInfoResponse reportNewAggregatorInfo(
- ReportNewAggregatorsInfoRequest request) throws YarnException, IOException {
-
- ReportNewAggregatorsInfoRequestProto requestProto =
- ((ReportNewAggregatorsInfoRequestPBImpl) request).getProto();
+ public ReportNewCollectorInfoResponse reportNewCollectorInfo(
+ ReportNewCollectorInfoRequest request) throws YarnException, IOException {
+
+ ReportNewCollectorInfoRequestProto requestProto =
+ ((ReportNewCollectorInfoRequestPBImpl) request).getProto();
try {
- return new ReportNewAggregatorsInfoResponsePBImpl(
- proxy.reportNewAggregatorInfo(null, requestProto));
+ return new ReportNewCollectorInfoResponsePBImpl(
+ proxy.reportNewCollectorInfo(null, requestProto));
} catch (ServiceException e) {
RPCUtil.unwrapAndThrowException(e);
return null;
}
}
-
+
@Override
public void close() {
if (this.proxy != null) {
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/impl/pb/service/AggregatorNodemanagerProtocolPBServiceImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/impl/pb/service/CollectorNodemanagerProtocolPBServiceImpl.java
similarity index 57%
rename from hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/impl/pb/service/AggregatorNodemanagerProtocolPBServiceImpl.java
rename to hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/impl/pb/service/CollectorNodemanagerProtocolPBServiceImpl.java
index 87bce16857..3f42732bc3 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/impl/pb/service/AggregatorNodemanagerProtocolPBServiceImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/impl/pb/service/CollectorNodemanagerProtocolPBServiceImpl.java
@@ -19,38 +19,36 @@
import java.io.IOException;
-import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.yarn.exceptions.YarnException;
-import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.ReportNewAggregatorsInfoRequestProto;
-import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.ReportNewAggregatorsInfoResponseProto;
-import org.apache.hadoop.yarn.server.api.AggregatorNodemanagerProtocol;
-import org.apache.hadoop.yarn.server.api.AggregatorNodemanagerProtocolPB;
-import org.apache.hadoop.yarn.server.api.protocolrecords.ReportNewAggregatorsInfoRequest;
-import org.apache.hadoop.yarn.server.api.protocolrecords.ReportNewAggregatorsInfoResponse;
-import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.ReportNewAggregatorsInfoRequestPBImpl;
-import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.ReportNewAggregatorsInfoResponsePBImpl;
+import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.ReportNewCollectorInfoRequestProto;
+import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.ReportNewCollectorInfoResponseProto;
+import org.apache.hadoop.yarn.server.api.CollectorNodemanagerProtocol;
+import org.apache.hadoop.yarn.server.api.CollectorNodemanagerProtocolPB;
+import org.apache.hadoop.yarn.server.api.protocolrecords.ReportNewCollectorInfoResponse;
+import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.ReportNewCollectorInfoRequestPBImpl;
+import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.ReportNewCollectorInfoResponsePBImpl;
import com.google.protobuf.RpcController;
import com.google.protobuf.ServiceException;
-public class AggregatorNodemanagerProtocolPBServiceImpl implements
- AggregatorNodemanagerProtocolPB {
+public class CollectorNodemanagerProtocolPBServiceImpl implements
+ CollectorNodemanagerProtocolPB {
- private AggregatorNodemanagerProtocol real;
-
- public AggregatorNodemanagerProtocolPBServiceImpl(AggregatorNodemanagerProtocol impl) {
+ private CollectorNodemanagerProtocol real;
+
+ public CollectorNodemanagerProtocolPBServiceImpl(CollectorNodemanagerProtocol impl) {
this.real = impl;
}
@Override
- public ReportNewAggregatorsInfoResponseProto reportNewAggregatorInfo(
- RpcController arg0, ReportNewAggregatorsInfoRequestProto proto)
+ public ReportNewCollectorInfoResponseProto reportNewCollectorInfo(
+ RpcController arg0, ReportNewCollectorInfoRequestProto proto)
throws ServiceException {
- ReportNewAggregatorsInfoRequestPBImpl request =
- new ReportNewAggregatorsInfoRequestPBImpl(proto);
+ ReportNewCollectorInfoRequestPBImpl request =
+ new ReportNewCollectorInfoRequestPBImpl(proto);
try {
- ReportNewAggregatorsInfoResponse response = real.reportNewAggregatorInfo(request);
- return ((ReportNewAggregatorsInfoResponsePBImpl)response).getProto();
+ ReportNewCollectorInfoResponse response = real.reportNewCollectorInfo(request);
+ return ((ReportNewCollectorInfoResponsePBImpl)response).getProto();
} catch (YarnException e) {
throw new ServiceException(e);
} catch (IOException e) {
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatRequest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatRequest.java
index 09cb9e3dd0..c795e556c4 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatRequest.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatRequest.java
@@ -43,11 +43,11 @@ public static NodeHeartbeatRequest newInstance(NodeStatus nodeStatus,
nodeHeartbeatRequest.setNodeLabels(nodeLabels);
return nodeHeartbeatRequest;
}
-
+
public static NodeHeartbeatRequest newInstance(NodeStatus nodeStatus,
MasterKey lastKnownContainerTokenMasterKey,
MasterKey lastKnownNMTokenMasterKey, Set nodeLabels,
- Map registeredAggregators) {
+ Map registeredCollectors) {
NodeHeartbeatRequest nodeHeartbeatRequest =
Records.newRecord(NodeHeartbeatRequest.class);
nodeHeartbeatRequest.setNodeStatus(nodeStatus);
@@ -56,7 +56,7 @@ public static NodeHeartbeatRequest newInstance(NodeStatus nodeStatus,
nodeHeartbeatRequest
.setLastKnownNMTokenMasterKey(lastKnownNMTokenMasterKey);
nodeHeartbeatRequest.setNodeLabels(nodeLabels);
- nodeHeartbeatRequest.setRegisteredAggregators(registeredAggregators);
+ nodeHeartbeatRequest.setRegisteredCollectors(registeredCollectors);
return nodeHeartbeatRequest;
}
@@ -78,7 +78,8 @@ public static NodeHeartbeatRequest newInstance(NodeStatus nodeStatus,
public abstract void setLogAggregationReportsForApps(
List logAggregationReportsForApps);
- // This tells RM registered aggregators' address info on this node
- public abstract Map getRegisteredAggregators();
- public abstract void setRegisteredAggregators(Map appAggregatorsMap);
+ // This tells RM registered collectors' address info on this node
+ public abstract Map getRegisteredCollectors();
+ public abstract void setRegisteredCollectors(Map appCollectorsMap);
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatResponse.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatResponse.java
index f5d0043a6a..09cafaf066 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatResponse.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatResponse.java
@@ -40,10 +40,10 @@ public interface NodeHeartbeatResponse {
List getContainersToBeRemovedFromNM();
List getApplicationsToCleanup();
-
- // This tells NM the aggregators' address info of related Apps
- Map getAppAggregatorsMap();
- void setAppAggregatorsMap(Map appAggregatorsMap);
+
+ // This tells NM the collectors' address info of related apps
+ Map getAppCollectorsMap();
+ void setAppCollectorsMap(Map appCollectorsMap);
void setResponseId(int responseId);
void setNodeAction(NodeAction action);
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/ReportNewAggregatorsInfoRequest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/ReportNewCollectorInfoRequest.java
similarity index 55%
rename from hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/ReportNewAggregatorsInfoRequest.java
rename to hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/ReportNewCollectorInfoRequest.java
index ae538a2edc..3498de90a3 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/ReportNewAggregatorsInfoRequest.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/ReportNewCollectorInfoRequest.java
@@ -22,32 +22,32 @@
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.yarn.api.records.ApplicationId;
-import org.apache.hadoop.yarn.server.api.records.AppAggregatorsMap;
+import org.apache.hadoop.yarn.server.api.records.AppCollectorsMap;
import org.apache.hadoop.yarn.util.Records;
@Private
-public abstract class ReportNewAggregatorsInfoRequest {
-
- public static ReportNewAggregatorsInfoRequest newInstance(
- List appAggregatorsList) {
- ReportNewAggregatorsInfoRequest request =
- Records.newRecord(ReportNewAggregatorsInfoRequest.class);
- request.setAppAggregatorsList(appAggregatorsList);
+public abstract class ReportNewCollectorInfoRequest {
+
+ public static ReportNewCollectorInfoRequest newInstance(
+ List appCollectorsList) {
+ ReportNewCollectorInfoRequest request =
+ Records.newRecord(ReportNewCollectorInfoRequest.class);
+ request.setAppCollectorsList(appCollectorsList);
return request;
}
-
- public static ReportNewAggregatorsInfoRequest newInstance(
- ApplicationId id, String aggregatorAddr) {
- ReportNewAggregatorsInfoRequest request =
- Records.newRecord(ReportNewAggregatorsInfoRequest.class);
- request.setAppAggregatorsList(
- Arrays.asList(AppAggregatorsMap.newInstance(id, aggregatorAddr)));
+
+ public static ReportNewCollectorInfoRequest newInstance(
+ ApplicationId id, String collectorAddr) {
+ ReportNewCollectorInfoRequest request =
+ Records.newRecord(ReportNewCollectorInfoRequest.class);
+ request.setAppCollectorsList(
+ Arrays.asList(AppCollectorsMap.newInstance(id, collectorAddr)));
return request;
}
-
- public abstract List getAppAggregatorsList();
-
- public abstract void setAppAggregatorsList(
- List appAggregatorsList);
-
+
+ public abstract List getAppCollectorsList();
+
+ public abstract void setAppCollectorsList(
+ List appCollectorsList);
+
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/ReportNewAggregatorsInfoResponse.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/ReportNewCollectorInfoResponse.java
similarity index 81%
rename from hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/ReportNewAggregatorsInfoResponse.java
rename to hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/ReportNewCollectorInfoResponse.java
index 3b847d6dcb..4157c47af3 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/ReportNewAggregatorsInfoResponse.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/ReportNewCollectorInfoResponse.java
@@ -20,12 +20,12 @@
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.yarn.util.Records;
-public abstract class ReportNewAggregatorsInfoResponse {
+public abstract class ReportNewCollectorInfoResponse {
@Private
- public static ReportNewAggregatorsInfoResponse newInstance() {
- ReportNewAggregatorsInfoResponse response =
- Records.newRecord(ReportNewAggregatorsInfoResponse.class);
+ public static ReportNewCollectorInfoResponse newInstance() {
+ ReportNewCollectorInfoResponse response =
+ Records.newRecord(ReportNewCollectorInfoResponse.class);
return response;
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatRequestPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatRequestPBImpl.java
index 02e0a3da76..fa0cf5cd43 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatRequestPBImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatRequestPBImpl.java
@@ -34,8 +34,8 @@
import org.apache.hadoop.yarn.proto.YarnProtos.NodeLabelProto;
import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.MasterKeyProto;
import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.NodeStatusProto;
-import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.AppAggregatorsMapProto;
import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.LogAggregationReportProto;
+import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.AppCollectorsMapProto;
import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.NodeHeartbeatRequestProto;
import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.NodeHeartbeatRequestProtoOrBuilder;
import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.NodeLabelsProto;
@@ -58,7 +58,7 @@ public class NodeHeartbeatRequestPBImpl extends NodeHeartbeatRequest {
private Set labels = null;
private List logAggregationReportsForApps = null;
- Map registeredAggregators = null;
+ Map registeredCollectors = null;
public NodeHeartbeatRequestPBImpl() {
builder = NodeHeartbeatRequestProto.newBuilder();
@@ -114,8 +114,8 @@ private void mergeLocalToBuilder() {
if (this.logAggregationReportsForApps != null) {
addLogAggregationStatusForAppsToProto();
}
- if (this.registeredAggregators != null) {
- addRegisteredAggregatorsToProto();
+ if (this.registeredCollectors != null) {
+ addRegisteredCollectorsToProto();
}
}
@@ -158,13 +158,13 @@ private LogAggregationReportProto convertToProtoFormat(
return ((LogAggregationReportPBImpl) value).getProto();
}
- private void addRegisteredAggregatorsToProto() {
+ private void addRegisteredCollectorsToProto() {
maybeInitBuilder();
- builder.clearRegisteredAggregators();
- for (Map.Entry entry : registeredAggregators.entrySet()) {
- builder.addRegisteredAggregators(AppAggregatorsMapProto.newBuilder()
+ builder.clearRegisteredCollectors();
+ for (Map.Entry entry : registeredCollectors.entrySet()) {
+ builder.addRegisteredCollectors(AppCollectorsMapProto.newBuilder()
.setAppId(convertToProtoFormat(entry.getKey()))
- .setAppAggregatorAddr(entry.getValue()));
+ .setAppCollectorAddr(entry.getValue()));
}
}
@@ -248,35 +248,35 @@ public void setLastKnownNMTokenMasterKey(MasterKey masterKey) {
builder.clearLastKnownNmTokenMasterKey();
this.lastKnownNMTokenMasterKey = masterKey;
}
-
+
@Override
- public Map getRegisteredAggregators() {
- if (this.registeredAggregators != null) {
- return this.registeredAggregators;
+ public Map getRegisteredCollectors() {
+ if (this.registeredCollectors != null) {
+ return this.registeredCollectors;
}
- initRegisteredAggregators();
- return registeredAggregators;
+ initRegisteredCollectors();
+ return registeredCollectors;
}
-
- private void initRegisteredAggregators() {
+
+ private void initRegisteredCollectors() {
NodeHeartbeatRequestProtoOrBuilder p = viaProto ? proto : builder;
- List list = p.getRegisteredAggregatorsList();
- this.registeredAggregators = new HashMap ();
- for (AppAggregatorsMapProto c : list) {
+ List list = p.getRegisteredCollectorsList();
+ this.registeredCollectors = new HashMap ();
+ for (AppCollectorsMapProto c : list) {
ApplicationId appId = convertFromProtoFormat(c.getAppId());
- this.registeredAggregators.put(appId, c.getAppAggregatorAddr());
+ this.registeredCollectors.put(appId, c.getAppCollectorAddr());
}
}
-
+
@Override
- public void setRegisteredAggregators(
- Map registeredAggregators) {
- if (registeredAggregators == null || registeredAggregators.isEmpty()) {
+ public void setRegisteredCollectors(
+ Map registeredCollectors) {
+ if (registeredCollectors == null || registeredCollectors.isEmpty()) {
return;
}
maybeInitBuilder();
- this.registeredAggregators = new HashMap();
- this.registeredAggregators.putAll(registeredAggregators);
+ this.registeredCollectors = new HashMap();
+ this.registeredCollectors.putAll(registeredCollectors);
}
private NodeStatusPBImpl convertFromProtoFormat(NodeStatusProto p) {
@@ -286,11 +286,11 @@ private NodeStatusPBImpl convertFromProtoFormat(NodeStatusProto p) {
private NodeStatusProto convertToProtoFormat(NodeStatus t) {
return ((NodeStatusPBImpl)t).getProto();
}
-
+
private ApplicationIdPBImpl convertFromProtoFormat(ApplicationIdProto p) {
return new ApplicationIdPBImpl(p);
}
-
+
private ApplicationIdProto convertToProtoFormat(ApplicationId t) {
return ((ApplicationIdPBImpl) t).getProto();
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatResponsePBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatResponsePBImpl.java
index cb20615098..356414fe12 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatResponsePBImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatResponsePBImpl.java
@@ -46,7 +46,7 @@
import org.apache.hadoop.yarn.proto.YarnServiceProtos.SignalContainerRequestProto;
import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.MasterKeyProto;
import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.NodeActionProto;
-import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.AppAggregatorsMapProto;
+import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.AppCollectorsMapProto;
import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.NodeHeartbeatResponseProto;
import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.NodeHeartbeatResponseProtoOrBuilder;
import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.SystemCredentialsForAppsProto;
@@ -69,7 +69,7 @@ public class NodeHeartbeatResponsePBImpl extends
private List applicationsToCleanup = null;
private Map systemCredentials = null;
private Resource resource = null;
- Map appAggregatorsMap = null;
+ Map appCollectorsMap = null;
private MasterKey containerTokenMasterKey = null;
private MasterKey nmTokenMasterKey = null;
@@ -127,8 +127,8 @@ private void mergeLocalToBuilder() {
if (this.resource != null) {
builder.setResource(convertToProtoFormat(this.resource));
}
- if (this.appAggregatorsMap != null) {
- addAppAggregatorsMapToProto();
+ if (this.appCollectorsMap != null) {
+ addAppCollectorsMapToProto();
}
}
@@ -142,14 +142,14 @@ private void addSystemCredentialsToProto() {
entry.getValue().duplicate())));
}
}
-
- private void addAppAggregatorsMapToProto() {
+
+ private void addAppCollectorsMapToProto() {
maybeInitBuilder();
- builder.clearAppAggregatorsMap();
- for (Map.Entry entry : appAggregatorsMap.entrySet()) {
- builder.addAppAggregatorsMap(AppAggregatorsMapProto.newBuilder()
+ builder.clearAppCollectorsMap();
+ for (Map.Entry entry : appCollectorsMap.entrySet()) {
+ builder.addAppCollectorsMap(AppCollectorsMapProto.newBuilder()
.setAppId(convertToProtoFormat(entry.getKey()))
- .setAppAggregatorAddr(entry.getValue()));
+ .setAppCollectorAddr(entry.getValue()));
}
}
@@ -565,14 +565,14 @@ public Map getSystemCredentialsForApps() {
initSystemCredentials();
return systemCredentials;
}
-
+
@Override
- public Map getAppAggregatorsMap() {
- if (this.appAggregatorsMap != null) {
- return this.appAggregatorsMap;
+ public Map getAppCollectorsMap() {
+ if (this.appCollectorsMap != null) {
+ return this.appCollectorsMap;
}
- initAppAggregatorsMap();
- return appAggregatorsMap;
+ initAppCollectorsMap();
+ return appCollectorsMap;
}
private void initSystemCredentials() {
@@ -585,14 +585,14 @@ private void initSystemCredentials() {
this.systemCredentials.put(appId, byteBuffer);
}
}
-
- private void initAppAggregatorsMap() {
+
+ private void initAppCollectorsMap() {
NodeHeartbeatResponseProtoOrBuilder p = viaProto ? proto : builder;
- List list = p.getAppAggregatorsMapList();
- this.appAggregatorsMap = new HashMap ();
- for (AppAggregatorsMapProto c : list) {
+ List list = p.getAppCollectorsMapList();
+ this.appCollectorsMap = new HashMap ();
+ for (AppCollectorsMapProto c : list) {
ApplicationId appId = convertFromProtoFormat(c.getAppId());
- this.appAggregatorsMap.put(appId, c.getAppAggregatorAddr());
+ this.appCollectorsMap.put(appId, c.getAppCollectorAddr());
}
}
@@ -606,16 +606,16 @@ public void setSystemCredentialsForApps(
this.systemCredentials = new HashMap();
this.systemCredentials.putAll(systemCredentials);
}
-
+
@Override
- public void setAppAggregatorsMap(
- Map appAggregatorsMap) {
- if (appAggregatorsMap == null || appAggregatorsMap.isEmpty()) {
+ public void setAppCollectorsMap(
+ Map appCollectorsMap) {
+ if (appCollectorsMap == null || appCollectorsMap.isEmpty()) {
return;
}
maybeInitBuilder();
- this.appAggregatorsMap = new HashMap();
- this.appAggregatorsMap.putAll(appAggregatorsMap);
+ this.appCollectorsMap = new HashMap();
+ this.appCollectorsMap.putAll(appCollectorsMap);
}
@Override
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/ReportNewAggregatorsInfoRequestPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/ReportNewAggregatorsInfoRequestPBImpl.java
deleted file mode 100644
index eb7beef2d5..0000000000
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/ReportNewAggregatorsInfoRequestPBImpl.java
+++ /dev/null
@@ -1,142 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb;
-
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.AppAggregatorsMapProto;
-import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.ReportNewAggregatorsInfoRequestProto;
-import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.ReportNewAggregatorsInfoRequestProtoOrBuilder;
-import org.apache.hadoop.yarn.server.api.protocolrecords.ReportNewAggregatorsInfoRequest;
-import org.apache.hadoop.yarn.server.api.records.AppAggregatorsMap;
-import org.apache.hadoop.yarn.server.api.records.impl.pb.AppAggregatorsMapPBImpl;
-
-public class ReportNewAggregatorsInfoRequestPBImpl extends
- ReportNewAggregatorsInfoRequest {
-
- ReportNewAggregatorsInfoRequestProto proto =
- ReportNewAggregatorsInfoRequestProto.getDefaultInstance();
-
- ReportNewAggregatorsInfoRequestProto.Builder builder = null;
- boolean viaProto = false;
-
- private List aggregatorsList = null;
-
- public ReportNewAggregatorsInfoRequestPBImpl() {
- builder = ReportNewAggregatorsInfoRequestProto.newBuilder();
- }
-
- public ReportNewAggregatorsInfoRequestPBImpl(
- ReportNewAggregatorsInfoRequestProto proto) {
- this.proto = proto;
- viaProto = true;
- }
-
- public ReportNewAggregatorsInfoRequestProto getProto() {
- mergeLocalToProto();
- proto = viaProto ? proto : builder.build();
- viaProto = true;
- return proto;
- }
-
- @Override
- public int hashCode() {
- return getProto().hashCode();
- }
-
- @Override
- public boolean equals(Object other) {
- if (other == null)
- return false;
- if (other.getClass().isAssignableFrom(this.getClass())) {
- return this.getProto().equals(this.getClass().cast(other).getProto());
- }
- return false;
- }
-
- private void mergeLocalToProto() {
- if (viaProto)
- maybeInitBuilder();
- mergeLocalToBuilder();
- proto = builder.build();
- viaProto = true;
- }
-
- private void mergeLocalToBuilder() {
- if (aggregatorsList != null) {
- addLocalAggregatorsToProto();
- }
- }
-
- private void maybeInitBuilder() {
- if (viaProto || builder == null) {
- builder = ReportNewAggregatorsInfoRequestProto.newBuilder(proto);
- }
- viaProto = false;
- }
-
- private void addLocalAggregatorsToProto() {
- maybeInitBuilder();
- builder.clearAppAggregators();
- List protoList =
- new ArrayList();
- for (AppAggregatorsMap m : this.aggregatorsList) {
- protoList.add(convertToProtoFormat(m));
- }
- builder.addAllAppAggregators(protoList);
- }
-
- private void initLocalAggregatorsList() {
- ReportNewAggregatorsInfoRequestProtoOrBuilder p = viaProto ? proto : builder;
- List aggregatorsList =
- p.getAppAggregatorsList();
- this.aggregatorsList = new ArrayList();
- for (AppAggregatorsMapProto m : aggregatorsList) {
- this.aggregatorsList.add(convertFromProtoFormat(m));
- }
- }
-
- @Override
- public List getAppAggregatorsList() {
- if (this.aggregatorsList == null) {
- initLocalAggregatorsList();
- }
- return this.aggregatorsList;
- }
-
- @Override
- public void setAppAggregatorsList(List appAggregatorsList) {
- maybeInitBuilder();
- if (appAggregatorsList == null) {
- builder.clearAppAggregators();
- }
- this.aggregatorsList = appAggregatorsList;
- }
-
- private AppAggregatorsMapPBImpl convertFromProtoFormat(
- AppAggregatorsMapProto p) {
- return new AppAggregatorsMapPBImpl(p);
- }
-
- private AppAggregatorsMapProto convertToProtoFormat(
- AppAggregatorsMap m) {
- return ((AppAggregatorsMapPBImpl) m).getProto();
- }
-
-}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/ReportNewCollectorInfoRequestPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/ReportNewCollectorInfoRequestPBImpl.java
new file mode 100644
index 0000000000..5dd8f17fed
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/ReportNewCollectorInfoRequestPBImpl.java
@@ -0,0 +1,142 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.AppCollectorsMapProto;
+import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.ReportNewCollectorInfoRequestProto;
+import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.ReportNewCollectorInfoRequestProtoOrBuilder;
+import org.apache.hadoop.yarn.server.api.protocolrecords.ReportNewCollectorInfoRequest;
+import org.apache.hadoop.yarn.server.api.records.AppCollectorsMap;
+import org.apache.hadoop.yarn.server.api.records.impl.pb.AppCollectorsMapPBImpl;
+
+public class ReportNewCollectorInfoRequestPBImpl extends
+ ReportNewCollectorInfoRequest {
+
+ ReportNewCollectorInfoRequestProto proto =
+ ReportNewCollectorInfoRequestProto.getDefaultInstance();
+
+ ReportNewCollectorInfoRequestProto.Builder builder = null;
+ boolean viaProto = false;
+
+ private List collectorsList = null;
+
+ public ReportNewCollectorInfoRequestPBImpl() {
+ builder = ReportNewCollectorInfoRequestProto.newBuilder();
+ }
+
+ public ReportNewCollectorInfoRequestPBImpl(
+ ReportNewCollectorInfoRequestProto proto) {
+ this.proto = proto;
+ viaProto = true;
+ }
+
+ public ReportNewCollectorInfoRequestProto getProto() {
+ mergeLocalToProto();
+ proto = viaProto ? proto : builder.build();
+ viaProto = true;
+ return proto;
+ }
+
+ @Override
+ public int hashCode() {
+ return getProto().hashCode();
+ }
+
+ @Override
+ public boolean equals(Object other) {
+ if (other == null)
+ return false;
+ if (other.getClass().isAssignableFrom(this.getClass())) {
+ return this.getProto().equals(this.getClass().cast(other).getProto());
+ }
+ return false;
+ }
+
+ private void mergeLocalToProto() {
+ if (viaProto)
+ maybeInitBuilder();
+ mergeLocalToBuilder();
+ proto = builder.build();
+ viaProto = true;
+ }
+
+ private void mergeLocalToBuilder() {
+ if (collectorsList != null) {
+ addLocalCollectorsToProto();
+ }
+ }
+
+ private void maybeInitBuilder() {
+ if (viaProto || builder == null) {
+ builder = ReportNewCollectorInfoRequestProto.newBuilder(proto);
+ }
+ viaProto = false;
+ }
+
+ private void addLocalCollectorsToProto() {
+ maybeInitBuilder();
+ builder.clearAppCollectors();
+ List protoList =
+ new ArrayList();
+ for (AppCollectorsMap m : this.collectorsList) {
+ protoList.add(convertToProtoFormat(m));
+ }
+ builder.addAllAppCollectors(protoList);
+ }
+
+ private void initLocalCollectorsList() {
+ ReportNewCollectorInfoRequestProtoOrBuilder p = viaProto ? proto : builder;
+ List collectorsList =
+ p.getAppCollectorsList();
+ this.collectorsList = new ArrayList();
+ for (AppCollectorsMapProto m : collectorsList) {
+ this.collectorsList.add(convertFromProtoFormat(m));
+ }
+ }
+
+ @Override
+ public List getAppCollectorsList() {
+ if (this.collectorsList == null) {
+ initLocalCollectorsList();
+ }
+ return this.collectorsList;
+ }
+
+ @Override
+ public void setAppCollectorsList(List appCollectorsList) {
+ maybeInitBuilder();
+ if (appCollectorsList == null) {
+ builder.clearAppCollectors();
+ }
+ this.collectorsList = appCollectorsList;
+ }
+
+ private AppCollectorsMapPBImpl convertFromProtoFormat(
+ AppCollectorsMapProto p) {
+ return new AppCollectorsMapPBImpl(p);
+ }
+
+ private AppCollectorsMapProto convertToProtoFormat(
+ AppCollectorsMap m) {
+ return ((AppCollectorsMapPBImpl) m).getProto();
+ }
+
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/ReportNewAggregatorsInfoResponsePBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/ReportNewCollectorInfoResponsePBImpl.java
similarity index 74%
rename from hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/ReportNewAggregatorsInfoResponsePBImpl.java
rename to hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/ReportNewCollectorInfoResponsePBImpl.java
index 0f0925a85c..7c90675b6a 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/ReportNewAggregatorsInfoResponsePBImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/ReportNewCollectorInfoResponsePBImpl.java
@@ -19,33 +19,33 @@
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
-import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.ReportNewAggregatorsInfoResponseProto;
-import org.apache.hadoop.yarn.server.api.protocolrecords.ReportNewAggregatorsInfoResponse;
+import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.ReportNewCollectorInfoResponseProto;
+import org.apache.hadoop.yarn.server.api.protocolrecords.ReportNewCollectorInfoResponse;
import com.google.protobuf.TextFormat;
@Private
@Unstable
-public class ReportNewAggregatorsInfoResponsePBImpl extends
- ReportNewAggregatorsInfoResponse {
+public class ReportNewCollectorInfoResponsePBImpl extends
+ ReportNewCollectorInfoResponse {
+
+ ReportNewCollectorInfoResponseProto proto =
+ ReportNewCollectorInfoResponseProto.getDefaultInstance();
+
+ ReportNewCollectorInfoResponseProto.Builder builder = null;
- ReportNewAggregatorsInfoResponseProto proto =
- ReportNewAggregatorsInfoResponseProto.getDefaultInstance();
-
- ReportNewAggregatorsInfoResponseProto.Builder builder = null;
-
boolean viaProto = false;
-
- public ReportNewAggregatorsInfoResponsePBImpl() {
- builder = ReportNewAggregatorsInfoResponseProto.newBuilder();
+
+ public ReportNewCollectorInfoResponsePBImpl() {
+ builder = ReportNewCollectorInfoResponseProto.newBuilder();
}
- public ReportNewAggregatorsInfoResponsePBImpl(ReportNewAggregatorsInfoResponseProto proto) {
+ public ReportNewCollectorInfoResponsePBImpl(ReportNewCollectorInfoResponseProto proto) {
this.proto = proto;
viaProto = true;
}
-
- public ReportNewAggregatorsInfoResponseProto getProto() {
+
+ public ReportNewCollectorInfoResponseProto getProto() {
proto = viaProto ? proto : builder.build();
viaProto = true;
return proto;
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/AppAggregatorsMap.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/AppAggregatorsMap.java
deleted file mode 100644
index 67c377de25..0000000000
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/AppAggregatorsMap.java
+++ /dev/null
@@ -1,33 +0,0 @@
-package org.apache.hadoop.yarn.server.api.records;
-
-import org.apache.hadoop.classification.InterfaceAudience.Private;
-import org.apache.hadoop.classification.InterfaceAudience.Public;
-import org.apache.hadoop.classification.InterfaceStability.Evolving;
-import org.apache.hadoop.classification.InterfaceStability.Unstable;
-import org.apache.hadoop.yarn.api.records.ApplicationId;
-import org.apache.hadoop.yarn.util.Records;
-
-
-@Private
-public abstract class AppAggregatorsMap {
-
- public static AppAggregatorsMap newInstance(
- ApplicationId id, String aggregatorAddr) {
- AppAggregatorsMap appAggregatorMap =
- Records.newRecord(AppAggregatorsMap.class);
- appAggregatorMap.setApplicationId(id);
- appAggregatorMap.setAggregatorAddr(aggregatorAddr);
- return appAggregatorMap;
- }
-
- public abstract ApplicationId getApplicationId();
-
- public abstract void setApplicationId(
- ApplicationId id);
-
- public abstract String getAggregatorAddr();
-
- public abstract void setAggregatorAddr(
- String addr);
-
-}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/AppCollectorsMap.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/AppCollectorsMap.java
new file mode 100644
index 0000000000..07e1d92898
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/AppCollectorsMap.java
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.api.records;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.util.Records;
+
+
+@Private
+public abstract class AppCollectorsMap {
+
+ public static AppCollectorsMap newInstance(
+ ApplicationId id, String collectorAddr) {
+ AppCollectorsMap appCollectorsMap =
+ Records.newRecord(AppCollectorsMap.class);
+ appCollectorsMap.setApplicationId(id);
+ appCollectorsMap.setCollectorAddr(collectorAddr);
+ return appCollectorsMap;
+ }
+
+ public abstract ApplicationId getApplicationId();
+
+ public abstract void setApplicationId(ApplicationId id);
+
+ public abstract String getCollectorAddr();
+
+ public abstract void setCollectorAddr(String addr);
+
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/AppAggregatorsMapPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/AppCollectorsMapPBImpl.java
similarity index 71%
rename from hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/AppAggregatorsMapPBImpl.java
rename to hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/AppCollectorsMapPBImpl.java
index 32903e2e58..eb3bde4df6 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/AppAggregatorsMapPBImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/AppCollectorsMapPBImpl.java
@@ -21,37 +21,37 @@
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationIdPBImpl;
-import org.apache.hadoop.yarn.server.api.records.AppAggregatorsMap;
+import org.apache.hadoop.yarn.server.api.records.AppCollectorsMap;
import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationIdProto;
-import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.AppAggregatorsMapProto;
-import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.AppAggregatorsMapProtoOrBuilder;
+import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.AppCollectorsMapProto;
+import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.AppCollectorsMapProtoOrBuilder;
import com.google.protobuf.TextFormat;
@Private
@Unstable
-public class AppAggregatorsMapPBImpl extends AppAggregatorsMap {
+public class AppCollectorsMapPBImpl extends AppCollectorsMap {
- AppAggregatorsMapProto proto =
- AppAggregatorsMapProto.getDefaultInstance();
-
- AppAggregatorsMapProto.Builder builder = null;
+ AppCollectorsMapProto proto =
+ AppCollectorsMapProto.getDefaultInstance();
+
+ AppCollectorsMapProto.Builder builder = null;
boolean viaProto = false;
-
+
private ApplicationId appId = null;
- private String aggregatorAddr = null;
-
- public AppAggregatorsMapPBImpl() {
- builder = AppAggregatorsMapProto.newBuilder();
+ private String collectorAddr = null;
+
+ public AppCollectorsMapPBImpl() {
+ builder = AppCollectorsMapProto.newBuilder();
}
- public AppAggregatorsMapPBImpl(AppAggregatorsMapProto proto) {
+ public AppCollectorsMapPBImpl(AppCollectorsMapProto proto) {
this.proto = proto;
viaProto = true;
}
-
- public AppAggregatorsMapProto getProto() {
+
+ public AppCollectorsMapProto getProto() {
mergeLocalToProto();
proto = viaProto ? proto : builder.build();
viaProto = true;
@@ -77,24 +77,24 @@ public boolean equals(Object other) {
public String toString() {
return TextFormat.shortDebugString(getProto());
}
-
+
@Override
public ApplicationId getApplicationId() {
- AppAggregatorsMapProtoOrBuilder p = viaProto ? proto : builder;
+ AppCollectorsMapProtoOrBuilder p = viaProto ? proto : builder;
if (this.appId == null && p.hasAppId()) {
this.appId = convertFromProtoFormat(p.getAppId());
}
return this.appId;
}
-
+
@Override
- public String getAggregatorAddr() {
- AppAggregatorsMapProtoOrBuilder p = viaProto ? proto : builder;
- if (this.aggregatorAddr == null
- && p.hasAppAggregatorAddr()) {
- this.aggregatorAddr = p.getAppAggregatorAddr();
+ public String getCollectorAddr() {
+ AppCollectorsMapProtoOrBuilder p = viaProto ? proto : builder;
+ if (this.collectorAddr == null
+ && p.hasAppCollectorAddr()) {
+ this.collectorAddr = p.getAppCollectorAddr();
}
- return this.aggregatorAddr;
+ return this.collectorAddr;
}
@Override
@@ -105,31 +105,31 @@ public void setApplicationId(ApplicationId appId) {
}
this.appId = appId;
}
-
+
@Override
- public void setAggregatorAddr(String aggregatorAddr) {
+ public void setCollectorAddr(String collectorAddr) {
maybeInitBuilder();
- if (aggregatorAddr == null) {
- builder.clearAppAggregatorAddr();
+ if (collectorAddr == null) {
+ builder.clearAppCollectorAddr();
}
- this.aggregatorAddr = aggregatorAddr;
+ this.collectorAddr = collectorAddr;
}
-
+
private ApplicationIdPBImpl convertFromProtoFormat(ApplicationIdProto p) {
return new ApplicationIdPBImpl(p);
}
-
+
private ApplicationIdProto convertToProtoFormat(ApplicationId t) {
return ((ApplicationIdPBImpl) t).getProto();
}
-
+
private void maybeInitBuilder() {
if (viaProto || builder == null) {
- builder = AppAggregatorsMapProto.newBuilder(proto);
+ builder = AppCollectorsMapProto.newBuilder(proto);
}
viaProto = false;
}
-
+
private void mergeLocalToProto() {
if (viaProto) {
maybeInitBuilder();
@@ -138,13 +138,13 @@ private void mergeLocalToProto() {
proto = builder.build();
viaProto = true;
}
-
+
private void mergeLocalToBuilder() {
if (this.appId != null) {
builder.setAppId(convertToProtoFormat(this.appId));
}
- if (this.aggregatorAddr != null) {
- builder.setAppAggregatorAddr(this.aggregatorAddr);
+ if (this.collectorAddr != null) {
+ builder.setAppCollectorAddr(this.collectorAddr);
}
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/aggregatornodemanager_protocol.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/collectornodemanager_protocol.proto
similarity index 81%
rename from hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/aggregatornodemanager_protocol.proto
rename to hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/collectornodemanager_protocol.proto
index d7b05c1417..654a9f2168 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/aggregatornodemanager_protocol.proto
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/collectornodemanager_protocol.proto
@@ -17,13 +17,13 @@
*/
option java_package = "org.apache.hadoop.yarn.proto";
-option java_outer_classname = "AggregatorNodemanagerProtocol";
+option java_outer_classname = "CollectorNodemanagerProtocol";
option java_generic_services = true;
option java_generate_equals_and_hash = true;
package hadoop.yarn;
import "yarn_server_common_service_protos.proto";
-service AggregatorNodemanagerProtocolService {
- rpc reportNewAggregatorInfo (ReportNewAggregatorsInfoRequestProto) returns (ReportNewAggregatorsInfoResponseProto);
+service CollectorNodemanagerProtocolService {
+ rpc reportNewCollectorInfo (ReportNewCollectorInfoRequestProto) returns (ReportNewCollectorInfoResponseProto);
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto
index 625928b5d0..af00748417 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto
@@ -84,7 +84,7 @@ message NodeHeartbeatRequestProto {
optional MasterKeyProto last_known_nm_token_master_key = 3;
optional NodeLabelsProto nodeLabels = 4;
repeated LogAggregationReportProto log_aggregation_reports_for_apps = 5;
- repeated AppAggregatorsMapProto registered_aggregators = 6;
+ repeated AppCollectorsMapProto registered_collectors = 6;
}
message LogAggregationReportProto {
@@ -109,7 +109,7 @@ message NodeHeartbeatResponseProto {
repeated SignalContainerRequestProto containers_to_signal = 13;
optional ResourceProto resource = 14;
optional ContainerQueuingLimitProto container_queuing_limit = 15;
- repeated AppAggregatorsMapProto app_aggregators_map = 16;
+ repeated AppCollectorsMapProto app_collectors_map = 16;
}
message ContainerQueuingLimitProto {
@@ -123,21 +123,21 @@ message SystemCredentialsForAppsProto {
}
////////////////////////////////////////////////////////////////////////
-////// From aggregator_nodemanager_protocol ////////////////////////////
+////// From collector_nodemanager_protocol ////////////////////////////
////////////////////////////////////////////////////////////////////////
-message AppAggregatorsMapProto {
+message AppCollectorsMapProto {
optional ApplicationIdProto appId = 1;
- optional string appAggregatorAddr = 2;
+ optional string appCollectorAddr = 2;
}
//////////////////////////////////////////////////////
-/////// aggregator_nodemanager_protocol //////////////
+/////// collector_nodemanager_protocol //////////////
//////////////////////////////////////////////////////
-message ReportNewAggregatorsInfoRequestProto {
- repeated AppAggregatorsMapProto app_aggregators = 1;
+message ReportNewCollectorInfoRequestProto {
+ repeated AppCollectorsMapProto app_collectors = 1;
}
-message ReportNewAggregatorsInfoResponseProto {
+message ReportNewCollectorInfoResponseProto {
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/TestRPC.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/TestRPC.java
index 3dc92ce948..1f47dfda13 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/TestRPC.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/TestRPC.java
@@ -63,10 +63,10 @@
import org.apache.hadoop.yarn.ipc.RPCUtil;
import org.apache.hadoop.yarn.ipc.YarnRPC;
import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
-import org.apache.hadoop.yarn.server.api.AggregatorNodemanagerProtocol;
-import org.apache.hadoop.yarn.server.api.protocolrecords.ReportNewAggregatorsInfoRequest;
-import org.apache.hadoop.yarn.server.api.protocolrecords.ReportNewAggregatorsInfoResponse;
-import org.apache.hadoop.yarn.server.api.records.AppAggregatorsMap;
+import org.apache.hadoop.yarn.server.api.CollectorNodemanagerProtocol;
+import org.apache.hadoop.yarn.server.api.protocolrecords.ReportNewCollectorInfoRequest;
+import org.apache.hadoop.yarn.server.api.protocolrecords.ReportNewCollectorInfoResponse;
+import org.apache.hadoop.yarn.server.api.records.AppCollectorsMap;
import org.apache.hadoop.yarn.util.Records;
import org.junit.Assert;
import org.junit.Test;
@@ -76,15 +76,15 @@ public class TestRPC {
private static final String EXCEPTION_MSG = "test error";
private static final String EXCEPTION_CAUSE = "exception cause";
private static final RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null);
-
- public static final String ILLEGAL_NUMBER_MESSAGE =
- "aggregators' number in ReportNewAggregatorsInfoRequest is not ONE.";
-
- public static final String DEFAULT_AGGREGATOR_ADDR = "localhost:0";
-
- public static final ApplicationId DEFAULT_APP_ID =
+
+ public static final String ILLEGAL_NUMBER_MESSAGE =
+ "collectors' number in ReportNewCollectorInfoRequest is not ONE.";
+
+ public static final String DEFAULT_COLLECTOR_ADDR = "localhost:0";
+
+ public static final ApplicationId DEFAULT_APP_ID =
ApplicationId.newInstance(0, 0);
-
+
@Test
public void testUnknownCall() {
Configuration conf = new Configuration();
@@ -116,17 +116,17 @@ public void testUnknownCall() {
server.stop();
}
}
-
+
@Test
- public void testRPCOnAggregatorNodeManagerProtocol() throws IOException {
+ public void testRPCOnCollectorNodeManagerProtocol() throws IOException {
Configuration conf = new Configuration();
conf.set(YarnConfiguration.IPC_RPC_IMPL, HadoopYarnProtoRPC.class
.getName());
YarnRPC rpc = YarnRPC.create(conf);
String bindAddr = "localhost:0";
InetSocketAddress addr = NetUtils.createSocketAddr(bindAddr);
- Server server = rpc.getServer(AggregatorNodemanagerProtocol.class,
- new DummyNMAggregatorService(), addr, conf, null, 1);
+ Server server = rpc.getServer(CollectorNodemanagerProtocol.class,
+ new DummyNMCollectorService(), addr, conf, null, 1);
server.start();
// Test unrelated protocol wouldn't get response
@@ -145,31 +145,31 @@ public void testRPCOnAggregatorNodeManagerProtocol() throws IOException {
} catch (Exception e) {
e.printStackTrace();
}
-
- // Test AggregatorNodemanagerProtocol get proper response
- AggregatorNodemanagerProtocol proxy = (AggregatorNodemanagerProtocol)rpc.getProxy(
- AggregatorNodemanagerProtocol.class, NetUtils.getConnectAddress(server), conf);
- // Verify request with DEFAULT_APP_ID and DEFAULT_AGGREGATOR_ADDR get
+
+ // Test CollectorNodemanagerProtocol get proper response
+ CollectorNodemanagerProtocol proxy = (CollectorNodemanagerProtocol)rpc.getProxy(
+ CollectorNodemanagerProtocol.class, NetUtils.getConnectAddress(server), conf);
+ // Verify request with DEFAULT_APP_ID and DEFAULT_COLLECTOR_ADDR get
// normally response.
try {
- ReportNewAggregatorsInfoRequest request =
- ReportNewAggregatorsInfoRequest.newInstance(
- DEFAULT_APP_ID, DEFAULT_AGGREGATOR_ADDR);
- proxy.reportNewAggregatorInfo(request);
+ ReportNewCollectorInfoRequest request =
+ ReportNewCollectorInfoRequest.newInstance(
+ DEFAULT_APP_ID, DEFAULT_COLLECTOR_ADDR);
+ proxy.reportNewCollectorInfo(request);
} catch (YarnException e) {
Assert.fail("RPC call failured is not expected here.");
}
-
- // Verify empty request get YarnException back (by design in
- // DummyNMAggregatorService)
+
+ // Verify empty request get YarnException back (by design in
+ // DummyNMCollectorService)
try {
- proxy.reportNewAggregatorInfo(Records
- .newRecord(ReportNewAggregatorsInfoRequest.class));
+ proxy.reportNewCollectorInfo(Records
+ .newRecord(ReportNewCollectorInfoRequest.class));
Assert.fail("Excepted RPC call to fail with YarnException.");
} catch (YarnException e) {
Assert.assertTrue(e.getMessage().contains(ILLEGAL_NUMBER_MESSAGE));
}
-
+
server.stop();
}
@@ -177,21 +177,21 @@ public void testRPCOnAggregatorNodeManagerProtocol() throws IOException {
public void testHadoopProtoRPC() throws Exception {
test(HadoopYarnProtoRPC.class.getName());
}
-
+
private void test(String rpcClass) throws Exception {
Configuration conf = new Configuration();
conf.set(YarnConfiguration.IPC_RPC_IMPL, rpcClass);
YarnRPC rpc = YarnRPC.create(conf);
String bindAddr = "localhost:0";
InetSocketAddress addr = NetUtils.createSocketAddr(bindAddr);
- Server server = rpc.getServer(ContainerManagementProtocol.class,
+ Server server = rpc.getServer(ContainerManagementProtocol.class,
new DummyContainerManager(), addr, conf, null, 1);
server.start();
RPC.setProtocolEngine(conf, ContainerManagementProtocolPB.class, ProtobufRpcEngine.class);
- ContainerManagementProtocol proxy = (ContainerManagementProtocol)
- rpc.getProxy(ContainerManagementProtocol.class,
+ ContainerManagementProtocol proxy = (ContainerManagementProtocol)
+ rpc.getProxy(ContainerManagementProtocol.class,
NetUtils.getConnectAddress(server), conf);
- ContainerLaunchContext containerLaunchContext =
+ ContainerLaunchContext containerLaunchContext =
recordFactory.newRecordInstance(ContainerLaunchContext.class);
ApplicationId applicationId = ApplicationId.newInstance(0, 0);
@@ -255,7 +255,7 @@ public class DummyContainerManager implements ContainerManagementProtocol {
public GetContainerStatusesResponse getContainerStatuses(
GetContainerStatusesRequest request)
throws YarnException {
- GetContainerStatusesResponse response =
+ GetContainerStatusesResponse response =
recordFactory.newRecordInstance(GetContainerStatusesResponse.class);
response.setContainerStatuses(statuses);
return response;
@@ -287,9 +287,9 @@ public StartContainersResponse startContainers(
}
@Override
- public StopContainersResponse stopContainers(StopContainersRequest request)
+ public StopContainersResponse stopContainers(StopContainersRequest request)
throws YarnException {
- Exception e = new Exception(EXCEPTION_MSG,
+ Exception e = new Exception(EXCEPTION_MSG,
new Exception(EXCEPTION_CAUSE));
throw new YarnException(e);
}
@@ -332,32 +332,32 @@ public static Token newContainerToken(NodeId nodeId, byte[] password,
.buildTokenService(addr).toString());
return containerToken;
}
-
- // A dummy implementation for AggregatorNodemanagerProtocol for test purpose,
- // it only can accept one appID, aggregatorAddr pair or throw exceptions
- public class DummyNMAggregatorService
- implements AggregatorNodemanagerProtocol {
-
+
+ // A dummy implementation for CollectorNodemanagerProtocol for test purpose,
+ // it only can accept one appID, collectorAddr pair or throw exceptions
+ public class DummyNMCollectorService
+ implements CollectorNodemanagerProtocol {
+
@Override
- public ReportNewAggregatorsInfoResponse reportNewAggregatorInfo(
- ReportNewAggregatorsInfoRequest request)
+ public ReportNewCollectorInfoResponse reportNewCollectorInfo(
+ ReportNewCollectorInfoRequest request)
throws YarnException, IOException {
- List appAggregators = request.getAppAggregatorsList();
- if (appAggregators.size() == 1) {
- // check default appID and aggregatorAddr
- AppAggregatorsMap appAggregator = appAggregators.get(0);
- Assert.assertEquals(appAggregator.getApplicationId(),
+ List appCollectors = request.getAppCollectorsList();
+ if (appCollectors.size() == 1) {
+ // check default appID and collectorAddr
+ AppCollectorsMap appCollector = appCollectors.get(0);
+ Assert.assertEquals(appCollector.getApplicationId(),
DEFAULT_APP_ID);
- Assert.assertEquals(appAggregator.getAggregatorAddr(),
- DEFAULT_AGGREGATOR_ADDR);
+ Assert.assertEquals(appCollector.getCollectorAddr(),
+ DEFAULT_COLLECTOR_ADDR);
} else {
throw new YarnException(ILLEGAL_NUMBER_MESSAGE);
}
-
- ReportNewAggregatorsInfoResponse response =
- recordFactory.newRecordInstance(ReportNewAggregatorsInfoResponse.class);
+
+ ReportNewCollectorInfoResponse response =
+ recordFactory.newRecordInstance(ReportNewCollectorInfoResponse.class);
return response;
}
}
-
+
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/TestYarnServerApiClasses.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/TestYarnServerApiClasses.java
index 4c65948dee..475ce58578 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/TestYarnServerApiClasses.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/TestYarnServerApiClasses.java
@@ -109,14 +109,14 @@ public void testNodeHeartbeatRequestPBImpl() {
original.setLastKnownNMTokenMasterKey(getMasterKey());
original.setNodeStatus(getNodeStatus());
original.setNodeLabels(getValidNodeLabels());
- Map aggregators = getAggregators();
- original.setRegisteredAggregators(aggregators);
+ Map collectors = getCollectors();
+ original.setRegisteredCollectors(collectors);
NodeHeartbeatRequestPBImpl copy = new NodeHeartbeatRequestPBImpl(
original.getProto());
assertEquals(1, copy.getLastKnownContainerTokenMasterKey().getKeyId());
assertEquals(1, copy.getLastKnownNMTokenMasterKey().getKeyId());
assertEquals("localhost", copy.getNodeStatus().getNodeId().getHost());
- assertEquals(aggregators, copy.getRegisteredAggregators());
+ assertEquals(collectors, copy.getRegisteredCollectors());
// check labels are coming with valid values
Assert.assertTrue(original.getNodeLabels()
.containsAll(copy.getNodeLabels()));
@@ -153,8 +153,8 @@ public void testNodeHeartbeatResponsePBImpl() {
original.setNextHeartBeatInterval(1000);
original.setNodeAction(NodeAction.NORMAL);
original.setResponseId(100);
- Map aggregators = getAggregators();
- original.setAppAggregatorsMap(aggregators);
+ Map collectors = getCollectors();
+ original.setAppCollectorsMap(collectors);
NodeHeartbeatResponsePBImpl copy = new NodeHeartbeatResponsePBImpl(
original.getProto());
@@ -164,7 +164,7 @@ public void testNodeHeartbeatResponsePBImpl() {
assertEquals(1, copy.getContainerTokenMasterKey().getKeyId());
assertEquals(1, copy.getNMTokenMasterKey().getKeyId());
assertEquals("testDiagnosticMessage", copy.getDiagnosticsMessage());
- assertEquals(aggregators, copy.getAppAggregatorsMap());
+ assertEquals(collectors, copy.getAppCollectorsMap());
assertEquals(false, copy.getAreNodeLabelsAcceptedByRM());
}
@@ -344,15 +344,15 @@ private HashSet getValidNodeLabels() {
return nodeLabels;
}
- private Map getAggregators() {
+ private Map getCollectors() {
ApplicationId appID = ApplicationId.newInstance(1L, 1);
- String aggregatorAddr = "localhost:0";
- Map aggregatorMap =
+ String collectorAddr = "localhost:0";
+ Map collectorMap =
new HashMap();
- aggregatorMap.put(appID, aggregatorAddr);
- return aggregatorMap;
+ collectorMap.put(appID, collectorAddr);
+ return collectorMap;
}
-
+
private ContainerStatus getContainerStatus(int applicationId,
int containerID, int appAttemptId) {
ContainerStatus status = recordFactory
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/Context.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/Context.java
index aa8655c52f..af896c853c 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/Context.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/Context.java
@@ -70,19 +70,19 @@ interface QueuingContext {
ConcurrentMap getApplications();
Map getSystemCredentialsForApps();
-
+
/**
- * Get the registered aggregators that located on this NM.
+ * Get the registered collectors that located on this NM.
* @return registered
*/
- Map getRegisteredAggregators();
-
+ Map getRegisteredCollectors();
+
/**
- * Return the known aggregators which get from RM for all active applications
+ * Return the known collectors which get from RM for all active applications
* running on this NM.
- * @return known aggregators.
+ * @return known collectors.
*/
- Map getKnownAggregators();
+ Map getKnownCollectors();
ConcurrentMap getContainers();
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java
index 271795c746..0792c370aa 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java
@@ -60,7 +60,7 @@
import org.apache.hadoop.yarn.server.api.protocolrecords.LogAggregationReport;
import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManager;
-import org.apache.hadoop.yarn.server.nodemanager.aggregatormanager.NMAggregatorService;
+import org.apache.hadoop.yarn.server.nodemanager.collectormanager.NMCollectorService;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
@@ -99,7 +99,7 @@ public class NodeManager extends CompositeService
private Context context;
private AsyncDispatcher dispatcher;
private ContainerManagerImpl containerManager;
- private NMAggregatorService nmAggregatorService;
+ private NMCollectorService nmCollectorService;
private NodeStatusUpdater nodeStatusUpdater;
private NodeResourceMonitor nodeResourceMonitor;
private static CompositeServiceShutdownHook nodeManagerShutdownHook;
@@ -184,9 +184,9 @@ protected ContainerManagerImpl createContainerManager(Context context,
metrics, dirsHandler);
}
}
-
- protected NMAggregatorService createNMAggregatorService(Context context) {
- return new NMAggregatorService(context);
+
+ protected NMCollectorService createNMCollectorService(Context context) {
+ return new NMCollectorService(context);
}
protected WebServer createWebServer(Context nmContext,
@@ -379,9 +379,9 @@ protected void serviceInit(Configuration conf) throws Exception {
metrics.getJvmMetrics().setPauseMonitor(pauseMonitor);
DefaultMetricsSystem.initialize("NodeManager");
-
- this.nmAggregatorService = createNMAggregatorService(context);
- addService(nmAggregatorService);
+
+ this.nmCollectorService = createNMCollectorService(context);
+ addService(nmCollectorService);
// StatusUpdater should be added last so that it get started last
// so that we make sure everything is up before registering with RM.
@@ -474,11 +474,11 @@ public static class NMContext implements Context {
protected final ConcurrentMap containers =
new ConcurrentSkipListMap();
-
- protected Map registeredAggregators =
+
+ protected Map registeredCollectors =
new ConcurrentHashMap();
-
- protected Map knownAggregators =
+
+ protected Map knownCollectors =
new ConcurrentHashMap();
protected final ConcurrentMap getRegisteredAggregators() {
- return this.registeredAggregators;
+ public Map getRegisteredCollectors() {
+ return this.registeredCollectors;
}
- public void addRegisteredAggregators(
- Map newRegisteredAggregators) {
- this.registeredAggregators.putAll(newRegisteredAggregators);
- // Update to knownAggregators as well so it can immediately be consumed by
+ public void addRegisteredCollectors(
+ Map newRegisteredCollectors) {
+ this.registeredCollectors.putAll(newRegisteredCollectors);
+ // Update to knownCollectors as well so it can immediately be consumed by
// this NM's TimelineClient.
- this.knownAggregators.putAll(newRegisteredAggregators);
- }
-
- @Override
- public Map getKnownAggregators() {
- return this.knownAggregators;
+ this.knownCollectors.putAll(newRegisteredCollectors);
}
- public void addKnownAggregators(
- Map knownAggregators) {
- this.knownAggregators.putAll(knownAggregators);
+ @Override
+ public Map getKnownCollectors() {
+ return this.knownCollectors;
+ }
+
+ public void addKnownCollectors(
+ Map knownCollectors) {
+ this.knownCollectors.putAll(knownCollectors);
}
}
@@ -781,10 +781,10 @@ Dispatcher getNMDispatcher(){
public Context getNMContext() {
return this.context;
}
-
+
// For testing
- NMAggregatorService getNMAggregatorService() {
- return this.nmAggregatorService;
+ NMCollectorService getNMCollectorService() {
+ return this.nmCollectorService;
}
public static void main(String[] args) throws IOException {
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java
index f54469305a..eebc1e0952 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java
@@ -814,7 +814,7 @@ public void run() {
NodeStatusUpdaterImpl.this.context
.getNMTokenSecretManager().getCurrentKey(),
nodeLabelsForHeartbeat,
- NodeStatusUpdaterImpl.this.context.getRegisteredAggregators());
+ NodeStatusUpdaterImpl.this.context.getRegisteredCollectors());
if (logAggregationEnabled) {
// pull log aggregation status for application running in this NM
@@ -907,8 +907,9 @@ public void run() {
}
}
- Map knownAggregators = response.getAppAggregatorsMap();
- ((NodeManager.NMContext)context).addKnownAggregators(knownAggregators);
+ Map knownCollectors =
+ response.getAppCollectorsMap();
+ ((NodeManager.NMContext)context).addKnownCollectors(knownCollectors);
} catch (ConnectException e) {
//catch and throw the exception if tried MAX wait time to connect RM
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/aggregatormanager/NMAggregatorService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/collectormanager/NMCollectorService.java
similarity index 54%
rename from hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/aggregatormanager/NMAggregatorService.java
rename to hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/collectormanager/NMCollectorService.java
index 17150bac49..009fa63357 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/aggregatormanager/NMAggregatorService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/collectormanager/NMCollectorService.java
@@ -15,7 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.hadoop.yarn.server.nodemanager.aggregatormanager;
+package org.apache.hadoop.yarn.server.nodemanager.collectormanager;
import java.io.IOException;
import java.net.InetSocketAddress;
@@ -25,46 +25,43 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ipc.Server;
-import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.service.CompositeService;
-
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.ipc.YarnRPC;
-import org.apache.hadoop.yarn.server.api.AggregatorNodemanagerProtocol;
-import org.apache.hadoop.yarn.server.api.records.AppAggregatorsMap;
-import org.apache.hadoop.yarn.server.api.protocolrecords.ReportNewAggregatorsInfoRequest;
-import org.apache.hadoop.yarn.server.api.protocolrecords.ReportNewAggregatorsInfoResponse;
+import org.apache.hadoop.yarn.server.api.CollectorNodemanagerProtocol;
+import org.apache.hadoop.yarn.server.api.protocolrecords.ReportNewCollectorInfoRequest;
+import org.apache.hadoop.yarn.server.api.protocolrecords.ReportNewCollectorInfoResponse;
+import org.apache.hadoop.yarn.server.api.records.AppCollectorsMap;
import org.apache.hadoop.yarn.server.nodemanager.Context;
import org.apache.hadoop.yarn.server.nodemanager.NodeManager;
-public class NMAggregatorService extends CompositeService implements
- AggregatorNodemanagerProtocol {
+public class NMCollectorService extends CompositeService implements
+ CollectorNodemanagerProtocol {
- private static final Log LOG = LogFactory.getLog(NMAggregatorService.class);
+ private static final Log LOG = LogFactory.getLog(NMCollectorService.class);
final Context context;
-
+
private Server server;
- public NMAggregatorService(Context context) {
-
- super(NMAggregatorService.class.getName());
+ public NMCollectorService(Context context) {
+
+ super(NMCollectorService.class.getName());
this.context = context;
}
@Override
protected void serviceStart() throws Exception {
Configuration conf = getConfig();
-
- InetSocketAddress aggregatorServerAddress = conf.getSocketAddr(
+
+ InetSocketAddress collectorServerAddress = conf.getSocketAddr(
YarnConfiguration.NM_BIND_HOST,
- YarnConfiguration.NM_AGGREGATOR_SERVICE_ADDRESS,
- YarnConfiguration.DEFAULT_NM_AGGREGATOR_SERVICE_ADDRESS,
- YarnConfiguration.DEFAULT_NM_AGGREGATOR_SERVICE_PORT);
+ YarnConfiguration.NM_COLLECTOR_SERVICE_ADDRESS,
+ YarnConfiguration.DEFAULT_NM_COLLECTOR_SERVICE_ADDRESS,
+ YarnConfiguration.DEFAULT_NM_COLLECTOR_SERVICE_PORT);
Configuration serverConf = new Configuration(conf);
@@ -72,42 +69,42 @@ protected void serviceStart() throws Exception {
YarnRPC rpc = YarnRPC.create(conf);
server =
- rpc.getServer(AggregatorNodemanagerProtocol.class, this,
- aggregatorServerAddress, serverConf,
+ rpc.getServer(CollectorNodemanagerProtocol.class, this,
+ collectorServerAddress, serverConf,
this.context.getNMTokenSecretManager(),
- conf.getInt(YarnConfiguration.NM_AGGREGATOR_SERVICE_THREAD_COUNT,
- YarnConfiguration.DEFAULT_NM_AGGREGATOR_SERVICE_THREAD_COUNT));
+ conf.getInt(YarnConfiguration.NM_COLLECTOR_SERVICE_THREAD_COUNT,
+ YarnConfiguration.DEFAULT_NM_COLLECTOR_SERVICE_THREAD_COUNT));
server.start();
// start remaining services
super.serviceStart();
- LOG.info("NMAggregatorService started at " + aggregatorServerAddress);
+ LOG.info("NMCollectorService started at " + collectorServerAddress);
}
-
+
@Override
public void serviceStop() throws Exception {
if (server != null) {
server.stop();
}
- // TODO may cleanup app aggregators running on this NM in future.
+ // TODO may cleanup app collectors running on this NM in future.
super.serviceStop();
}
@Override
- public ReportNewAggregatorsInfoResponse reportNewAggregatorInfo(
- ReportNewAggregatorsInfoRequest request) throws IOException {
- List newAggregatorsList = request.getAppAggregatorsList();
- if (newAggregatorsList != null && !newAggregatorsList.isEmpty()) {
- Map newAggregatorsMap =
+ public ReportNewCollectorInfoResponse reportNewCollectorInfo(
+ ReportNewCollectorInfoRequest request) throws IOException {
+ List newCollectorsList = request.getAppCollectorsList();
+ if (newCollectorsList != null && !newCollectorsList.isEmpty()) {
+ Map newCollectorsMap =
new HashMap();
- for (AppAggregatorsMap aggregator : newAggregatorsList) {
- newAggregatorsMap.put(aggregator.getApplicationId(), aggregator.getAggregatorAddr());
+ for (AppCollectorsMap collector : newCollectorsList) {
+ newCollectorsMap.put(collector.getApplicationId(), collector.getCollectorAddr());
}
- ((NodeManager.NMContext)context).addRegisteredAggregators(newAggregatorsMap);
+ ((NodeManager.NMContext)context).addRegisteredCollectors(newCollectorsMap);
}
-
- return ReportNewAggregatorsInfoResponse.newInstance();
+
+ return ReportNewCollectorInfoResponse.newInstance();
}
-
+
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java
index 6864b0ddf7..c9c6f6bf49 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java
@@ -496,10 +496,11 @@ public void transition(ApplicationImpl app, ApplicationEvent event) {
new LogHandlerAppFinishedEvent(app.appId));
app.context.getNMTokenSecretManager().appFinished(app.getAppId());
- // Remove aggregator info for finished apps.
- // TODO check we remove related aggregators info in failure cases (YARN-3038)
- app.context.getRegisteredAggregators().remove(app.getAppId());
- app.context.getKnownAggregators().remove(app.getAppId());
+ // Remove collectors info for finished apps.
+ // TODO check we remove related collectors info in failure cases
+ // (YARN-3038)
+ app.context.getRegisteredCollectors().remove(app.getAppId());
+ app.context.getKnownCollectors().remove(app.getAppId());
}
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/BaseAMRMProxyTest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/BaseAMRMProxyTest.java
index a974267730..70bd8f4c1f 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/BaseAMRMProxyTest.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/BaseAMRMProxyTest.java
@@ -618,12 +618,12 @@ public Map getSystemCredentialsForApps() {
}
@Override
- public Map getRegisteredAggregators() {
+ public Map getRegisteredCollectors() {
return null;
}
@Override
- public Map getKnownAggregators() {
+ public Map getKnownCollectors() {
return null;
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java
index 805794871e..d0a8a6e9b0 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java
@@ -311,8 +311,8 @@ public FinishApplicationMasterResponse finishApplicationMaster(
RMApp rmApp =
rmContext.getRMApps().get(applicationAttemptId.getApplicationId());
- // Remove aggregator address when app get finished.
- rmApp.removeAggregatorAddr();
+ // Remove collector address when app get finished.
+ rmApp.removeCollectorAddr();
// checking whether the app exits in RMStateStore at first not to throw
// ApplicationDoesNotExistInCacheException before and after
// RM work-preserving restart.
@@ -573,10 +573,10 @@ public AllocateResponse allocate(AllocateRequest request)
allocateResponse.setDecreasedContainers(allocation.getDecreasedContainers());
allocateResponse.setNumClusterNodes(this.rScheduler.getNumClusterNodes());
-
- // add aggregator address for this application
- allocateResponse.setAggregatorAddr(
- this.rmContext.getRMApps().get(applicationId).getAggregatorAddr());
+
+ // add collector address for this application
+ allocateResponse.setCollectorAddr(
+ this.rmContext.getRMApps().get(applicationId).getCollectorAddr());
// add preemption to the allocateResponse message (if any)
allocateResponse
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java
index 4b712c0a1d..683aa9143b 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java
@@ -70,7 +70,7 @@
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.NodeLabelsUtils;
import org.apache.hadoop.yarn.server.resourcemanager.resource.DynamicResourceConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
-import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppAggregatorUpdateEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppCollectorUpdateEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptContainerFinishedEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
@@ -516,11 +516,11 @@ public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request)
return YarnServerBuilderUtils.newNodeHeartbeatResponse(NodeAction.RESYNC,
message);
}
-
- // Check & update aggregators info from request.
+
+ // Check & update collectors info from request.
// TODO make sure it won't have race condition issue for AM failed over case
// that the older registration could possible override the newer one.
- updateAppAggregatorsMap(request);
+ updateAppCollectorsMap(request);
// Heartbeat response
NodeHeartbeatResponse nodeHeartBeatResponse = YarnServerBuilderUtils
@@ -538,13 +538,14 @@ public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request)
if (!systemCredentials.isEmpty()) {
nodeHeartBeatResponse.setSystemCredentialsForApps(systemCredentials);
}
-
- // Return aggregators' map that NM needs to know
- // TODO we should optimize this to only include aggreator info that NM
+
+ // Return collectors' map that NM needs to know
+ // TODO we should optimize this to only include collector info that NM
// doesn't know yet.
- List keepAliveApps = remoteNodeStatus.getKeepAliveApplications();
+ List keepAliveApps =
+ remoteNodeStatus.getKeepAliveApplications();
if (keepAliveApps != null) {
- setAppAggregatorsMapToResponse(keepAliveApps, nodeHeartBeatResponse);
+ setAppCollectorsMapToResponse(keepAliveApps, nodeHeartBeatResponse);
}
// 4. Send status to RMNode, saving the latest response.
@@ -589,48 +590,49 @@ public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request)
}
return nodeHeartBeatResponse;
}
-
- private void setAppAggregatorsMapToResponse(
+
+ private void setAppCollectorsMapToResponse(
List liveApps, NodeHeartbeatResponse response) {
- Map liveAppAggregatorsMap = new
+ Map liveAppCollectorsMap = new
ConcurrentHashMap();
Map rmApps = rmContext.getRMApps();
for (ApplicationId appId : liveApps) {
- String appAggregatorAddr = rmApps.get(appId).getAggregatorAddr();
- if (appAggregatorAddr != null) {
- liveAppAggregatorsMap.put(appId, appAggregatorAddr);
+ String appCollectorAddr = rmApps.get(appId).getCollectorAddr();
+ if (appCollectorAddr != null) {
+ liveAppCollectorsMap.put(appId, appCollectorAddr);
} else {
- // Log a debug info if aggregator address is not found.
+ // Log a debug info if collector address is not found.
if (LOG.isDebugEnabled()) {
- LOG.debug("Aggregator for applicaton: " + appId + " hasn't registered yet!");
+ LOG.debug("Collector for applicaton: " + appId +
+ " hasn't registered yet!");
}
}
}
- response.setAppAggregatorsMap(liveAppAggregatorsMap);
+ response.setAppCollectorsMap(liveAppCollectorsMap);
}
-
- private void updateAppAggregatorsMap(NodeHeartbeatRequest request) {
- Map registeredAggregatorsMap =
- request.getRegisteredAggregators();
- if (registeredAggregatorsMap != null
- && !registeredAggregatorsMap.isEmpty()) {
+
+ private void updateAppCollectorsMap(NodeHeartbeatRequest request) {
+ Map registeredCollectorsMap =
+ request.getRegisteredCollectors();
+ if (registeredCollectorsMap != null
+ && !registeredCollectorsMap.isEmpty()) {
Map rmApps = rmContext.getRMApps();
- for (Map.Entry entry:
- registeredAggregatorsMap.entrySet()) {
+ for (Map.Entry entry:
+ registeredCollectorsMap.entrySet()) {
ApplicationId appId = entry.getKey();
- String aggregatorAddr = entry.getValue();
- if (aggregatorAddr != null && !aggregatorAddr.isEmpty()) {
+ String collectorAddr = entry.getValue();
+ if (collectorAddr != null && !collectorAddr.isEmpty()) {
RMApp rmApp = rmApps.get(appId);
if (rmApp == null) {
- LOG.warn("Cannot update aggregator info because application ID: " +
+ LOG.warn("Cannot update collector info because application ID: " +
appId + " is not found in RMContext!");
} else {
- String previousAggregatorAddr = rmApp.getAggregatorAddr();
- if (previousAggregatorAddr == null ||
- previousAggregatorAddr != aggregatorAddr) {
- // sending aggregator update event.
- RMAppAggregatorUpdateEvent event =
- new RMAppAggregatorUpdateEvent(appId, aggregatorAddr);
+ String previousCollectorAddr = rmApp.getCollectorAddr();
+ if (previousCollectorAddr == null ||
+ previousCollectorAddr != collectorAddr) {
+ // sending collector update event.
+ RMAppCollectorUpdateEvent event =
+ new RMAppCollectorUpdateEvent(appId, collectorAddr);
rmContext.getDispatcher().getEventHandler().handle(event);
}
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java
index 25b2314ab0..8a795cde59 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java
@@ -175,23 +175,23 @@ ApplicationReport createAndGetApplicationReport(String clientUserName,
* @return the tracking url for the application master.
*/
String getTrackingUrl();
-
+
/**
- * The aggregator address for the application.
- * @return the address for the application's aggregator.
+ * The collector address for the application.
+ * @return the address for the application's collector.
*/
- String getAggregatorAddr();
-
+ String getCollectorAddr();
+
/**
- * Set aggregator address for the application
- * @param aggregatorAddr the address of aggregator
+ * Set collector address for the application
+ * @param collectorAddr the address of collector
*/
- void setAggregatorAddr(String aggregatorAddr);
-
+ void setCollectorAddr(String collectorAddr);
+
/**
- * Remove aggregator address when application is finished or killed.
+ * Remove collector address when application is finished or killed.
*/
- void removeAggregatorAddr();
+ void removeCollectorAddr();
/**
* The original tracking url for the application master.
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppAggregatorUpdateEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppCollectorUpdateEvent.java
similarity index 71%
rename from hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppAggregatorUpdateEvent.java
rename to hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppCollectorUpdateEvent.java
index b43de44da9..698c9b5c4a 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppAggregatorUpdateEvent.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppCollectorUpdateEvent.java
@@ -20,17 +20,18 @@
import org.apache.hadoop.yarn.api.records.ApplicationId;
-public class RMAppAggregatorUpdateEvent extends RMAppEvent {
+public class RMAppCollectorUpdateEvent extends RMAppEvent {
- private final String appAggregatorAddr;
-
- public RMAppAggregatorUpdateEvent(ApplicationId appId, String appAggregatorAddr) {
- super(appId, RMAppEventType.AGGREGATOR_UPDATE);
- this.appAggregatorAddr = appAggregatorAddr;
+ private final String appCollectorAddr;
+
+ public RMAppCollectorUpdateEvent(ApplicationId appId,
+ String appCollectorAddr) {
+ super(appId, RMAppEventType.COLLECTOR_UPDATE);
+ this.appCollectorAddr = appCollectorAddr;
}
-
- public String getAppAggregatorAddr(){
- return this.appAggregatorAddr;
+
+ public String getAppCollectorAddr(){
+ return this.appCollectorAddr;
}
-
+
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppEventType.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppEventType.java
index 6e9460aefe..2b42638a9f 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppEventType.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppEventType.java
@@ -30,9 +30,9 @@ public enum RMAppEventType {
// Source: Scheduler
APP_ACCEPTED,
-
+
// TODO add source later
- AGGREGATOR_UPDATE,
+ COLLECTOR_UPDATE,
// Source: RMAppAttempt
ATTEMPT_REGISTERED,
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
index cf50c3d466..ac6a585abd 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
@@ -152,7 +152,7 @@ public class RMAppImpl implements RMApp, Recoverable {
private long storedFinishTime = 0;
private int firstAttemptIdInStateStore = 1;
private int nextAttemptId = 1;
- private String aggregatorAddr;
+ private String collectorAddr;
// This field isn't protected by readlock now.
private volatile RMAppAttempt currentAttempt;
private String queue;
@@ -201,7 +201,7 @@ public class RMAppImpl implements RMApp, Recoverable {
.addTransition(RMAppState.NEW, RMAppState.NEW,
RMAppEventType.NODE_UPDATE, new RMAppNodeUpdateTransition())
.addTransition(RMAppState.NEW, RMAppState.NEW,
- RMAppEventType.AGGREGATOR_UPDATE, new RMAppAggregatorUpdateTransition())
+ RMAppEventType.COLLECTOR_UPDATE, new RMAppCollectorUpdateTransition())
.addTransition(RMAppState.NEW, RMAppState.NEW_SAVING,
RMAppEventType.START, new RMAppNewlySavingTransition())
.addTransition(RMAppState.NEW, EnumSet.of(RMAppState.SUBMITTED,
@@ -219,7 +219,7 @@ RMAppEventType.RECOVER, new RMAppRecoveredTransition())
.addTransition(RMAppState.NEW_SAVING, RMAppState.NEW_SAVING,
RMAppEventType.NODE_UPDATE, new RMAppNodeUpdateTransition())
.addTransition(RMAppState.NEW_SAVING, RMAppState.NEW_SAVING,
- RMAppEventType.AGGREGATOR_UPDATE, new RMAppAggregatorUpdateTransition())
+ RMAppEventType.COLLECTOR_UPDATE, new RMAppCollectorUpdateTransition())
.addTransition(RMAppState.NEW_SAVING, RMAppState.SUBMITTED,
RMAppEventType.APP_NEW_SAVED, new AddApplicationToSchedulerTransition())
.addTransition(RMAppState.NEW_SAVING, RMAppState.FINAL_SAVING,
@@ -239,7 +239,7 @@ RMAppEventType.NODE_UPDATE, new RMAppNodeUpdateTransition())
.addTransition(RMAppState.SUBMITTED, RMAppState.SUBMITTED,
RMAppEventType.MOVE, new RMAppMoveTransition())
.addTransition(RMAppState.SUBMITTED, RMAppState.SUBMITTED,
- RMAppEventType.AGGREGATOR_UPDATE, new RMAppAggregatorUpdateTransition())
+ RMAppEventType.COLLECTOR_UPDATE, new RMAppCollectorUpdateTransition())
.addTransition(RMAppState.SUBMITTED, RMAppState.FINAL_SAVING,
RMAppEventType.APP_REJECTED,
new FinalSavingTransition(
@@ -257,7 +257,7 @@ RMAppEventType.NODE_UPDATE, new RMAppNodeUpdateTransition())
.addTransition(RMAppState.ACCEPTED, RMAppState.ACCEPTED,
RMAppEventType.MOVE, new RMAppMoveTransition())
.addTransition(RMAppState.ACCEPTED, RMAppState.ACCEPTED,
- RMAppEventType.AGGREGATOR_UPDATE, new RMAppAggregatorUpdateTransition())
+ RMAppEventType.COLLECTOR_UPDATE, new RMAppCollectorUpdateTransition())
.addTransition(RMAppState.ACCEPTED, RMAppState.RUNNING,
RMAppEventType.ATTEMPT_REGISTERED, new RMAppStateUpdateTransition(
YarnApplicationState.RUNNING))
@@ -286,7 +286,7 @@ RMAppEventType.NODE_UPDATE, new RMAppNodeUpdateTransition())
.addTransition(RMAppState.RUNNING, RMAppState.RUNNING,
RMAppEventType.MOVE, new RMAppMoveTransition())
.addTransition(RMAppState.RUNNING, RMAppState.RUNNING,
- RMAppEventType.AGGREGATOR_UPDATE, new RMAppAggregatorUpdateTransition())
+ RMAppEventType.COLLECTOR_UPDATE, new RMAppCollectorUpdateTransition())
.addTransition(RMAppState.RUNNING, RMAppState.FINAL_SAVING,
RMAppEventType.ATTEMPT_UNREGISTERED,
new FinalSavingTransition(
@@ -317,7 +317,7 @@ RMAppEventType.KILL, new KillAttemptTransition())
RMAppEventType.APP_RUNNING_ON_NODE,
new AppRunningOnNodeTransition())
.addTransition(RMAppState.FINAL_SAVING, RMAppState.FINAL_SAVING,
- RMAppEventType.AGGREGATOR_UPDATE, new RMAppAggregatorUpdateTransition())
+ RMAppEventType.COLLECTOR_UPDATE, new RMAppCollectorUpdateTransition())
// ignorable transitions
.addTransition(RMAppState.FINAL_SAVING, RMAppState.FINAL_SAVING,
EnumSet.of(RMAppEventType.NODE_UPDATE, RMAppEventType.KILL,
@@ -330,7 +330,7 @@ RMAppEventType.AGGREGATOR_UPDATE, new RMAppAggregatorUpdateTransition())
RMAppEventType.APP_RUNNING_ON_NODE,
new AppRunningOnNodeTransition())
.addTransition(RMAppState.FINISHING, RMAppState.FINISHING,
- RMAppEventType.AGGREGATOR_UPDATE, new RMAppAggregatorUpdateTransition())
+ RMAppEventType.COLLECTOR_UPDATE, new RMAppCollectorUpdateTransition())
// ignorable transitions
.addTransition(RMAppState.FINISHING, RMAppState.FINISHING,
EnumSet.of(RMAppEventType.NODE_UPDATE,
@@ -343,7 +343,7 @@ RMAppEventType.AGGREGATOR_UPDATE, new RMAppAggregatorUpdateTransition())
RMAppEventType.APP_RUNNING_ON_NODE,
new AppRunningOnNodeTransition())
.addTransition(RMAppState.KILLING, RMAppState.KILLING,
- RMAppEventType.AGGREGATOR_UPDATE, new RMAppAggregatorUpdateTransition())
+ RMAppEventType.COLLECTOR_UPDATE, new RMAppCollectorUpdateTransition())
.addTransition(RMAppState.KILLING, RMAppState.FINAL_SAVING,
RMAppEventType.ATTEMPT_KILLED,
new FinalSavingTransition(
@@ -577,20 +577,20 @@ public String getQueue() {
public void setQueue(String queue) {
this.queue = queue;
}
-
+
@Override
- public String getAggregatorAddr() {
- return this.aggregatorAddr;
+ public String getCollectorAddr() {
+ return this.collectorAddr;
}
-
+
@Override
- public void setAggregatorAddr(String aggregatorAddr) {
- this.aggregatorAddr = aggregatorAddr;
+ public void setCollectorAddr(String collectorAddr) {
+ this.collectorAddr = collectorAddr;
}
-
+
@Override
- public void removeAggregatorAddr() {
- this.aggregatorAddr = null;
+ public void removeCollectorAddr() {
+ this.collectorAddr = null;
}
@Override
@@ -864,8 +864,8 @@ public void recover(RMState state) {
// send the ATS create Event
sendATSCreateEvent(this, this.startTime);
- //TODO recover aggregator address.
- //this.aggregatorAddr = appState.getAggregatorAddr();
+ //TODO recover collector address.
+ //this.collectorAddr = appState.getCollectorAddr();
RMAppAttemptImpl preAttempt = null;
for (ApplicationAttemptId attemptId :
@@ -938,22 +938,22 @@ public void transition(RMAppImpl app, RMAppEvent event) {
};
}
- private static final class RMAppAggregatorUpdateTransition
+ private static final class RMAppCollectorUpdateTransition
extends RMAppTransition {
-
+
public void transition(RMAppImpl app, RMAppEvent event) {
- LOG.info("Updating aggregator info for app: " + app.getApplicationId());
-
- RMAppAggregatorUpdateEvent appAggregatorUpdateEvent =
- (RMAppAggregatorUpdateEvent) event;
- // Update aggregator address
- app.setAggregatorAddr(appAggregatorUpdateEvent.getAppAggregatorAddr());
-
+ LOG.info("Updating collector info for app: " + app.getApplicationId());
+
+ RMAppCollectorUpdateEvent appCollectorUpdateEvent =
+ (RMAppCollectorUpdateEvent) event;
+ // Update collector address
+ app.setCollectorAddr(appCollectorUpdateEvent.getAppCollectorAddr());
+
// TODO persistent to RMStateStore for recover
// Save to RMStateStore
};
}
-
+
private static final class RMAppNodeUpdateTransition extends RMAppTransition {
public void transition(RMAppImpl app, RMAppEvent event) {
RMAppNodeUpdateEvent nodeUpdateEvent = (RMAppNodeUpdateEvent) event;
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/MockAsm.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/MockAsm.java
index 85d6129f25..19ee0b17c4 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/MockAsm.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/MockAsm.java
@@ -95,15 +95,15 @@ public StringBuilder getDiagnostics() {
throw new UnsupportedOperationException("Not supported yet.");
}
@Override
- public String getAggregatorAddr() {
+ public String getCollectorAddr() {
throw new UnsupportedOperationException("Not supported yet.");
}
@Override
- public void setAggregatorAddr(String aggregatorAddr) {
+ public void setCollectorAddr(String collectorAddr) {
throw new UnsupportedOperationException("Not supported yet.");
}
@Override
- public void removeAggregatorAddr() {
+ public void removeCollectorAddr() {
throw new UnsupportedOperationException("Not supported yet.");
}
@Override
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/MockRMApp.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/MockRMApp.java
index e4d846441a..62a5c5282a 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/MockRMApp.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/MockRMApp.java
@@ -302,17 +302,17 @@ public CallerContext getCallerContext() {
throw new UnsupportedOperationException("Not supported yet.");
}
- public String getAggregatorAddr() {
- throw new UnsupportedOperationException("Not supported yet.");
- }
-
- @Override
- public void removeAggregatorAddr() {
+ public String getCollectorAddr() {
throw new UnsupportedOperationException("Not supported yet.");
}
@Override
- public void setAggregatorAddr(String aggregatorAddr) {
+ public void removeCollectorAddr() {
+ throw new UnsupportedOperationException("Not supported yet.");
+ }
+
+ @Override
+ public void setCollectorAddr(String collectorAddr) {
throw new UnsupportedOperationException("Not supported yet.");
}
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/pom.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/pom.xml
index 22542a090f..6bd54c29cb 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/pom.xml
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/pom.xml
@@ -56,6 +56,11 @@
junit
test