YARN-6533. Race condition in writing service record to registry in yarn native services. Contributed by Billie Rinaldi
This commit is contained in:
parent
ce05c6e981
commit
a041373dd4
@ -503,7 +503,8 @@ public Iterable<String> getHostsList(Collection<ClusterNode> values,
|
|||||||
*/
|
*/
|
||||||
public void updateServiceRecord(StateAccessForProviders amState,
|
public void updateServiceRecord(StateAccessForProviders amState,
|
||||||
YarnRegistryViewForProviders yarnRegistry,
|
YarnRegistryViewForProviders yarnRegistry,
|
||||||
String containerId, String roleName, List<String> ip, String hostname) {
|
String containerId, String roleName, List<String> ip, String hostname)
|
||||||
|
throws IOException {
|
||||||
try {
|
try {
|
||||||
RoleInstance role = null;
|
RoleInstance role = null;
|
||||||
if(ip != null && !ip.isEmpty()){
|
if(ip != null && !ip.isEmpty()){
|
||||||
@ -535,9 +536,6 @@ public void updateServiceRecord(StateAccessForProviders amState,
|
|||||||
} catch (NoSuchNodeException e) {
|
} catch (NoSuchNodeException e) {
|
||||||
// ignore - there is nothing to do if we don't find a container
|
// ignore - there is nothing to do if we don't find a container
|
||||||
log.warn("Owned container {} not found - {}", containerId, e);
|
log.warn("Owned container {} not found - {}", containerId, e);
|
||||||
} catch (IOException e) {
|
|
||||||
log.warn("Error updating container {} service record in registry",
|
|
||||||
containerId, e);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -146,8 +146,15 @@ public boolean processContainerStatus(ContainerId containerId,
|
|||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
try {
|
||||||
providerUtils.updateServiceRecord(amState, yarnRegistry,
|
providerUtils.updateServiceRecord(amState, yarnRegistry,
|
||||||
containerId.toString(), instance.role, status.getIPs(), status.getHost());
|
containerId.toString(), instance.role, status.getIPs(), status.getHost());
|
||||||
|
} catch (IOException e) {
|
||||||
|
// could not write service record to ZK, log and retry
|
||||||
|
log.warn("Error updating container {} service record in registry, " +
|
||||||
|
"retrying", containerId, e);
|
||||||
|
return true;
|
||||||
|
}
|
||||||
// TODO publish ip and host
|
// TODO publish ip and host
|
||||||
org.apache.slider.api.resource.Container container =
|
org.apache.slider.api.resource.Container container =
|
||||||
instance.providerRole.component.getContainer(containerId.toString());
|
instance.providerRole.component.getContainer(containerId.toString());
|
||||||
|
@ -1182,24 +1182,8 @@ public boolean registerComponent(ContainerId id, RoleInstance roleInstance)
|
|||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
// this is where component registrations go
|
// this is where component registrations go
|
||||||
String cid = RegistryPathUtils.encodeYarnID(id.toString());
|
|
||||||
ServiceRecord record = new ServiceRecord();
|
|
||||||
record.set(YarnRegistryAttributes.YARN_ID, cid);
|
|
||||||
|
|
||||||
record.description = roleInstance.getCompInstanceName();
|
|
||||||
log.info("Registering component " + roleInstance.getCompInstanceName()
|
log.info("Registering component " + roleInstance.getCompInstanceName()
|
||||||
+ ", containerId = " + id);
|
+ ", containerId = " + id);
|
||||||
record.set(YarnRegistryAttributes.YARN_PERSISTENCE,
|
|
||||||
PersistencePolicies.CONTAINER);
|
|
||||||
setUserProvidedServiceRecordAttributes(
|
|
||||||
instance.providerRole.component.getConfiguration(), record);
|
|
||||||
try {
|
|
||||||
yarnRegistryOperations.putComponent(cid, record);
|
|
||||||
} catch (IOException e) {
|
|
||||||
log.warn("Failed to register container {}/{}: {}",
|
|
||||||
id, roleInstance.role, e, e);
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
org.apache.slider.api.resource.Container container =
|
org.apache.slider.api.resource.Container container =
|
||||||
new org.apache.slider.api.resource.Container();
|
new org.apache.slider.api.resource.Container();
|
||||||
container.setId(id.toString());
|
container.setId(id.toString());
|
||||||
|
Loading…
Reference in New Issue
Block a user