From fb68980959f95f0d89e86f91909867724ad01791 Mon Sep 17 00:00:00 2001 From: Andrew Wang Date: Thu, 15 Jun 2017 14:46:55 -0700 Subject: [PATCH] HDFS-10480. Add an admin command to list currently open files. Contributed by Manoj Govindassamy. --- .../org/apache/hadoop/hdfs/DFSClient.java | 12 + .../hadoop/hdfs/DistributedFileSystem.java | 15 ++ .../apache/hadoop/hdfs/client/HdfsAdmin.java | 15 ++ .../hadoop/hdfs/protocol/ClientProtocol.java | 12 + .../hadoop/hdfs/protocol/OpenFileEntry.java | 58 +++++ .../hdfs/protocol/OpenFilesIterator.java | 59 +++++ .../ClientNamenodeProtocolTranslatorPB.java | 23 ++ .../hdfs/protocolPB/PBHelperClient.java | 19 +- .../main/proto/ClientNamenodeProtocol.proto | 18 ++ .../org/apache/hadoop/hdfs/DFSConfigKeys.java | 4 + ...amenodeProtocolServerSideTranslatorPB.java | 20 ++ .../hdfs/server/namenode/FSNamesystem.java | 45 ++++ .../hdfs/server/namenode/LeaseManager.java | 51 +++- .../server/namenode/NameNodeRpcServer.java | 8 + .../apache/hadoop/hdfs/tools/DFSAdmin.java | 60 +++++ .../src/main/resources/hdfs-default.xml | 10 + .../src/site/markdown/HDFSCommands.md | 2 + .../org/apache/hadoop/hdfs/DFSTestUtil.java | 40 ++- .../org/apache/hadoop/hdfs/TestHdfsAdmin.java | 59 +++++ .../server/namenode/TestLeaseManager.java | 12 +- .../server/namenode/TestListOpenFiles.java | 234 ++++++++++++++++++ .../hadoop/hdfs/tools/TestDFSAdmin.java | 72 ++++++ 22 files changed, 841 insertions(+), 7 deletions(-) create mode 100644 hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/OpenFileEntry.java create mode 100644 hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/OpenFilesIterator.java create mode 100644 hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestListOpenFiles.java diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java index ec142f0a77..51f04e0359 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java @@ -128,6 +128,8 @@ import org.apache.hadoop.hdfs.protocol.LocatedBlock; import org.apache.hadoop.hdfs.protocol.LocatedBlocks; import org.apache.hadoop.hdfs.protocol.NSQuotaExceededException; +import org.apache.hadoop.hdfs.protocol.OpenFileEntry; +import org.apache.hadoop.hdfs.protocol.OpenFilesIterator; import org.apache.hadoop.hdfs.protocol.QuotaByStorageTypeExceededException; import org.apache.hadoop.hdfs.protocol.RollingUpgradeInfo; import org.apache.hadoop.hdfs.protocol.SnapshotAccessControlException; @@ -3025,4 +3027,14 @@ public ErasureCodingPolicy getErasureCodingPolicy(String src) Tracer getTracer() { return tracer; } + + /** + * Get a remote iterator to the open files list managed by NameNode. 
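+   * The entries are fetched from the NameNode in batches; the batch size
+   * is capped on the NameNode side by the
+   * {@code dfs.namenode.list.openfiles.num.responses} setting added in
+   * this patch (see DFSConfigKeys).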
+   *
+   * @throws IOException
+   */
+  public RemoteIterator<OpenFileEntry> listOpenFiles() throws IOException {
+    checkOpen();
+    return new OpenFilesIterator(namenode, tracer);
+  }
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java
index 2f60e9d103..1fd8f79439 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java
@@ -87,6 +87,7 @@
 import org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction;
 import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
 import org.apache.hadoop.hdfs.protocol.HdfsLocatedFileStatus;
+import org.apache.hadoop.hdfs.protocol.OpenFileEntry;
 import org.apache.hadoop.hdfs.protocol.RollingUpgradeInfo;
 import org.apache.hadoop.hdfs.protocol.SnapshotDiffReport;
 import org.apache.hadoop.hdfs.protocol.SnapshottableDirectoryStatus;
@@ -2881,4 +2882,18 @@ public HdfsDataOutputStream build() throws IOException {
   public HdfsDataOutputStreamBuilder createFile(Path path) {
     return new HdfsDataOutputStreamBuilder(this, path).create().overwrite(true);
   }
+
+  /**
+   * Returns a RemoteIterator which can be used to list all open files
+   * currently managed by the NameNode. For large numbers of open files,
+   * the iterator will fetch the list in batches of a configured size.
+   * <p/>
+   * Since the list is fetched in batches, it does not represent a
+   * consistent snapshot of all the open files.
+   * <p/>
+   * This method can only be called by HDFS superusers.
+   */
+  public RemoteIterator<OpenFileEntry> listOpenFiles() throws IOException {
+    return dfs.listOpenFiles();
+  }
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsAdmin.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsAdmin.java
index 71f6a35b10..21de0abbe9 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsAdmin.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsAdmin.java
@@ -47,6 +47,7 @@
 import org.apache.hadoop.hdfs.protocol.CachePoolInfo;
 import org.apache.hadoop.hdfs.protocol.EncryptionZone;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
+import org.apache.hadoop.hdfs.protocol.OpenFileEntry;
 import org.apache.hadoop.security.AccessControlException;
 
 import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
@@ -560,4 +561,18 @@ private void provisionEZTrash(Path path) throws IOException {
     dfs.setPermission(trashPath, TRASH_PERMISSION);
   }
 
+  /**
+   * Returns a RemoteIterator which can be used to list all open files
+   * currently managed by the NameNode. For large numbers of open files,
+   * the iterator will fetch the list in batches of a configured size.
+   * <p/>
+   * Since the list is fetched in batches, it does not represent a
+   * consistent snapshot of all the open files.
+   * <p/>
+ * This method can only be called by HDFS superusers. + */ + public RemoteIterator listOpenFiles() throws IOException { + return dfs.listOpenFiles(); + } + } diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java index 82e5c323aa..e132e048ff 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java @@ -1606,4 +1606,16 @@ AddECPolicyResponse[] addErasureCodingPolicies( */ @Idempotent QuotaUsage getQuotaUsage(String path) throws IOException; + + /** + * List open files in the system in batches. INode id is the cursor and the + * open files returned in a batch will have their INode ids greater than + * the cursor INode id. Open files can only be requested by super user and + * the the list across batches are not atomic. + * + * @param prevId the cursor INode id. + * @throws IOException + */ + @Idempotent + BatchedEntries listOpenFiles(long prevId) throws IOException; } diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/OpenFileEntry.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/OpenFileEntry.java new file mode 100644 index 0000000000..14e97d321b --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/OpenFileEntry.java @@ -0,0 +1,58 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hdfs.protocol; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; + +/** + * An open file entry for use by DFSAdmin commands. 
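+ * <p>
+ * Entries like this are typically obtained through
+ * {@code HdfsAdmin#listOpenFiles()}. The snippet below is only an
+ * illustrative sketch of consuming them; the {@code admin} variable is
+ * assumed and is not part of this patch:
+ * <pre>
+ *   RemoteIterator&lt;OpenFileEntry&gt; it = admin.listOpenFiles();
+ *   while (it.hasNext()) {
+ *     OpenFileEntry e = it.next();
+ *     System.out.println(e.getFilePath() + " " + e.getClientName()
+ *         + " " + e.getClientMachine());
+ *   }
+ * </pre>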
+ */ +@InterfaceAudience.Private +@InterfaceStability.Evolving +public class OpenFileEntry { + private final long id; + private final String filePath; + private final String clientName; + private final String clientMachine; + + public OpenFileEntry(long id, String filePath, + String clientName, String clientMachine) { + this.id = id; + this.filePath = filePath; + this.clientName = clientName; + this.clientMachine = clientMachine; + } + + public long getId() { + return id; + } + + public String getFilePath() { + return filePath; + } + + public String getClientMachine() { + return clientMachine; + } + + public String getClientName() { + return clientName; + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/OpenFilesIterator.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/OpenFilesIterator.java new file mode 100644 index 0000000000..c24e58591e --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/OpenFilesIterator.java @@ -0,0 +1,59 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hdfs.protocol; + +import java.io.IOException; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.fs.BatchedRemoteIterator; +import org.apache.htrace.core.TraceScope; +import org.apache.htrace.core.Tracer; + +/** + * OpenFilesIterator is a remote iterator that iterates over the open files list + * managed by the NameNode. Since the list is retrieved in batches, it does not + * represent a consistent view of all open files. 
+ */ +@InterfaceAudience.Private +@InterfaceStability.Evolving +public class OpenFilesIterator extends + BatchedRemoteIterator { + private final ClientProtocol namenode; + private final Tracer tracer; + + public OpenFilesIterator(ClientProtocol namenode, Tracer tracer) { + super(HdfsConstants.GRANDFATHER_INODE_ID); + this.namenode = namenode; + this.tracer = tracer; + } + + @Override + public BatchedEntries makeRequest(Long prevId) + throws IOException { + try (TraceScope ignored = tracer.newScope("listOpenFiles")) { + return namenode.listOpenFiles(prevId); + } + } + + @Override + public Long elementToPrevKey(OpenFileEntry entry) { + return entry.getId(); + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java index f29de15488..0d517f80f6 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java @@ -72,6 +72,7 @@ import org.apache.hadoop.hdfs.protocol.LocatedBlock; import org.apache.hadoop.hdfs.protocol.LocatedBlocks; import org.apache.hadoop.hdfs.protocol.BlocksStats; +import org.apache.hadoop.hdfs.protocol.OpenFileEntry; import org.apache.hadoop.hdfs.protocol.RollingUpgradeInfo; import org.apache.hadoop.hdfs.protocol.SnapshotDiffReport; import org.apache.hadoop.hdfs.protocol.SnapshottableDirectoryStatus; @@ -139,10 +140,13 @@ import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ListCachePoolsRequestProto; import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ListCachePoolsResponseProto; import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ListCorruptFileBlocksRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ListOpenFilesRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ListOpenFilesResponseProto; import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.MetaSaveRequestProto; import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.MkdirsRequestProto; import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ModifyCacheDirectiveRequestProto; import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ModifyCachePoolRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.OpenFilesBatchResponseProto; import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.RecoverLeaseRequestProto; import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.RefreshNodesRequestProto; import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.RemoveCacheDirectiveRequestProto; @@ -1752,4 +1756,23 @@ public QuotaUsage getQuotaUsage(String path) throws IOException { throw ProtobufHelper.getRemoteException(e); } } + + @Override + public BatchedEntries listOpenFiles(long prevId) + throws IOException { + ListOpenFilesRequestProto req = + ListOpenFilesRequestProto.newBuilder().setId(prevId).build(); + try { + ListOpenFilesResponseProto response = rpcProxy.listOpenFiles(null, req); + List openFileEntries = + Lists.newArrayListWithCapacity(response.getEntriesCount()); + for (OpenFilesBatchResponseProto p : 
response.getEntriesList()) { + openFileEntries.add(PBHelperClient.convert(p)); + } + return new BatchedListEntries<>(openFileEntries, response.getHasMore()); + } catch (ServiceException e) { + throw ProtobufHelper.getRemoteException(e); + } + } + } diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelperClient.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelperClient.java index 1716fba684..b356583881 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelperClient.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelperClient.java @@ -91,6 +91,7 @@ import org.apache.hadoop.hdfs.protocol.LocatedBlocks; import org.apache.hadoop.hdfs.protocol.LocatedStripedBlock; import org.apache.hadoop.hdfs.protocol.BlocksStats; +import org.apache.hadoop.hdfs.protocol.OpenFileEntry; import org.apache.hadoop.hdfs.protocol.RollingUpgradeInfo; import org.apache.hadoop.hdfs.protocol.RollingUpgradeStatus; import org.apache.hadoop.hdfs.protocol.SnapshotDiffReport; @@ -120,6 +121,7 @@ import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetFsECBlockGroupsStatsResponseProto; import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetFsBlocksStatsResponseProto; import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetFsStatsResponseProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.OpenFilesBatchResponseProto; import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.RollingUpgradeActionProto; import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.RollingUpgradeInfoProto; import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.SafeModeActionProto; @@ -1253,6 +1255,21 @@ public static EncryptionZone convert(EncryptionZoneProto proto) { proto.getKeyName()); } + public static OpenFilesBatchResponseProto convert(OpenFileEntry + openFileEntry) { + return OpenFilesBatchResponseProto.newBuilder() + .setId(openFileEntry.getId()) + .setPath(openFileEntry.getFilePath()) + .setClientName(openFileEntry.getClientName()) + .setClientMachine(openFileEntry.getClientMachine()) + .build(); + } + + public static OpenFileEntry convert(OpenFilesBatchResponseProto proto) { + return new OpenFileEntry(proto.getId(), proto.getPath(), + proto.getClientName(), proto.getClientMachine()); + } + public static AclStatus convert(GetAclStatusResponseProto e) { AclStatusProto r = e.getResult(); AclStatus.Builder builder = new AclStatus.Builder(); @@ -2826,4 +2843,4 @@ public static List convertAddBlockFlags( } return ret; } -} \ No newline at end of file +} diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/ClientNamenodeProtocol.proto b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/ClientNamenodeProtocol.proto index 3b1504c495..c56c0b1b27 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/ClientNamenodeProtocol.proto +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/ClientNamenodeProtocol.proto @@ -777,6 +777,22 @@ message GetEditsFromTxidResponseProto { required EventsListProto eventsList = 1; } +message ListOpenFilesRequestProto { + required int64 id = 1; +} + +message OpenFilesBatchResponseProto { + required int64 id = 1; + required string path = 2; + required string clientName = 3; + required string clientMachine = 4; +} + +message 
ListOpenFilesResponseProto { + repeated OpenFilesBatchResponseProto entries = 1; + required bool hasMore = 2; +} + service ClientNamenodeProtocol { rpc getBlockLocations(GetBlockLocationsRequestProto) returns(GetBlockLocationsResponseProto); @@ -945,4 +961,6 @@ service ClientNamenodeProtocol { returns(GetErasureCodingCodecsResponseProto); rpc getQuotaUsage(GetQuotaUsageRequestProto) returns(GetQuotaUsageResponseProto); + rpc listOpenFiles(ListOpenFilesRequestProto) + returns(ListOpenFilesResponseProto); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java index 726cfb7411..eaaff6071d 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java @@ -873,6 +873,10 @@ public class DFSConfigKeys extends CommonConfigurationKeys { HdfsClientConfigKeys.DFS_DATA_TRANSFER_SASL_PROPS_RESOLVER_CLASS_KEY; public static final int DFS_NAMENODE_LIST_ENCRYPTION_ZONES_NUM_RESPONSES_DEFAULT = 100; public static final String DFS_NAMENODE_LIST_ENCRYPTION_ZONES_NUM_RESPONSES = "dfs.namenode.list.encryption.zones.num.responses"; + public static final String DFS_NAMENODE_LIST_OPENFILES_NUM_RESPONSES = + "dfs.namenode.list.openfiles.num.responses"; + public static final int DFS_NAMENODE_LIST_OPENFILES_NUM_RESPONSES_DEFAULT = + 1000; public static final String DFS_NAMENODE_EDEKCACHELOADER_INTERVAL_MS_KEY = "dfs.namenode.edekcacheloader.interval.ms"; public static final int DFS_NAMENODE_EDEKCACHELOADER_INTERVAL_MS_DEFAULT = 1000; public static final String DFS_NAMENODE_EDEKCACHELOADER_INITIAL_DELAY_MS_KEY = "dfs.namenode.edekcacheloader.initial.delay.ms"; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java index ba59ed8df9..7135ff16da 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java @@ -52,6 +52,7 @@ import org.apache.hadoop.hdfs.protocol.LastBlockWithStatus; import org.apache.hadoop.hdfs.protocol.LocatedBlock; import org.apache.hadoop.hdfs.protocol.LocatedBlocks; +import org.apache.hadoop.hdfs.protocol.OpenFileEntry; import org.apache.hadoop.hdfs.protocol.RollingUpgradeInfo; import org.apache.hadoop.hdfs.protocol.SnapshotDiffReport; import org.apache.hadoop.hdfs.protocol.SnapshottableDirectoryStatus; @@ -155,6 +156,8 @@ import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ListCachePoolsResponseProto; import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ListCorruptFileBlocksRequestProto; import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ListCorruptFileBlocksResponseProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ListOpenFilesRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ListOpenFilesResponseProto; import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.MetaSaveRequestProto; import 
org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.MetaSaveResponseProto; import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.MkdirsRequestProto; @@ -1717,4 +1720,21 @@ public GetQuotaUsageResponseProto getQuotaUsage( throw new ServiceException(e); } } + + @Override + public ListOpenFilesResponseProto listOpenFiles(RpcController controller, + ListOpenFilesRequestProto req) throws ServiceException { + try { + BatchedEntries entries = server.listOpenFiles(req.getId()); + ListOpenFilesResponseProto.Builder builder = + ListOpenFilesResponseProto.newBuilder(); + builder.setHasMore(entries.hasMore()); + for (int i = 0; i < entries.size(); i++) { + builder.addEntries(PBHelperClient.convert(entries.get(i))); + } + return builder.build(); + } catch (IOException e) { + throw new ServiceException(e); + } + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java index 2a611b39ee..3f7f1ca533 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java @@ -91,6 +91,7 @@ import org.apache.hadoop.hdfs.protocol.BlocksStats; import org.apache.hadoop.hdfs.protocol.ECBlockGroupsStats; +import org.apache.hadoop.hdfs.protocol.OpenFileEntry; import org.apache.hadoop.hdfs.server.protocol.SlowDiskReports; import static org.apache.hadoop.util.Time.now; import static org.apache.hadoop.util.Time.monotonicNow; @@ -424,6 +425,9 @@ private void logAuditEvent(boolean succeeded, /** Maximum time the lock is hold to release lease. */ private final long maxLockHoldToReleaseLeaseMs; + // Batch size for open files response + private final int maxListOpenFilesResponses; + // Scan interval is not configurable. private static final long DELEGATION_TOKEN_REMOVER_SCAN_INTERVAL = TimeUnit.MILLISECONDS.convert(1, TimeUnit.HOURS); @@ -874,6 +878,14 @@ static FSNamesystem loadFromDisk(Configuration conf) throws IOException { inodeAttributeProvider = ReflectionUtils.newInstance(klass, conf); LOG.info("Using INode attribute provider: " + klass.getName()); } + this.maxListOpenFilesResponses = conf.getInt( + DFSConfigKeys.DFS_NAMENODE_LIST_OPENFILES_NUM_RESPONSES, + DFSConfigKeys.DFS_NAMENODE_LIST_OPENFILES_NUM_RESPONSES_DEFAULT + ); + Preconditions.checkArgument(maxListOpenFilesResponses > 0, + DFSConfigKeys.DFS_NAMENODE_LIST_OPENFILES_NUM_RESPONSES + + " must be a positive integer." + ); } catch(IOException e) { LOG.error(getClass().getSimpleName() + " initialization failed.", e); close(); @@ -905,6 +917,10 @@ public long getMaxLockHoldToReleaseLeaseMs() { return maxLockHoldToReleaseLeaseMs; } + public int getMaxListOpenFilesResponses() { + return maxListOpenFilesResponses; + } + void lockRetryCache() { if (retryCache != null) { retryCache.lock(); @@ -1714,6 +1730,35 @@ private void metaSave(PrintWriter out) { blockManager.metaSave(out); } + /** + * List open files in the system in batches. prevId is the cursor INode id and + * the open files returned in a batch will have their INode ids greater than + * this cursor. Open files can only be requested by super user and the the + * list across batches does not represent a consistent view of all open files. + * + * @param prevId the cursor INode id. 
+ * @throws IOException + */ + BatchedListEntries listOpenFiles(long prevId) + throws IOException { + final String operationName = "listOpenFiles"; + checkSuperuserPrivilege(); + checkOperation(OperationCategory.READ); + readLock(); + BatchedListEntries batchedListEntries; + try { + checkOperation(OperationCategory.READ); + batchedListEntries = leaseManager.getUnderConstructionFiles(prevId); + } catch (AccessControlException e) { + logAuditEvent(false, operationName, null); + throw e; + } finally { + readUnlock(operationName); + } + logAuditEvent(true, operationName, null); + return batchedListEntries; + } + private String metaSaveAsString() { StringWriter sw = new StringWriter(); PrintWriter pw = new PrintWriter(sw); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/LeaseManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/LeaseManager.java index 8695d63e2f..38cdbb30ef 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/LeaseManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/LeaseManager.java @@ -24,7 +24,6 @@ import java.util.Collection; import java.util.Collections; import java.util.Comparator; -import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.PriorityQueue; @@ -40,7 +39,9 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.fs.BatchedRemoteIterator.BatchedListEntries; import org.apache.hadoop.hdfs.protocol.HdfsConstants; +import org.apache.hadoop.hdfs.protocol.OpenFileEntry; import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo; import org.apache.hadoop.hdfs.server.common.HdfsServerConstants; import org.apache.hadoop.util.Daemon; @@ -94,7 +95,7 @@ public int compare(Lease o1, Lease o2) { } }); // INodeID -> Lease - private final HashMap leasesById = new HashMap<>(); + private final TreeMap leasesById = new TreeMap<>(); private Daemon lmthread; private volatile boolean shouldRunMonitor; @@ -245,6 +246,52 @@ public List call() { return iipSet; } + /** + * Get a batch of under construction files from the currently active leases. + * File INodeID is the cursor used to fetch new batch of results and the + * batch size is configurable using below config param. Since the list is + * fetched in batches, it does not represent a consistent view of all + * open files. 
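+   * <p>
+   * A caller pages through the results by passing the INode id of the last
+   * entry in one batch as the cursor for the next call. The sketch below is
+   * illustrative only (the names are assumed, and the FSNamesystem read
+   * lock must be held, as asserted by this method):
+   * <pre>
+   *   long cursor = HdfsConstants.GRANDFATHER_INODE_ID;
+   *   BatchedListEntries&lt;OpenFileEntry&gt; batch;
+   *   do {
+   *     batch = leaseManager.getUnderConstructionFiles(cursor);
+   *     for (int i = 0; i &lt; batch.size(); i++) {
+   *       cursor = batch.get(i).getId();
+   *     }
+   *   } while (batch.hasMore());
+   * </pre>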
+ * + * @see org.apache.hadoop.hdfs.DFSConfigKeys#DFS_NAMENODE_LIST_OPENFILES_NUM_RESPONSES + * @param prevId the INodeID cursor + * @throws IOException + */ + public BatchedListEntries getUnderConstructionFiles( + final long prevId) throws IOException { + assert fsnamesystem.hasReadLock(); + SortedMap remainingLeases; + synchronized (this) { + remainingLeases = leasesById.tailMap(prevId, false); + } + Collection inodeIds = remainingLeases.keySet(); + final int numResponses = Math.min( + this.fsnamesystem.getMaxListOpenFilesResponses(), inodeIds.size()); + final List openFileEntries = + Lists.newArrayListWithExpectedSize(numResponses); + + int count = 0; + for (Long inodeId: inodeIds) { + final INodeFile inodeFile = + fsnamesystem.getFSDirectory().getInode(inodeId).asFile(); + if (!inodeFile.isUnderConstruction()) { + LOG.warn("The file " + inodeFile.getFullPathName() + + " is not under construction but has lease."); + continue; + } + openFileEntries.add(new OpenFileEntry( + inodeFile.getId(), inodeFile.getFullPathName(), + inodeFile.getFileUnderConstructionFeature().getClientName(), + inodeFile.getFileUnderConstructionFeature().getClientMachine())); + count++; + if (count >= numResponses) { + break; + } + } + boolean hasMore = (numResponses < remainingLeases.size()); + return new BatchedListEntries<>(openFileEntries, hasMore); + } + /** @return the lease containing src */ public synchronized Lease getLease(INodeFile src) {return leasesById.get(src.getId());} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java index fff29df3c2..e11a5467f1 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java @@ -110,6 +110,7 @@ import org.apache.hadoop.hdfs.protocol.LocatedBlock; import org.apache.hadoop.hdfs.protocol.LocatedBlocks; import org.apache.hadoop.hdfs.protocol.NSQuotaExceededException; +import org.apache.hadoop.hdfs.protocol.OpenFileEntry; import org.apache.hadoop.hdfs.protocol.QuotaByStorageTypeExceededException; import org.apache.hadoop.hdfs.protocol.QuotaExceededException; import org.apache.hadoop.hdfs.protocol.RecoveryInProgressException; @@ -1308,6 +1309,13 @@ public void metaSave(String filename) throws IOException { namesystem.metaSave(filename); } + @Override // ClientProtocol + public BatchedEntries listOpenFiles(long prevId) + throws IOException { + checkNNStartup(); + return namesystem.listOpenFiles(prevId); + } + @Override // ClientProtocol public CorruptFileBlocks listCorruptFileBlocks(String path, String cookie) throws IOException { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/DFSAdmin.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/DFSAdmin.java index d82dfc4059..70509d4c40 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/DFSAdmin.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/DFSAdmin.java @@ -49,7 +49,9 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FsShell; import org.apache.hadoop.fs.FsStatus; +import org.apache.hadoop.fs.FsTracer; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.RemoteIterator; import 
org.apache.hadoop.fs.shell.Command; import org.apache.hadoop.fs.shell.CommandFormat; import org.apache.hadoop.fs.shell.PathData; @@ -73,6 +75,8 @@ import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType; import org.apache.hadoop.hdfs.protocol.HdfsConstants.RollingUpgradeAction; import org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction; +import org.apache.hadoop.hdfs.protocol.OpenFileEntry; +import org.apache.hadoop.hdfs.protocol.OpenFilesIterator; import org.apache.hadoop.hdfs.protocol.ReconfigurationProtocol; import org.apache.hadoop.hdfs.protocol.RollingUpgradeInfo; import org.apache.hadoop.hdfs.protocol.SnapshotException; @@ -455,6 +459,7 @@ static int run(DistributedFileSystem dfs, String[] argv, int idx) throws IOExcep "\t[-getDatanodeInfo ]\n" + "\t[-metasave filename]\n" + "\t[-triggerBlockReport [-incremental] ]\n" + + "\t[-listOpenFiles]\n" + "\t[-help [cmd]]\n"; /** @@ -881,6 +886,45 @@ public int refreshNodes() throws IOException { return exitCode; } + /** + * Command to list all the open files currently managed by NameNode. + * Usage: hdfs dfsadmin -listOpenFiles + * + * @throws IOException + */ + public int listOpenFiles() throws IOException { + DistributedFileSystem dfs = getDFS(); + Configuration dfsConf = dfs.getConf(); + URI dfsUri = dfs.getUri(); + boolean isHaEnabled = HAUtilClient.isLogicalUri(dfsConf, dfsUri); + + RemoteIterator openFilesRemoteIterator; + if (isHaEnabled) { + ProxyAndInfo proxy = NameNodeProxies.createNonHAProxy( + dfsConf, HAUtil.getAddressOfActive(getDFS()), ClientProtocol.class, + UserGroupInformation.getCurrentUser(), false); + openFilesRemoteIterator = new OpenFilesIterator(proxy.getProxy(), + FsTracer.get(dfsConf)); + } else { + openFilesRemoteIterator = dfs.listOpenFiles(); + } + printOpenFiles(openFilesRemoteIterator); + return 0; + } + + private void printOpenFiles(RemoteIterator openFilesIterator) + throws IOException { + System.out.println(String.format("%-20s\t%-20s\t%s", "Client Host", + "Client Name", "Open File Path")); + while (openFilesIterator.hasNext()) { + OpenFileEntry openFileEntry = openFilesIterator.next(); + System.out.println(String.format("%-20s\t%-20s\t%20s", + openFileEntry.getClientMachine(), + openFileEntry.getClientName(), + openFileEntry.getFilePath())); + } + } + /** * Command to ask the namenode to set the balancer bandwidth for all of the * datanodes. 
@@ -1138,6 +1182,10 @@ private void printHelp(String cmd) { + "\tIf 'incremental' is specified, it will be an incremental\n" + "\tblock report; otherwise, it will be a full block report.\n"; + String listOpenFiles = "-listOpenFiles\n" + + "\tList all open files currently managed by the NameNode along\n" + + "\twith client name and client machine accessing them.\n"; + String help = "-help [cmd]: \tDisplays help for the given command or all commands if none\n" + "\t\tis specified.\n"; @@ -1203,6 +1251,8 @@ private void printHelp(String cmd) { System.out.println(evictWriters); } else if ("getDatanodeInfo".equalsIgnoreCase(cmd)) { System.out.println(getDatanodeInfo); + } else if ("listOpenFiles".equalsIgnoreCase(cmd)) { + System.out.println(listOpenFiles); } else if ("help".equals(cmd)) { System.out.println(help); } else { @@ -1238,6 +1288,7 @@ private void printHelp(String cmd) { System.out.println(evictWriters); System.out.println(getDatanodeInfo); System.out.println(triggerBlockReport); + System.out.println(listOpenFiles); System.out.println(help); System.out.println(); ToolRunner.printGenericCommandUsage(System.out); @@ -1879,6 +1930,8 @@ private static void printUsage(String cmd) { } else if ("-triggerBlockReport".equals(cmd)) { System.err.println("Usage: hdfs dfsadmin" + " [-triggerBlockReport [-incremental] ]"); + } else if ("-listOpenFiles".equals(cmd)) { + System.err.println("Usage: hdfs dfsadmin [-listOpenFiles]"); } else { System.err.println("Usage: hdfs dfsadmin"); System.err.println("Note: Administrative commands can only be run as the HDFS superuser."); @@ -2032,6 +2085,11 @@ public int run(String[] argv) throws Exception { printUsage(cmd); return exitCode; } + } else if ("-listOpenFiles".equals(cmd)) { + if (argv.length != 1) { + printUsage(cmd); + return exitCode; + } } // initialize DFSAdmin @@ -2113,6 +2171,8 @@ public int run(String[] argv) throws Exception { exitCode = reconfig(argv, i); } else if ("-triggerBlockReport".equals(cmd)) { exitCode = triggerBlockReport(argv); + } else if ("-listOpenFiles".equals(cmd)) { + exitCode = listOpenFiles(); } else if ("-help".equals(cmd)) { if (i < argv.length) { printHelp(argv[i]); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml index 82090e6113..dbf78fcf30 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml @@ -2788,6 +2788,16 @@ + + dfs.namenode.list.openfiles.num.responses + 1000 + + When listing open files, the maximum number of open files that will be + returned in a single batch. Fetching the list incrementally in batches + improves namenode performance. + + + dfs.namenode.edekcacheloader.interval.ms 1000 diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/HDFSCommands.md b/hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/HDFSCommands.md index ffffee9d3a..be0f89e39f 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/HDFSCommands.md +++ b/hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/HDFSCommands.md @@ -370,6 +370,7 @@ Usage: hdfs dfsadmin [-getDatanodeInfo ] hdfs dfsadmin [-metasave filename] hdfs dfsadmin [-triggerBlockReport [-incremental] ] + hdfs dfsadmin [-listOpenFiles] hdfs dfsadmin [-help [cmd]] | COMMAND\_OPTION | Description | @@ -406,6 +407,7 @@ Usage: | `-getDatanodeInfo` \ | Get the information about the given datanode. 
See [Rolling Upgrade document](./HdfsRollingUpgrade.html#dfsadmin_-getDatanodeInfo) for the detail. |
 | `-metasave` filename | Save Namenode's primary data structures to *filename* in the directory specified by hadoop.log.dir property. *filename* is overwritten if it exists. *filename* will contain one line for each of the following<br/>1. Datanodes heart beating with Namenode<br/>2. Blocks waiting to be replicated<br/>3. Blocks currently being replicated<br/>
4. Blocks waiting to be deleted | | `-triggerBlockReport` `[-incremental]` \ | Trigger a block report for the given datanode. If 'incremental' is specified, it will be otherwise, it will be a full block report. | +| `-listOpenFiles` | List all open files currently managed by the NameNode along with client name and client machine accessing them. | | `-help` [cmd] | Displays help for the given command or all commands if none is specified. | Runs a HDFS dfsadmin client. diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java index 5075c05d47..f3572ff1b7 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java @@ -62,8 +62,10 @@ import java.util.EnumSet; import java.util.HashMap; import java.util.HashSet; +import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.Map.Entry; import java.util.Random; import java.util.Set; import java.util.UUID; @@ -1863,8 +1865,8 @@ public Boolean get() { }, 100, waitTime); } - /** - * Change the length of a block at datanode dnIndex + /** + * Change the length of a block at datanode dnIndex. */ public static boolean changeReplicaLength(MiniDFSCluster cluster, ExtendedBlock blk, int dnIndex, int lenDelta) throws IOException { @@ -2249,4 +2251,38 @@ public static void verifyDelete(FsShell shell, FileSystem fs, Path path, assertFalse("File in trash : " + trashPath, fs.exists(trashPath)); } } + + public static Map createOpenFiles(FileSystem fs, + String filePrefix, int numFilesToCreate) throws IOException { + final Map filesCreated = new HashMap<>(); + final byte[] buffer = new byte[(int) (1024 * 1.75)]; + final Random rand = new Random(0xFEED0BACL); + for (int i = 0; i < numFilesToCreate; i++) { + Path file = new Path("/" + filePrefix + "-" + i); + FSDataOutputStream stm = fs.create(file, true, 1024, (short) 1, 1024); + rand.nextBytes(buffer); + stm.write(buffer); + filesCreated.put(file, stm); + } + return filesCreated; + } + + public static HashSet closeOpenFiles( + HashMap openFilesMap, + int numFilesToClose) throws IOException { + HashSet closedFiles = new HashSet<>(); + for (Iterator> it = + openFilesMap.entrySet().iterator(); it.hasNext();) { + Entry entry = it.next(); + LOG.info("Closing file: " + entry.getKey()); + entry.getValue().close(); + closedFiles.add(entry.getKey()); + it.remove(); + numFilesToClose--; + if (numFilesToClose == 0) { + break; + } + } + return closedFiles; + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestHdfsAdmin.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestHdfsAdmin.java index fe20c68e06..685ea8b501 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestHdfsAdmin.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestHdfsAdmin.java @@ -18,12 +18,14 @@ package org.apache.hadoop.hdfs; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; import java.io.File; import java.io.IOException; import java.net.URI; import java.net.URISyntaxException; +import java.util.HashMap; import java.util.HashSet; import java.util.Set; @@ -31,11 +33,14 @@ import org.apache.hadoop.crypto.key.JavaKeyStoreProvider; import 
org.apache.hadoop.fs.BlockStoragePolicySpi; import org.apache.hadoop.fs.CommonConfigurationKeysPublic; +import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileSystemTestHelper; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.RemoteIterator; import org.apache.hadoop.hdfs.client.HdfsAdmin; import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy; +import org.apache.hadoop.hdfs.protocol.OpenFileEntry; import org.apache.hadoop.hdfs.server.blockmanagement.BlockStoragePolicySuite; import org.junit.After; import org.junit.Assert; @@ -49,11 +54,15 @@ public class TestHdfsAdmin { private static final Path TEST_PATH = new Path("/test"); private static final short REPL = 1; private static final int SIZE = 128; + private static final int OPEN_FILES_BATCH_SIZE = 5; private final Configuration conf = new Configuration(); private MiniDFSCluster cluster; @Before public void setUpCluster() throws IOException { + conf.setLong( + DFSConfigKeys.DFS_NAMENODE_LIST_OPENFILES_NUM_RESPONSES, + OPEN_FILES_BATCH_SIZE); cluster = new MiniDFSCluster.Builder(conf).numDataNodes(2).build(); cluster.waitActive(); } @@ -205,4 +214,54 @@ public void testGetKeyProvider() throws IOException { Assert.assertNotNull("should not return null for an encrypted cluster", hdfsAdmin.getKeyProvider()); } + + @Test(timeout = 120000L) + public void testListOpenFiles() throws IOException { + HashSet closedFileSet = new HashSet<>(); + HashMap openFileMap = new HashMap<>(); + FileSystem fs = FileSystem.get(conf); + verifyOpenFiles(closedFileSet, openFileMap); + + int numClosedFiles = OPEN_FILES_BATCH_SIZE * 4; + int numOpenFiles = (OPEN_FILES_BATCH_SIZE * 3) + 1; + for (int i = 0; i < numClosedFiles; i++) { + Path filePath = new Path("/closed-file-" + i); + DFSTestUtil.createFile(fs, filePath, SIZE, REPL, 0); + closedFileSet.add(filePath); + } + verifyOpenFiles(closedFileSet, openFileMap); + + openFileMap.putAll( + DFSTestUtil.createOpenFiles(fs, "open-file-1", numOpenFiles)); + verifyOpenFiles(closedFileSet, openFileMap); + + closedFileSet.addAll(DFSTestUtil.closeOpenFiles(openFileMap, + openFileMap.size() / 2)); + verifyOpenFiles(closedFileSet, openFileMap); + + openFileMap.putAll( + DFSTestUtil.createOpenFiles(fs, "open-file-2", 10)); + verifyOpenFiles(closedFileSet, openFileMap); + + while(openFileMap.size() > 0) { + closedFileSet.addAll(DFSTestUtil.closeOpenFiles(openFileMap, 1)); + verifyOpenFiles(closedFileSet, openFileMap); + } + } + + private void verifyOpenFiles(HashSet closedFiles, + HashMap openFileMap) throws IOException { + HdfsAdmin hdfsAdmin = new HdfsAdmin(FileSystem.getDefaultUri(conf), conf); + HashSet openFiles = new HashSet<>(openFileMap.keySet()); + RemoteIterator openFilesRemoteItr = + hdfsAdmin.listOpenFiles(); + while (openFilesRemoteItr.hasNext()) { + String filePath = openFilesRemoteItr.next().getFilePath(); + assertFalse(filePath + " should not be listed under open files!", + closedFiles.contains(filePath)); + assertTrue(filePath + " is not listed under open files!", + openFiles.remove(new Path(filePath))); + } + assertTrue("Not all open files are listed!", openFiles.isEmpty()); + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestLeaseManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestLeaseManager.java index 74752f90dc..55bc7c3bf8 100644 --- 
a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestLeaseManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestLeaseManager.java @@ -40,6 +40,7 @@ import org.junit.Test; import org.junit.rules.Timeout; +import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; import java.util.HashMap; @@ -190,6 +191,7 @@ public void testLeaseRestorationOnRestart() throws Exception { @Test (timeout = 60000) public void testInodeWithLeases() throws Exception { FSNamesystem fsNamesystem = makeMockFsNameSystem(); + when(fsNamesystem.getMaxListOpenFilesResponses()).thenReturn(1024); FSDirectory fsDirectory = fsNamesystem.getFSDirectory(); LeaseManager lm = new LeaseManager(fsNamesystem); Set iNodeIds = new HashSet<>(Arrays.asList( @@ -208,6 +210,7 @@ public void testInodeWithLeases() throws Exception { for (Long iNodeId : iNodeIds) { INodeFile iNodeFile = stubInodeFile(iNodeId); + iNodeFile.toUnderConstruction("hbase", "gce-100"); iNodeFile.setParent(rootInodeDirectory); when(fsDirectory.getInode(iNodeId)).thenReturn(iNodeFile); lm.addLease("holder_" + iNodeId, iNodeId); @@ -230,6 +233,7 @@ public void testInodeWithLeases() throws Exception { @Test (timeout = 240000) public void testInodeWithLeasesAtScale() throws Exception { FSNamesystem fsNamesystem = makeMockFsNameSystem(); + when(fsNamesystem.getMaxListOpenFilesResponses()).thenReturn(4096); FSDirectory fsDirectory = fsNamesystem.getFSDirectory(); LeaseManager lm = new LeaseManager(fsNamesystem); @@ -275,7 +279,7 @@ public void testInodeWithLeasesAtScale() throws Exception { private void testInodeWithLeasesAtScaleImpl(final LeaseManager leaseManager, final FSDirectory fsDirectory, INodeDirectory ancestorDirectory, - int scale) { + int scale) throws IOException { verifyINodeLeaseCounts(leaseManager, ancestorDirectory, 0, 0, 0); Set iNodeIds = new HashSet<>(); @@ -284,6 +288,7 @@ private void testInodeWithLeasesAtScaleImpl(final LeaseManager leaseManager, } for (Long iNodeId : iNodeIds) { INodeFile iNodeFile = stubInodeFile(iNodeId); + iNodeFile.toUnderConstruction("hbase", "gce-100"); iNodeFile.setParent(ancestorDirectory); when(fsDirectory.getInode(iNodeId)).thenReturn(iNodeFile); leaseManager.addLease("holder_" + iNodeId, iNodeId); @@ -386,13 +391,16 @@ public void testInodeWithLeasesForAncestorDir() throws Exception { private void verifyINodeLeaseCounts(final LeaseManager leaseManager, INodeDirectory ancestorDirectory, int iNodeIdWithLeaseCount, - int iNodeWithLeaseCount, int iNodeUnderAncestorLeaseCount) { + int iNodeWithLeaseCount, int iNodeUnderAncestorLeaseCount) + throws IOException { assertEquals(iNodeIdWithLeaseCount, leaseManager.getINodeIdWithLeases().size()); assertEquals(iNodeWithLeaseCount, leaseManager.getINodeWithLeases().size()); assertEquals(iNodeUnderAncestorLeaseCount, leaseManager.getINodeWithLeases(ancestorDirectory).size()); + assertEquals(iNodeIdWithLeaseCount, + leaseManager.getUnderConstructionFiles(0).size()); } private Map createINodeTree(INodeDirectory parentDir, diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestListOpenFiles.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestListOpenFiles.java new file mode 100644 index 0000000000..b29019437e --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestListOpenFiles.java @@ -0,0 +1,234 @@ +/** + * Licensed to the Apache 
Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.namenode; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +import java.io.IOException; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.atomic.AtomicBoolean; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.BatchedRemoteIterator.BatchedEntries; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hdfs.DFSTestUtil; +import org.apache.hadoop.hdfs.DistributedFileSystem; +import org.apache.hadoop.hdfs.HAUtil; +import org.apache.hadoop.hdfs.MiniDFSNNTopology; +import org.apache.hadoop.hdfs.protocol.ClientProtocol; +import org.apache.hadoop.hdfs.protocol.OpenFileEntry; +import org.apache.hadoop.hdfs.server.namenode.ha.HATestUtil; +import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols; +import org.apache.hadoop.hdfs.DFSConfigKeys; +import org.apache.hadoop.hdfs.HdfsConfiguration; +import org.apache.hadoop.hdfs.MiniDFSCluster; +import org.apache.hadoop.hdfs.tools.DFSAdmin; +import org.apache.hadoop.util.ToolRunner; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +/** + * Verify open files listing. + */ +public class TestListOpenFiles { + private static final int NUM_DATA_NODES = 3; + private static final int BATCH_SIZE = 5; + private static MiniDFSCluster cluster = null; + private static DistributedFileSystem fs = null; + private static NamenodeProtocols nnRpc = null; + private static final Log LOG = LogFactory.getLog(TestListOpenFiles.class); + + @Before + public void setUp() throws IOException { + Configuration conf = new HdfsConfiguration(); + conf.setLong(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 1L); + conf.setLong( + DFSConfigKeys.DFS_NAMENODE_LIST_OPENFILES_NUM_RESPONSES, BATCH_SIZE); + cluster = new MiniDFSCluster.Builder(conf). 
+ numDataNodes(NUM_DATA_NODES).build(); + cluster.waitActive(); + fs = cluster.getFileSystem(); + nnRpc = cluster.getNameNodeRpc(); + } + + @After + public void tearDown() throws IOException { + if (fs != null) { + fs.close(); + } + if (cluster != null) { + cluster.shutdown(); + } + } + + @Test(timeout = 120000L) + public void testListOpenFilesViaNameNodeRPC() throws Exception { + HashMap openFiles = new HashMap<>(); + createFiles(fs, "closed", 10); + verifyOpenFiles(openFiles); + + BatchedEntries openFileEntryBatchedEntries = + nnRpc.listOpenFiles(0); + assertTrue("Open files list should be empty!", + openFileEntryBatchedEntries.size() == 0); + + openFiles.putAll( + DFSTestUtil.createOpenFiles(fs, "open-1", 1)); + verifyOpenFiles(openFiles); + + openFiles.putAll( + DFSTestUtil.createOpenFiles(fs, "open-2", + (BATCH_SIZE * 2 + BATCH_SIZE / 2))); + verifyOpenFiles(openFiles); + + DFSTestUtil.closeOpenFiles(openFiles, openFiles.size() / 2); + verifyOpenFiles(openFiles); + + openFiles.putAll( + DFSTestUtil.createOpenFiles(fs, "open-3", (BATCH_SIZE * 5))); + verifyOpenFiles(openFiles); + + while(openFiles.size() > 0) { + DFSTestUtil.closeOpenFiles(openFiles, 1); + verifyOpenFiles(openFiles); + } + } + + private void verifyOpenFiles(Map openFiles) + throws IOException { + HashSet remainingFiles = new HashSet<>(openFiles.keySet()); + OpenFileEntry lastEntry = null; + BatchedEntries batchedEntries; + do { + if (lastEntry == null) { + batchedEntries = nnRpc.listOpenFiles(0); + } else { + batchedEntries = nnRpc.listOpenFiles(lastEntry.getId()); + } + assertTrue("Incorrect open files list size!", + batchedEntries.size() <= BATCH_SIZE); + for (int i = 0; i < batchedEntries.size(); i++) { + lastEntry = batchedEntries.get(i); + String filePath = lastEntry.getFilePath(); + LOG.info("OpenFile: " + filePath); + assertTrue("Unexpected open file: " + filePath, + remainingFiles.remove(new Path(filePath))); + } + } while (batchedEntries.hasMore()); + assertTrue(remainingFiles.size() + " open files not listed!", + remainingFiles.size() == 0); + } + + private Set createFiles(FileSystem fileSystem, String fileNamePrefix, + int numFilesToCreate) throws IOException { + HashSet files = new HashSet<>(); + for (int i = 0; i < numFilesToCreate; i++) { + Path filePath = new Path(fileNamePrefix + "-" + i); + DFSTestUtil.createFile(fileSystem, filePath, 1024, (short) 3, 1); + } + return files; + } + + /** + * Verify dfsadmin -listOpenFiles command in HA mode. 
+ */ + @Test(timeout = 120000) + public void testListOpenFilesInHA() throws Exception { + fs.close(); + cluster.shutdown(); + HdfsConfiguration haConf = new HdfsConfiguration(); + haConf.setLong( + DFSConfigKeys.DFS_NAMENODE_LIST_OPENFILES_NUM_RESPONSES, BATCH_SIZE); + MiniDFSCluster haCluster = + new MiniDFSCluster.Builder(haConf) + .nnTopology(MiniDFSNNTopology.simpleHATopology()) + .numDataNodes(0) + .build(); + try { + HATestUtil.setFailoverConfigurations(haCluster, haConf); + FileSystem fileSystem = HATestUtil.configureFailoverFs(haCluster, haConf); + + List namenodes = + HAUtil.getProxiesForAllNameNodesInNameservice(haConf, + HATestUtil.getLogicalHostname(haCluster)); + haCluster.transitionToActive(0); + assertTrue(HAUtil.isAtLeastOneActive(namenodes)); + + final byte[] data = new byte[1024]; + ThreadLocalRandom.current().nextBytes(data); + DFSTestUtil.createOpenFiles(fileSystem, "ha-open-file", + ((BATCH_SIZE * 4) + (BATCH_SIZE / 2))); + + final DFSAdmin dfsAdmin = new DFSAdmin(haConf); + final AtomicBoolean failoverCompleted = new AtomicBoolean(false); + final AtomicBoolean listOpenFilesError = new AtomicBoolean(false); + final int listingIntervalMsec = 250; + Thread clientThread = new Thread(new Runnable() { + @Override + public void run() { + while(!failoverCompleted.get()) { + try { + assertEquals(0, ToolRunner.run(dfsAdmin, + new String[] {"-listOpenFiles"})); + // Sleep for some time to avoid + // flooding logs with listing. + Thread.sleep(listingIntervalMsec); + } catch (Exception e) { + listOpenFilesError.set(true); + LOG.info("Error listing open files: ", e); + break; + } + } + } + }); + clientThread.start(); + + // Let client list open files for few + // times before the NN failover. + Thread.sleep(listingIntervalMsec * 2); + + LOG.info("Shutting down Active NN0!"); + haCluster.shutdownNameNode(0); + LOG.info("Transitioning NN1 to Active!"); + haCluster.transitionToActive(1); + failoverCompleted.set(true); + + assertEquals(0, ToolRunner.run(dfsAdmin, + new String[] {"-listOpenFiles"})); + assertFalse("Client Error!", listOpenFilesError.get()); + + clientThread.join(); + } finally { + if (haCluster != null) { + haCluster.shutdown(); + } + } + } +} \ No newline at end of file diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/tools/TestDFSAdmin.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/tools/TestDFSAdmin.java index a23fe81ada..2ef45e7b38 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/tools/TestDFSAdmin.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/tools/TestDFSAdmin.java @@ -32,6 +32,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.ReconfigurationUtil; import org.apache.hadoop.fs.ChecksumException; +import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hdfs.DFSClient; @@ -60,6 +61,8 @@ import java.io.IOException; import java.io.PrintStream; import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Scanner; import java.util.concurrent.TimeoutException; @@ -593,6 +596,75 @@ public Boolean get() { } } + @Test(timeout = 300000L) + public void testListOpenFiles() throws Exception { + redirectStream(); + + final Configuration dfsConf = new HdfsConfiguration(); + dfsConf.setInt( + DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY, 500); + 
dfsConf.setLong(DFS_HEARTBEAT_INTERVAL_KEY, 1); + dfsConf.setLong(DFSConfigKeys.DFS_NAMENODE_LIST_OPENFILES_NUM_RESPONSES, 5); + final Path baseDir = new Path( + PathUtils.getTestDir(getClass()).getAbsolutePath(), + GenericTestUtils.getMethodName()); + dfsConf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, baseDir.toString()); + + final int numDataNodes = 3; + final int numClosedFiles = 25; + final int numOpenFiles = 15; + + try(MiniDFSCluster miniCluster = new MiniDFSCluster + .Builder(dfsConf) + .numDataNodes(numDataNodes).build()) { + final short replFactor = 1; + final long fileLength = 512L; + final FileSystem fs = miniCluster.getFileSystem(); + final Path parentDir = new Path("/tmp/files/"); + + fs.mkdirs(parentDir); + HashSet closedFileSet = new HashSet<>(); + for (int i = 0; i < numClosedFiles; i++) { + Path file = new Path(parentDir, "closed-file-" + i); + DFSTestUtil.createFile(fs, file, fileLength, replFactor, 12345L); + closedFileSet.add(file); + } + + HashMap openFilesMap = new HashMap<>(); + for (int i = 0; i < numOpenFiles; i++) { + Path file = new Path(parentDir, "open-file-" + i); + DFSTestUtil.createFile(fs, file, fileLength, replFactor, 12345L); + FSDataOutputStream outputStream = fs.append(file); + openFilesMap.put(file, outputStream); + } + + final DFSAdmin dfsAdmin = new DFSAdmin(dfsConf); + assertEquals(0, ToolRunner.run(dfsAdmin, + new String[]{"-listOpenFiles"})); + verifyOpenFilesListing(closedFileSet, openFilesMap); + + for (int count = 0; count < numOpenFiles; count++) { + closedFileSet.addAll(DFSTestUtil.closeOpenFiles(openFilesMap, 1)); + resetStream(); + assertEquals(0, ToolRunner.run(dfsAdmin, + new String[]{"-listOpenFiles"})); + verifyOpenFilesListing(closedFileSet, openFilesMap); + } + } + } + + private void verifyOpenFilesListing(HashSet closedFileSet, + HashMap openFilesMap) { + final String outStr = scanIntoString(out); + LOG.info("dfsadmin -listOpenFiles output: \n" + out); + for (Path closedFilePath : closedFileSet) { + assertThat(outStr, not(containsString(closedFilePath.toString() + "\n"))); + } + for (Path openFilePath : openFilesMap.keySet()) { + assertThat(outStr, is(containsString(openFilePath.toString() + "\n"))); + } + } + private void verifyNodesAndCorruptBlocks( final int numDn, final int numLiveDn,