YARN-11552. Timeline endpoint: /clusters/{clusterid}/apps/{appid}/entity-types Error when using hdfs store (#5978) Contributed by Jiandan Yang.

Reviewed-by: Shilun Fan <slfan1989@apache.org>
Signed-off-by: Shilun Fan <slfan1989@apache.org>
This commit is contained in:
Yang Jiandan 2023-09-01 00:14:46 +08:00 committed by GitHub
parent 01cc6d0bc8
commit 2f739450be
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 32 additions and 17 deletions

View File

@ -164,7 +164,7 @@ private static void fillFields(TimelineEntity finalEntity,
private String getFlowRunPath(String userId, String clusterId, private String getFlowRunPath(String userId, String clusterId,
String flowName, Long flowRunId, String appId) throws IOException { String flowName, Long flowRunId, String appId) throws IOException {
if (userId != null && flowName != null && flowRunId != null) { if (userId != null && flowName != null && flowRunId != null) {
return userId + File.separator + flowName + File.separator + flowRunId; return userId + File.separator + flowName + File.separator + "*" + File.separator + flowRunId;
} }
if (clusterId == null || appId == null) { if (clusterId == null || appId == null) {
throw new IOException("Unable to get flow info"); throw new IOException("Unable to get flow info");
@ -186,7 +186,7 @@ private String getFlowRunPath(String userId, String clusterId,
continue; continue;
} }
return record.get(1).trim() + File.separator + record.get(2).trim() + return record.get(1).trim() + File.separator + record.get(2).trim() +
File.separator + record.get(3).trim(); File.separator + "*" + File.separator + record.get(3).trim();
} }
parser.close(); parser.close();
} }
@ -286,6 +286,7 @@ public int compare(Long l1, Long l2) {
} }
} }
); );
dir = getNormalPath(dir);
if (dir != null) { if (dir != null) {
RemoteIterator<LocatedFileStatus> fileStatuses = fs.listFiles(dir, RemoteIterator<LocatedFileStatus> fileStatuses = fs.listFiles(dir,
false); false);
@ -394,9 +395,11 @@ public TimelineEntity getEntity(TimelineReaderContext context,
Path flowRunPath = new Path(clusterIdPath, flowRunPathStr); Path flowRunPath = new Path(clusterIdPath, flowRunPathStr);
Path appIdPath = new Path(flowRunPath, context.getAppId()); Path appIdPath = new Path(flowRunPath, context.getAppId());
Path entityTypePath = new Path(appIdPath, context.getEntityType()); Path entityTypePath = new Path(appIdPath, context.getEntityType());
Path entityFilePath = new Path(entityTypePath, Path entityFilePath = getNormalPath(new Path(entityTypePath,
context.getEntityId() + TIMELINE_SERVICE_STORAGE_EXTENSION); context.getEntityId() + TIMELINE_SERVICE_STORAGE_EXTENSION));
if (entityFilePath == null) {
return null;
}
try (BufferedReader reader = try (BufferedReader reader =
new BufferedReader(new InputStreamReader( new BufferedReader(new InputStreamReader(
fs.open(entityFilePath), Charset.forName("UTF-8")))) { fs.open(entityFilePath), Charset.forName("UTF-8")))) {
@ -410,6 +413,14 @@ public TimelineEntity getEntity(TimelineReaderContext context,
} }
} }
private Path getNormalPath(Path globPath) throws IOException {
FileStatus[] status = fs.globStatus(globPath);
if (status == null || status.length < 1) {
LOG.info("{} do not exist.", globPath);
return null;
}
return status[0].getPath();
}
@Override @Override
public Set<TimelineEntity> getEntities(TimelineReaderContext context, public Set<TimelineEntity> getEntities(TimelineReaderContext context,
TimelineEntityFilters filters, TimelineDataToRetrieve dataToRetrieve) TimelineEntityFilters filters, TimelineDataToRetrieve dataToRetrieve)
@ -433,13 +444,13 @@ public Set<TimelineEntity> getEntities(TimelineReaderContext context,
context.getClusterId(), context.getFlowName(), context.getFlowRunId(), context.getClusterId(), context.getFlowName(), context.getFlowRunId(),
context.getAppId()); context.getAppId());
if (context.getUserId() == null) { if (context.getUserId() == null) {
context.setUserId(new Path(flowRunPathStr).getParent().getParent(). context.setUserId(new Path(flowRunPathStr).getParent().getParent().getParent().
getName()); getName());
} }
Path clusterIdPath = new Path(entitiesPath, context.getClusterId()); Path clusterIdPath = new Path(entitiesPath, context.getClusterId());
Path flowRunPath = new Path(clusterIdPath, flowRunPathStr); Path flowRunPath = new Path(clusterIdPath, flowRunPathStr);
Path appIdPath = new Path(flowRunPath, context.getAppId()); Path appIdPath = new Path(flowRunPath, context.getAppId());
FileStatus[] fileStatuses = fs.listStatus(appIdPath); FileStatus[] fileStatuses = fs.listStatus(getNormalPath(appIdPath));
for (FileStatus fileStatus : fileStatuses) { for (FileStatus fileStatus : fileStatuses) {
if (fileStatus.isDirectory()) { if (fileStatus.isDirectory()) {
result.add(fileStatus.getPath().getName()); result.add(fileStatus.getPath().getName());

View File

@ -66,6 +66,11 @@ public class TestFileSystemTimelineReaderImpl {
private static final String ROOT_DIR = new File("target", private static final String ROOT_DIR = new File("target",
TestFileSystemTimelineReaderImpl.class.getSimpleName()).getAbsolutePath(); TestFileSystemTimelineReaderImpl.class.getSimpleName()).getAbsolutePath();
private static String cluster = "cluster1";
private static String user = "user1";
private static String flowVersion = "v1";
private static String flowRunId = "1";
private FileSystemTimelineReaderImpl reader; private FileSystemTimelineReaderImpl reader;
@BeforeAll @BeforeAll
@ -125,7 +130,7 @@ private static void writeEntityFile(TimelineEntity entity, File dir)
private static void loadEntityData(String rootDir) throws Exception { private static void loadEntityData(String rootDir) throws Exception {
File appDir = File appDir =
getAppDir(rootDir, "cluster1", "user1", "flow1", "1", "app1", "app"); getAppDir(rootDir, "flow1", "app1", "app");
TimelineEntity entity11 = new TimelineEntity(); TimelineEntity entity11 = new TimelineEntity();
entity11.setId("id_1"); entity11.setId("id_1");
entity11.setType("app"); entity11.setType("app");
@ -266,8 +271,9 @@ private static void loadEntityData(String rootDir) throws Exception {
entity4.addEvent(event44); entity4.addEvent(event44);
writeEntityFile(entity4, appDir); writeEntityFile(entity4, appDir);
File attemptDir = getAppDir(rootDir, "cluster1", "user1", "flow1", "1",
"app1", TimelineEntityType.YARN_APPLICATION_ATTEMPT.toString()); File attemptDir = getAppDir(rootDir, "flow1", "app1",
TimelineEntityType.YARN_APPLICATION_ATTEMPT.toString());
ApplicationAttemptEntity attempt1 = new ApplicationAttemptEntity(); ApplicationAttemptEntity attempt1 = new ApplicationAttemptEntity();
attempt1.setId("app-attempt-1"); attempt1.setId("app-attempt-1");
attempt1.setCreatedTime(1425017502003L); attempt1.setCreatedTime(1425017502003L);
@ -277,8 +283,8 @@ private static void loadEntityData(String rootDir) throws Exception {
attempt2.setCreatedTime(1425017502004L); attempt2.setCreatedTime(1425017502004L);
writeEntityFile(attempt2, attemptDir); writeEntityFile(attempt2, attemptDir);
File entityDir = getAppDir(rootDir, "cluster1", "user1", "flow1", "1", File entityDir = getAppDir(rootDir, "flow1", "app1",
"app1", TimelineEntityType.YARN_CONTAINER.toString()); TimelineEntityType.YARN_CONTAINER.toString());
ContainerEntity containerEntity1 = new ContainerEntity(); ContainerEntity containerEntity1 = new ContainerEntity();
containerEntity1.setId("container_1_1"); containerEntity1.setId("container_1_1");
containerEntity1.setParent(attempt1.getIdentifier()); containerEntity1.setParent(attempt1.getIdentifier());
@ -298,8 +304,7 @@ private static void loadEntityData(String rootDir) throws Exception {
writeEntityFile(containerEntity3, entityDir); writeEntityFile(containerEntity3, entityDir);
File appDir2 = File appDir2 =
getAppDir(rootDir, "cluster1", "user1", "flow1,flow", "1", "app2", getAppDir(rootDir, "flow1,flow", "app2", "app");
"app");
TimelineEntity entity5 = new TimelineEntity(); TimelineEntity entity5 = new TimelineEntity();
entity5.setId("id_5"); entity5.setId("id_5");
entity5.setType("app"); entity5.setType("app");
@ -307,10 +312,9 @@ private static void loadEntityData(String rootDir) throws Exception {
writeEntityFile(entity5, appDir2); writeEntityFile(entity5, appDir2);
} }
private static File getAppDir(String rootDir, String cluster, String user, private static File getAppDir(String rootDir, String flowName, String appId, String entityName) {
String flowName, String flowRunId, String appId, String entityName) {
return new File(rootDir + File.separator + "entities" + File.separator + return new File(rootDir + File.separator + "entities" + File.separator +
cluster + File.separator + user + File.separator + flowName + cluster + File.separator + user + File.separator + flowName + File.separator + flowVersion +
File.separator + flowRunId + File.separator + appId + File.separator + File.separator + flowRunId + File.separator + appId + File.separator +
entityName + File.separator); entityName + File.separator);
} }