HDFS-5555. CacheAdmin commands fail when first listed NameNode is in Standby (jxiang via cmccabe)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1547895 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Colin McCabe 2013-12-04 20:06:25 +00:00
parent 59a2139093
commit f791e291ca
10 changed files with 250 additions and 165 deletions

View File

@ -421,6 +421,9 @@ Trunk (Unreleased)
HDFS-5562. TestCacheDirectives and TestFsDatasetCache should stub out HDFS-5562. TestCacheDirectives and TestFsDatasetCache should stub out
native mlock. (Colin McCabe and Akira Ajisaka via wang) native mlock. (Colin McCabe and Akira Ajisaka via wang)
HDFS-5555. CacheAdmin commands fail when first listed NameNode is in
Standby (jxiang via cmccabe)
Release 2.4.0 - UNRELEASED Release 2.4.0 - UNRELEASED
INCOMPATIBLE CHANGES INCOMPATIBLE CHANGES

View File

@ -109,8 +109,10 @@
import org.apache.hadoop.hdfs.client.HdfsDataInputStream; import org.apache.hadoop.hdfs.client.HdfsDataInputStream;
import org.apache.hadoop.hdfs.client.HdfsDataOutputStream; import org.apache.hadoop.hdfs.client.HdfsDataOutputStream;
import org.apache.hadoop.hdfs.protocol.CacheDirectiveEntry; import org.apache.hadoop.hdfs.protocol.CacheDirectiveEntry;
import org.apache.hadoop.hdfs.protocol.CacheDirectiveIterator;
import org.apache.hadoop.hdfs.protocol.CachePoolEntry; import org.apache.hadoop.hdfs.protocol.CachePoolEntry;
import org.apache.hadoop.hdfs.protocol.CachePoolInfo; import org.apache.hadoop.hdfs.protocol.CachePoolInfo;
import org.apache.hadoop.hdfs.protocol.CachePoolIterator;
import org.apache.hadoop.hdfs.protocol.ClientProtocol; import org.apache.hadoop.hdfs.protocol.ClientProtocol;
import org.apache.hadoop.hdfs.protocol.CorruptFileBlocks; import org.apache.hadoop.hdfs.protocol.CorruptFileBlocks;
import org.apache.hadoop.hdfs.protocol.DSQuotaExceededException; import org.apache.hadoop.hdfs.protocol.DSQuotaExceededException;
@ -2324,12 +2326,7 @@ public void removeCacheDirective(long id)
public RemoteIterator<CacheDirectiveEntry> listCacheDirectives( public RemoteIterator<CacheDirectiveEntry> listCacheDirectives(
CacheDirectiveInfo filter) throws IOException { CacheDirectiveInfo filter) throws IOException {
checkOpen(); return new CacheDirectiveIterator(namenode, filter);
try {
return namenode.listCacheDirectives(0, filter);
} catch (RemoteException re) {
throw re.unwrapRemoteException();
}
} }
public void addCachePool(CachePoolInfo info) throws IOException { public void addCachePool(CachePoolInfo info) throws IOException {
@ -2360,12 +2357,7 @@ public void removeCachePool(String poolName) throws IOException {
} }
public RemoteIterator<CachePoolEntry> listCachePools() throws IOException { public RemoteIterator<CachePoolEntry> listCachePools() throws IOException {
checkOpen(); return new CachePoolIterator(namenode);
try {
return namenode.listCachePools("");
} catch (RemoteException re) {
throw re.unwrapRemoteException();
}
} }
/** /**

View File

@ -0,0 +1,56 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.protocol;
import java.io.IOException;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.fs.BatchedRemoteIterator;
/**
* CacheDirectiveIterator is a remote iterator that iterates cache directives.
* It supports retrying in case of namenode failover.
*/
@InterfaceAudience.Private
@InterfaceStability.Evolving
public class CacheDirectiveIterator
extends BatchedRemoteIterator<Long, CacheDirectiveEntry> {
private final CacheDirectiveInfo filter;
private final ClientProtocol namenode;
public CacheDirectiveIterator(ClientProtocol namenode,
CacheDirectiveInfo filter) {
super(Long.valueOf(0));
this.namenode = namenode;
this.filter = filter;
}
@Override
public BatchedEntries<CacheDirectiveEntry> makeRequest(Long prevKey)
throws IOException {
return namenode.listCacheDirectives(prevKey, filter);
}
@Override
public Long elementToPrevKey(CacheDirectiveEntry entry) {
return entry.getInfo().getId();
}
}

View File

@ -0,0 +1,53 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.protocol;
import java.io.IOException;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.fs.BatchedRemoteIterator;
/**
* CachePoolIterator is a remote iterator that iterates cache pools.
* It supports retrying in case of namenode failover.
*/
@InterfaceAudience.Private
@InterfaceStability.Evolving
public class CachePoolIterator
extends BatchedRemoteIterator<String, CachePoolEntry> {
private final ClientProtocol namenode;
public CachePoolIterator(ClientProtocol namenode) {
super("");
this.namenode = namenode;
}
@Override
public BatchedEntries<CachePoolEntry> makeRequest(String prevKey)
throws IOException {
return namenode.listCachePools(prevKey);
}
@Override
public String elementToPrevKey(CachePoolEntry entry) {
return entry.getInfo().getPoolName();
}
}

View File

@ -28,9 +28,9 @@
import org.apache.hadoop.fs.FsServerDefaults; import org.apache.hadoop.fs.FsServerDefaults;
import org.apache.hadoop.fs.InvalidPathException; import org.apache.hadoop.fs.InvalidPathException;
import org.apache.hadoop.fs.Options; import org.apache.hadoop.fs.Options;
import org.apache.hadoop.fs.BatchedRemoteIterator.BatchedEntries;
import org.apache.hadoop.fs.Options.Rename; import org.apache.hadoop.fs.Options.Rename;
import org.apache.hadoop.fs.ParentNotDirectoryException; import org.apache.hadoop.fs.ParentNotDirectoryException;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.fs.UnresolvedLinkException; import org.apache.hadoop.fs.UnresolvedLinkException;
import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.DFSConfigKeys;
@ -1134,10 +1134,10 @@ public void modifyCacheDirective(
* listCacheDirectives. * listCacheDirectives.
* @param filter Parameters to use to filter the list results, * @param filter Parameters to use to filter the list results,
* or null to display all directives visible to us. * or null to display all directives visible to us.
* @return A RemoteIterator which returns CacheDirectiveInfo objects. * @return A batch of CacheDirectiveEntry objects.
*/ */
@Idempotent @Idempotent
public RemoteIterator<CacheDirectiveEntry> listCacheDirectives( public BatchedEntries<CacheDirectiveEntry> listCacheDirectives(
long prevId, CacheDirectiveInfo filter) throws IOException; long prevId, CacheDirectiveInfo filter) throws IOException;
/** /**
@ -1175,9 +1175,9 @@ public RemoteIterator<CacheDirectiveEntry> listCacheDirectives(
* *
* @param prevPool name of the last pool listed, or the empty string if this is * @param prevPool name of the last pool listed, or the empty string if this is
* the first invocation of listCachePools * the first invocation of listCachePools
* @return A RemoteIterator which returns CachePool objects. * @return A batch of CachePoolEntry objects.
*/ */
@Idempotent @Idempotent
public RemoteIterator<CachePoolEntry> listCachePools(String prevPool) public BatchedEntries<CachePoolEntry> listCachePools(String prevPool)
throws IOException; throws IOException;
} }

View File

@ -24,12 +24,9 @@
import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.fs.ContentSummary; import org.apache.hadoop.fs.ContentSummary;
import org.apache.hadoop.fs.FsServerDefaults; import org.apache.hadoop.fs.FsServerDefaults;
import org.apache.hadoop.fs.BatchedRemoteIterator.BatchedEntries;
import org.apache.hadoop.fs.Options.Rename; import org.apache.hadoop.fs.Options.Rename;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.protocol.CacheDirectiveEntry; import org.apache.hadoop.hdfs.protocol.CacheDirectiveEntry;
import org.apache.hadoop.hdfs.protocol.CachePoolInfo;
import org.apache.hadoop.hdfs.protocol.CachePoolEntry; import org.apache.hadoop.hdfs.protocol.CachePoolEntry;
import org.apache.hadoop.hdfs.protocol.ClientProtocol; import org.apache.hadoop.hdfs.protocol.ClientProtocol;
import org.apache.hadoop.hdfs.protocol.CorruptFileBlocks; import org.apache.hadoop.hdfs.protocol.CorruptFileBlocks;
@ -52,8 +49,6 @@
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.AllowSnapshotResponseProto; import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.AllowSnapshotResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.AppendRequestProto; import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.AppendRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.AppendResponseProto; import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.AppendResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.CachePoolEntryProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.CachePoolInfoProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.CompleteRequestProto; import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.CompleteRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.CompleteResponseProto; import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.CompleteResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ConcatRequestProto; import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ConcatRequestProto;
@ -109,7 +104,6 @@
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ListCachePoolsResponseProto; import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ListCachePoolsResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ListCorruptFileBlocksRequestProto; import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ListCorruptFileBlocksRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ListCorruptFileBlocksResponseProto; import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ListCorruptFileBlocksResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.CacheDirectiveEntryProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ListCacheDirectivesRequestProto; import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ListCacheDirectivesRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ListCacheDirectivesResponseProto; import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ListCacheDirectivesResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.MetaSaveRequestProto; import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.MetaSaveRequestProto;
@ -176,9 +170,7 @@
import org.apache.hadoop.security.proto.SecurityProtos.RenewDelegationTokenRequestProto; import org.apache.hadoop.security.proto.SecurityProtos.RenewDelegationTokenRequestProto;
import org.apache.hadoop.security.proto.SecurityProtos.RenewDelegationTokenResponseProto; import org.apache.hadoop.security.proto.SecurityProtos.RenewDelegationTokenResponseProto;
import org.apache.hadoop.security.token.Token; import org.apache.hadoop.security.token.Token;
import org.apache.commons.lang.StringUtils;
import com.google.common.primitives.Shorts;
import com.google.protobuf.RpcController; import com.google.protobuf.RpcController;
import com.google.protobuf.ServiceException; import com.google.protobuf.ServiceException;
@ -1079,21 +1071,13 @@ public ListCacheDirectivesResponseProto listCacheDirectives(
try { try {
CacheDirectiveInfo filter = CacheDirectiveInfo filter =
PBHelper.convert(request.getFilter()); PBHelper.convert(request.getFilter());
RemoteIterator<CacheDirectiveEntry> iter = BatchedEntries<CacheDirectiveEntry> entries =
server.listCacheDirectives(request.getPrevId(), filter); server.listCacheDirectives(request.getPrevId(), filter);
ListCacheDirectivesResponseProto.Builder builder = ListCacheDirectivesResponseProto.Builder builder =
ListCacheDirectivesResponseProto.newBuilder(); ListCacheDirectivesResponseProto.newBuilder();
long prevId = 0; builder.setHasMore(entries.hasMore());
while (iter.hasNext()) { for (int i=0, n=entries.size(); i<n; i++) {
CacheDirectiveEntry entry = iter.next(); builder.addElements(PBHelper.convert(entries.get(i)));
builder.addElements(PBHelper.convert(entry));
prevId = entry.getInfo().getId();
}
if (prevId == 0) {
builder.setHasMore(false);
} else {
iter = server.listCacheDirectives(prevId, filter);
builder.setHasMore(iter.hasNext());
} }
return builder.build(); return builder.build();
} catch (IOException e) { } catch (IOException e) {
@ -1138,22 +1122,13 @@ public RemoveCachePoolResponseProto removeCachePool(RpcController controller,
public ListCachePoolsResponseProto listCachePools(RpcController controller, public ListCachePoolsResponseProto listCachePools(RpcController controller,
ListCachePoolsRequestProto request) throws ServiceException { ListCachePoolsRequestProto request) throws ServiceException {
try { try {
RemoteIterator<CachePoolEntry> iter = BatchedEntries<CachePoolEntry> entries =
server.listCachePools(request.getPrevPoolName()); server.listCachePools(request.getPrevPoolName());
ListCachePoolsResponseProto.Builder responseBuilder = ListCachePoolsResponseProto.Builder responseBuilder =
ListCachePoolsResponseProto.newBuilder(); ListCachePoolsResponseProto.newBuilder();
String prevPoolName = null; responseBuilder.setHasMore(entries.hasMore());
while (iter.hasNext()) { for (int i=0, n=entries.size(); i<n; i++) {
CachePoolEntry entry = iter.next(); responseBuilder.addEntries(PBHelper.convert(entries.get(i)));
responseBuilder.addEntries(PBHelper.convert(entry));
prevPoolName = entry.getInfo().getPoolName();
}
// fill in hasNext
if (prevPoolName == null) {
responseBuilder.setHasMore(false);
} else {
iter = server.listCachePools(prevPoolName);
responseBuilder.setHasMore(iter.hasNext());
} }
return responseBuilder.build(); return responseBuilder.build();
} catch (IOException e) { } catch (IOException e) {

View File

@ -24,7 +24,6 @@
import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.fs.BatchedRemoteIterator;
import org.apache.hadoop.fs.BatchedRemoteIterator.BatchedEntries; import org.apache.hadoop.fs.BatchedRemoteIterator.BatchedEntries;
import org.apache.hadoop.fs.ContentSummary; import org.apache.hadoop.fs.ContentSummary;
import org.apache.hadoop.fs.CreateFlag; import org.apache.hadoop.fs.CreateFlag;
@ -32,7 +31,6 @@
import org.apache.hadoop.fs.FsServerDefaults; import org.apache.hadoop.fs.FsServerDefaults;
import org.apache.hadoop.fs.Options.Rename; import org.apache.hadoop.fs.Options.Rename;
import org.apache.hadoop.fs.ParentNotDirectoryException; import org.apache.hadoop.fs.ParentNotDirectoryException;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.fs.UnresolvedLinkException; import org.apache.hadoop.fs.UnresolvedLinkException;
import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException; import org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException;
@ -1062,46 +1060,23 @@ public boolean hasMore() {
} }
} }
private class CacheEntriesIterator
extends BatchedRemoteIterator<Long, CacheDirectiveEntry> {
private final CacheDirectiveInfo filter;
public CacheEntriesIterator(long prevKey,
CacheDirectiveInfo filter) {
super(prevKey);
this.filter = filter;
}
@Override @Override
public BatchedEntries<CacheDirectiveEntry> makeRequest( public BatchedEntries<CacheDirectiveEntry>
Long nextKey) throws IOException {
ListCacheDirectivesResponseProto response;
try {
response = rpcProxy.listCacheDirectives(null,
ListCacheDirectivesRequestProto.newBuilder().
setPrevId(nextKey).
setFilter(PBHelper.convert(filter)).
build());
} catch (ServiceException e) {
throw ProtobufHelper.getRemoteException(e);
}
return new BatchedCacheEntries(response);
}
@Override
public Long elementToPrevKey(CacheDirectiveEntry element) {
return element.getInfo().getId();
}
}
@Override
public RemoteIterator<CacheDirectiveEntry>
listCacheDirectives(long prevId, listCacheDirectives(long prevId,
CacheDirectiveInfo filter) throws IOException { CacheDirectiveInfo filter) throws IOException {
if (filter == null) { if (filter == null) {
filter = new CacheDirectiveInfo.Builder().build(); filter = new CacheDirectiveInfo.Builder().build();
} }
return new CacheEntriesIterator(prevId, filter); try {
return new BatchedCacheEntries(
rpcProxy.listCacheDirectives(null,
ListCacheDirectivesRequestProto.newBuilder().
setPrevId(prevId).
setFilter(PBHelper.convert(filter)).
build()));
} catch (ServiceException e) {
throw ProtobufHelper.getRemoteException(e);
}
} }
@Override @Override
@ -1164,15 +1139,8 @@ public boolean hasMore() {
} }
} }
private class CachePoolIterator
extends BatchedRemoteIterator<String, CachePoolEntry> {
public CachePoolIterator(String prevKey) {
super(prevKey);
}
@Override @Override
public BatchedEntries<CachePoolEntry> makeRequest(String prevKey) public BatchedEntries<CachePoolEntry> listCachePools(String prevKey)
throws IOException { throws IOException {
try { try {
return new BatchedCachePoolEntries( return new BatchedCachePoolEntries(
@ -1183,16 +1151,4 @@ public BatchedEntries<CachePoolEntry> makeRequest(String prevKey)
throw ProtobufHelper.getRemoteException(e); throw ProtobufHelper.getRemoteException(e);
} }
} }
@Override
public String elementToPrevKey(CachePoolEntry entry) {
return entry.getInfo().getPoolName();
}
}
@Override
public RemoteIterator<CachePoolEntry> listCachePools(String prevKey)
throws IOException {
return new CachePoolIterator(prevKey);
}
} }

View File

@ -36,7 +36,6 @@
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.hadoop.HadoopIllegalArgumentException; import org.apache.hadoop.HadoopIllegalArgumentException;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BatchedRemoteIterator;
import org.apache.hadoop.fs.CommonConfigurationKeys; import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.fs.ContentSummary; import org.apache.hadoop.fs.ContentSummary;
import org.apache.hadoop.fs.CreateFlag; import org.apache.hadoop.fs.CreateFlag;
@ -46,8 +45,8 @@
import org.apache.hadoop.fs.Options; import org.apache.hadoop.fs.Options;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.ParentNotDirectoryException; import org.apache.hadoop.fs.ParentNotDirectoryException;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.fs.UnresolvedLinkException; import org.apache.hadoop.fs.UnresolvedLinkException;
import org.apache.hadoop.fs.BatchedRemoteIterator.BatchedEntries;
import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.fs.permission.PermissionStatus; import org.apache.hadoop.fs.permission.PermissionStatus;
import org.apache.hadoop.ha.HAServiceStatus; import org.apache.hadoop.ha.HAServiceStatus;
@ -1251,36 +1250,13 @@ public void removeCacheDirective(long id) throws IOException {
namesystem.removeCacheDirective(id); namesystem.removeCacheDirective(id);
} }
private class ServerSideCacheEntriesIterator
extends BatchedRemoteIterator<Long, CacheDirectiveEntry> {
private final CacheDirectiveInfo filter;
public ServerSideCacheEntriesIterator (Long firstKey,
CacheDirectiveInfo filter) {
super(firstKey);
this.filter = filter;
}
@Override @Override
public BatchedEntries<CacheDirectiveEntry> makeRequest( public BatchedEntries<CacheDirectiveEntry> listCacheDirectives(long prevId,
Long nextKey) throws IOException {
return namesystem.listCacheDirectives(nextKey, filter);
}
@Override
public Long elementToPrevKey(CacheDirectiveEntry entry) {
return entry.getInfo().getId();
}
}
@Override
public RemoteIterator<CacheDirectiveEntry> listCacheDirectives(long prevId,
CacheDirectiveInfo filter) throws IOException { CacheDirectiveInfo filter) throws IOException {
if (filter == null) { if (filter == null) {
filter = new CacheDirectiveInfo.Builder().build(); filter = new CacheDirectiveInfo.Builder().build();
} }
return new ServerSideCacheEntriesIterator(prevId, filter); return namesystem.listCacheDirectives(prevId, filter);
} }
@Override @Override
@ -1298,28 +1274,9 @@ public void removeCachePool(String cachePoolName) throws IOException {
namesystem.removeCachePool(cachePoolName); namesystem.removeCachePool(cachePoolName);
} }
private class ServerSideCachePoolIterator
extends BatchedRemoteIterator<String, CachePoolEntry> {
public ServerSideCachePoolIterator(String prevKey) {
super(prevKey);
}
@Override @Override
public BatchedEntries<CachePoolEntry> makeRequest(String prevKey) public BatchedEntries<CachePoolEntry> listCachePools(String prevKey)
throws IOException { throws IOException {
return namesystem.listCachePools(prevKey); return namesystem.listCachePools(prevKey != null ? prevKey : "");
}
@Override
public String elementToPrevKey(CachePoolEntry entry) {
return entry.getInfo().getPoolName();
}
}
@Override
public RemoteIterator<CachePoolEntry> listCachePools(String prevKey)
throws IOException {
return new ServerSideCachePoolIterator(prevKey);
} }
} }

View File

@ -57,6 +57,7 @@
import org.apache.hadoop.hdfs.MiniDFSCluster; import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.protocol.CacheDirectiveEntry; import org.apache.hadoop.hdfs.protocol.CacheDirectiveEntry;
import org.apache.hadoop.hdfs.protocol.CacheDirectiveInfo; import org.apache.hadoop.hdfs.protocol.CacheDirectiveInfo;
import org.apache.hadoop.hdfs.protocol.CacheDirectiveIterator;
import org.apache.hadoop.hdfs.protocol.CacheDirectiveStats; import org.apache.hadoop.hdfs.protocol.CacheDirectiveStats;
import org.apache.hadoop.hdfs.protocol.CachePoolEntry; import org.apache.hadoop.hdfs.protocol.CachePoolEntry;
import org.apache.hadoop.hdfs.protocol.CachePoolInfo; import org.apache.hadoop.hdfs.protocol.CachePoolInfo;
@ -763,7 +764,7 @@ public Boolean get() {
} }
// Uncache and check each path in sequence // Uncache and check each path in sequence
RemoteIterator<CacheDirectiveEntry> entries = RemoteIterator<CacheDirectiveEntry> entries =
nnRpc.listCacheDirectives(0, null); new CacheDirectiveIterator(nnRpc, null);
for (int i=0; i<numFiles; i++) { for (int i=0; i<numFiles; i++) {
CacheDirectiveEntry entry = entries.next(); CacheDirectiveEntry entry = entries.next();
nnRpc.removeCacheDirective(entry.getInfo().getId()); nnRpc.removeCacheDirective(entry.getInfo().getId());

View File

@ -29,6 +29,7 @@
import java.net.UnknownHostException; import java.net.UnknownHostException;
import java.util.EnumSet; import java.util.EnumSet;
import java.util.HashMap; import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator; import java.util.Iterator;
import java.util.Map; import java.util.Map;
import java.util.Random; import java.util.Random;
@ -86,6 +87,7 @@ public class TestRetryCacheWithHA {
private static final int BlockSize = 1024; private static final int BlockSize = 1024;
private static final short DataNodes = 3; private static final short DataNodes = 3;
private static final int CHECKTIMES = 10; private static final int CHECKTIMES = 10;
private static final int ResponseSize = 3;
private MiniDFSCluster cluster; private MiniDFSCluster cluster;
private DistributedFileSystem dfs; private DistributedFileSystem dfs;
@ -120,6 +122,8 @@ protected Object invokeMethod(Method method, Object[] args)
@Before @Before
public void setup() throws Exception { public void setup() throws Exception {
conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, BlockSize); conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, BlockSize);
conf.setInt(DFSConfigKeys.DFS_NAMENODE_LIST_CACHE_DIRECTIVES_NUM_RESPONSES, ResponseSize);
conf.setInt(DFSConfigKeys.DFS_NAMENODE_LIST_CACHE_POOLS_NUM_RESPONSES, ResponseSize);
cluster = new MiniDFSCluster.Builder(conf) cluster = new MiniDFSCluster.Builder(conf)
.nnTopology(MiniDFSNNTopology.simpleHATopology()) .nnTopology(MiniDFSNNTopology.simpleHATopology())
.numDataNodes(DataNodes).build(); .numDataNodes(DataNodes).build();
@ -1175,4 +1179,92 @@ public void run() {
+ results.get(op.name)); + results.get(op.name));
} }
} }
/**
* Add a list of cache pools, list cache pools,
* switch active NN, and list cache pools again.
*/
@Test (timeout=60000)
public void testListCachePools() throws Exception {
final int poolCount = 7;
HashSet<String> poolNames = new HashSet<String>(poolCount);
for (int i=0; i<poolCount; i++) {
String poolName = "testListCachePools-" + i;
dfs.addCachePool(new CachePoolInfo(poolName));
poolNames.add(poolName);
}
listCachePools(poolNames, 0);
cluster.transitionToStandby(0);
cluster.transitionToActive(1);
cluster.waitActive(1);
listCachePools(poolNames, 1);
}
/**
* Add a list of cache directives, list cache directives,
* switch active NN, and list cache directives again.
*/
@Test (timeout=60000)
public void testListCacheDirectives() throws Exception {
final int poolCount = 7;
HashSet<String> poolNames = new HashSet<String>(poolCount);
Path path = new Path("/p");
for (int i=0; i<poolCount; i++) {
String poolName = "testListCacheDirectives-" + i;
CacheDirectiveInfo directiveInfo =
new CacheDirectiveInfo.Builder().setPool(poolName).setPath(path).build();
dfs.addCachePool(new CachePoolInfo(poolName));
dfs.addCacheDirective(directiveInfo);
poolNames.add(poolName);
}
listCacheDirectives(poolNames, 0);
cluster.transitionToStandby(0);
cluster.transitionToActive(1);
cluster.waitActive(1);
listCacheDirectives(poolNames, 1);
}
@SuppressWarnings("unchecked")
private void listCachePools(
HashSet<String> poolNames, int active) throws Exception {
HashSet<String> tmpNames = (HashSet<String>)poolNames.clone();
RemoteIterator<CachePoolEntry> pools = dfs.listCachePools();
int poolCount = poolNames.size();
for (int i=0; i<poolCount; i++) {
CachePoolEntry pool = pools.next();
String pollName = pool.getInfo().getPoolName();
assertTrue("The pool name should be expected", tmpNames.remove(pollName));
if (i % 2 == 0) {
int standby = active;
active = (standby == 0) ? 1 : 0;
cluster.transitionToStandby(standby);
cluster.transitionToActive(active);
cluster.waitActive(active);
}
}
assertTrue("All pools must be found", tmpNames.isEmpty());
}
@SuppressWarnings("unchecked")
private void listCacheDirectives(
HashSet<String> poolNames, int active) throws Exception {
HashSet<String> tmpNames = (HashSet<String>)poolNames.clone();
RemoteIterator<CacheDirectiveEntry> directives = dfs.listCacheDirectives(null);
int poolCount = poolNames.size();
for (int i=0; i<poolCount; i++) {
CacheDirectiveEntry directive = directives.next();
String pollName = directive.getInfo().getPool();
assertTrue("The pool name should be expected", tmpNames.remove(pollName));
if (i % 2 == 0) {
int standby = active;
active = (standby == 0) ? 1 : 0;
cluster.transitionToStandby(standby);
cluster.transitionToActive(active);
cluster.waitActive(active);
}
}
assertTrue("All pools must be found", tmpNames.isEmpty());
}
} }