From 2f739450be88df2929c25a59ab4a48f84df5cbc2 Mon Sep 17 00:00:00 2001 From: Yang Jiandan Date: Fri, 1 Sep 2023 00:14:46 +0800 Subject: [PATCH] YARN-11552. Timeline endpoint: /clusters/{clusterid}/apps/{appid}/entity-types Error when using hdfs store (#5978) Contributed by Jiandan Yang. Reviewed-by: Shilun Fan Signed-off-by: Shilun Fan --- .../storage/FileSystemTimelineReaderImpl.java | 25 +++++++++++++------ .../TestFileSystemTimelineReaderImpl.java | 24 ++++++++++-------- 2 files changed, 32 insertions(+), 17 deletions(-) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FileSystemTimelineReaderImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FileSystemTimelineReaderImpl.java index 913b8360e2..dff21a31da 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FileSystemTimelineReaderImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FileSystemTimelineReaderImpl.java @@ -164,7 +164,7 @@ private static void fillFields(TimelineEntity finalEntity, private String getFlowRunPath(String userId, String clusterId, String flowName, Long flowRunId, String appId) throws IOException { if (userId != null && flowName != null && flowRunId != null) { - return userId + File.separator + flowName + File.separator + flowRunId; + return userId + File.separator + flowName + File.separator + "*" + File.separator + flowRunId; } if (clusterId == null || appId == null) { throw new IOException("Unable to get flow info"); @@ -186,7 +186,7 @@ private String getFlowRunPath(String userId, String clusterId, continue; } return record.get(1).trim() + File.separator + record.get(2).trim() + - File.separator + record.get(3).trim(); + File.separator + "*" + File.separator + record.get(3).trim(); } parser.close(); } @@ -286,6 +286,7 @@ public int compare(Long l1, Long l2) { } } ); + dir = getNormalPath(dir); if (dir != null) { RemoteIterator fileStatuses = fs.listFiles(dir, false); @@ -394,9 +395,11 @@ public TimelineEntity getEntity(TimelineReaderContext context, Path flowRunPath = new Path(clusterIdPath, flowRunPathStr); Path appIdPath = new Path(flowRunPath, context.getAppId()); Path entityTypePath = new Path(appIdPath, context.getEntityType()); - Path entityFilePath = new Path(entityTypePath, - context.getEntityId() + TIMELINE_SERVICE_STORAGE_EXTENSION); - + Path entityFilePath = getNormalPath(new Path(entityTypePath, + context.getEntityId() + TIMELINE_SERVICE_STORAGE_EXTENSION)); + if (entityFilePath == null) { + return null; + } try (BufferedReader reader = new BufferedReader(new InputStreamReader( fs.open(entityFilePath), Charset.forName("UTF-8")))) { @@ -410,6 +413,14 @@ public TimelineEntity getEntity(TimelineReaderContext context, } } + private Path getNormalPath(Path globPath) throws IOException { + FileStatus[] status = fs.globStatus(globPath); + if (status == null || status.length < 1) { + LOG.info("{} do not exist.", globPath); + return null; + } + return status[0].getPath(); + } @Override public Set getEntities(TimelineReaderContext context, TimelineEntityFilters filters, TimelineDataToRetrieve dataToRetrieve) @@ -433,13 +444,13 @@ public Set getEntities(TimelineReaderContext context, context.getClusterId(), context.getFlowName(), context.getFlowRunId(), context.getAppId()); if (context.getUserId() == null) { - context.setUserId(new Path(flowRunPathStr).getParent().getParent(). + context.setUserId(new Path(flowRunPathStr).getParent().getParent().getParent(). getName()); } Path clusterIdPath = new Path(entitiesPath, context.getClusterId()); Path flowRunPath = new Path(clusterIdPath, flowRunPathStr); Path appIdPath = new Path(flowRunPath, context.getAppId()); - FileStatus[] fileStatuses = fs.listStatus(appIdPath); + FileStatus[] fileStatuses = fs.listStatus(getNormalPath(appIdPath)); for (FileStatus fileStatus : fileStatuses) { if (fileStatus.isDirectory()) { result.add(fileStatus.getPath().getName()); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestFileSystemTimelineReaderImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestFileSystemTimelineReaderImpl.java index cf94749e88..47e5a49051 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestFileSystemTimelineReaderImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestFileSystemTimelineReaderImpl.java @@ -66,6 +66,11 @@ public class TestFileSystemTimelineReaderImpl { private static final String ROOT_DIR = new File("target", TestFileSystemTimelineReaderImpl.class.getSimpleName()).getAbsolutePath(); + private static String cluster = "cluster1"; + private static String user = "user1"; + private static String flowVersion = "v1"; + private static String flowRunId = "1"; + private FileSystemTimelineReaderImpl reader; @BeforeAll @@ -125,7 +130,7 @@ private static void writeEntityFile(TimelineEntity entity, File dir) private static void loadEntityData(String rootDir) throws Exception { File appDir = - getAppDir(rootDir, "cluster1", "user1", "flow1", "1", "app1", "app"); + getAppDir(rootDir, "flow1", "app1", "app"); TimelineEntity entity11 = new TimelineEntity(); entity11.setId("id_1"); entity11.setType("app"); @@ -266,8 +271,9 @@ private static void loadEntityData(String rootDir) throws Exception { entity4.addEvent(event44); writeEntityFile(entity4, appDir); - File attemptDir = getAppDir(rootDir, "cluster1", "user1", "flow1", "1", - "app1", TimelineEntityType.YARN_APPLICATION_ATTEMPT.toString()); + + File attemptDir = getAppDir(rootDir, "flow1", "app1", + TimelineEntityType.YARN_APPLICATION_ATTEMPT.toString()); ApplicationAttemptEntity attempt1 = new ApplicationAttemptEntity(); attempt1.setId("app-attempt-1"); attempt1.setCreatedTime(1425017502003L); @@ -277,8 +283,8 @@ private static void loadEntityData(String rootDir) throws Exception { attempt2.setCreatedTime(1425017502004L); writeEntityFile(attempt2, attemptDir); - File entityDir = getAppDir(rootDir, "cluster1", "user1", "flow1", "1", - "app1", TimelineEntityType.YARN_CONTAINER.toString()); + File entityDir = getAppDir(rootDir, "flow1", "app1", + TimelineEntityType.YARN_CONTAINER.toString()); ContainerEntity containerEntity1 = new ContainerEntity(); containerEntity1.setId("container_1_1"); containerEntity1.setParent(attempt1.getIdentifier()); @@ -298,8 +304,7 @@ private static void loadEntityData(String rootDir) throws Exception { writeEntityFile(containerEntity3, entityDir); File appDir2 = - getAppDir(rootDir, "cluster1", "user1", "flow1,flow", "1", "app2", - "app"); + getAppDir(rootDir, "flow1,flow", "app2", "app"); TimelineEntity entity5 = new TimelineEntity(); entity5.setId("id_5"); entity5.setType("app"); @@ -307,10 +312,9 @@ private static void loadEntityData(String rootDir) throws Exception { writeEntityFile(entity5, appDir2); } - private static File getAppDir(String rootDir, String cluster, String user, - String flowName, String flowRunId, String appId, String entityName) { + private static File getAppDir(String rootDir, String flowName, String appId, String entityName) { return new File(rootDir + File.separator + "entities" + File.separator + - cluster + File.separator + user + File.separator + flowName + + cluster + File.separator + user + File.separator + flowName + File.separator + flowVersion + File.separator + flowRunId + File.separator + appId + File.separator + entityName + File.separator); }