HDFS-2669 Enable protobuf rpc for ClientNamenodeProtocol
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1214128 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
d9c9fb913e
commit
d8dfcdcbc2
@ -115,6 +115,8 @@ Trunk (unreleased changes)
|
|||||||
|
|
||||||
HDFS-2650. Replace @inheritDoc with @Override. (Hari Mankude via suresh).
|
HDFS-2650. Replace @inheritDoc with @Override. (Hari Mankude via suresh).
|
||||||
|
|
||||||
|
HDFS-2669 Enable protobuf rpc for ClientNamenodeProtocol
|
||||||
|
|
||||||
OPTIMIZATIONS
|
OPTIMIZATIONS
|
||||||
HDFS-2477. Optimize computing the diff between a block report and the
|
HDFS-2477. Optimize computing the diff between a block report and the
|
||||||
namenode state. (Tomasz Nykiel via hairong)
|
namenode state. (Tomasz Nykiel via hairong)
|
||||||
|
@ -627,12 +627,12 @@ public static ClientProtocol createNamenode( InetSocketAddress nameNodeAddr,
|
|||||||
Configuration conf, UserGroupInformation ugi) throws IOException {
|
Configuration conf, UserGroupInformation ugi) throws IOException {
|
||||||
/**
|
/**
|
||||||
* Currently we have simply burnt-in support for a SINGLE
|
* Currently we have simply burnt-in support for a SINGLE
|
||||||
* protocol - protocolR23Compatible. This will be replaced
|
* protocol - protocolPB. This will be replaced
|
||||||
* by a way to pick the right protocol based on the
|
* by a way to pick the right protocol based on the
|
||||||
* version of the target server.
|
* version of the target server.
|
||||||
*/
|
*/
|
||||||
return new org.apache.hadoop.hdfs.protocolR23Compatible.
|
return new org.apache.hadoop.hdfs.protocolPB.
|
||||||
ClientNamenodeProtocolTranslatorR23(nameNodeAddr, conf, ugi);
|
ClientNamenodeProtocolTranslatorPB(nameNodeAddr, conf, ugi);
|
||||||
}
|
}
|
||||||
|
|
||||||
/** Create a {@link ClientDatanodeProtocol} proxy */
|
/** Create a {@link ClientDatanodeProtocol} proxy */
|
||||||
|
@ -19,11 +19,16 @@
|
|||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
|
import java.util.List;
|
||||||
|
|
||||||
import org.apache.hadoop.classification.InterfaceAudience;
|
import org.apache.hadoop.classification.InterfaceAudience;
|
||||||
import org.apache.hadoop.classification.InterfaceStability;
|
import org.apache.hadoop.classification.InterfaceStability;
|
||||||
|
import org.apache.hadoop.fs.ContentSummary;
|
||||||
|
import org.apache.hadoop.fs.FsServerDefaults;
|
||||||
import org.apache.hadoop.fs.Options.Rename;
|
import org.apache.hadoop.fs.Options.Rename;
|
||||||
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
|
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
|
||||||
|
import org.apache.hadoop.hdfs.protocol.CorruptFileBlocks;
|
||||||
|
import org.apache.hadoop.hdfs.protocol.DirectoryListing;
|
||||||
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
|
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
|
||||||
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
|
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
|
||||||
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
|
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
|
||||||
@ -124,9 +129,7 @@
|
|||||||
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.DatanodeIDProto;
|
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.DatanodeIDProto;
|
||||||
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.DatanodeInfoProto;
|
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.DatanodeInfoProto;
|
||||||
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.DirectoryListingProto;
|
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.DirectoryListingProto;
|
||||||
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.HdfsFileStatusProto;
|
|
||||||
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.LocatedBlockProto;
|
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.LocatedBlockProto;
|
||||||
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.UpgradeStatusReportProto;
|
|
||||||
import org.apache.hadoop.hdfs.protocolR23Compatible.ProtocolSignatureWritable;
|
import org.apache.hadoop.hdfs.protocolR23Compatible.ProtocolSignatureWritable;
|
||||||
import org.apache.hadoop.hdfs.server.common.UpgradeStatusReport;
|
import org.apache.hadoop.hdfs.server.common.UpgradeStatusReport;
|
||||||
import org.apache.hadoop.io.Text;
|
import org.apache.hadoop.io.Text;
|
||||||
@ -218,9 +221,7 @@ public GetBlockLocationsResponseProto getBlockLocations(
|
|||||||
Builder builder = GetBlockLocationsResponseProto
|
Builder builder = GetBlockLocationsResponseProto
|
||||||
.newBuilder();
|
.newBuilder();
|
||||||
if (b != null) {
|
if (b != null) {
|
||||||
builder.setLocations(
|
builder.setLocations(PBHelper.convert(b)).build();
|
||||||
PBHelper.convert(server.getBlockLocations(req.getSrc(),
|
|
||||||
req.getOffset(), req.getLength()))).build();
|
|
||||||
}
|
}
|
||||||
return builder.build();
|
return builder.build();
|
||||||
} catch (IOException e) {
|
} catch (IOException e) {
|
||||||
@ -233,14 +234,19 @@ public GetServerDefaultsResponseProto getServerDefaults(
|
|||||||
RpcController controller, GetServerDefaultsRequestProto req)
|
RpcController controller, GetServerDefaultsRequestProto req)
|
||||||
throws ServiceException {
|
throws ServiceException {
|
||||||
try {
|
try {
|
||||||
|
FsServerDefaults result = server.getServerDefaults();
|
||||||
return GetServerDefaultsResponseProto.newBuilder()
|
return GetServerDefaultsResponseProto.newBuilder()
|
||||||
.setServerDefaults(PBHelper.convert(server.getServerDefaults()))
|
.setServerDefaults(PBHelper.convert(result))
|
||||||
.build();
|
.build();
|
||||||
} catch (IOException e) {
|
} catch (IOException e) {
|
||||||
throw new ServiceException(e);
|
throw new ServiceException(e);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
static final CreateResponseProto VOID_CREATE_RESPONSE =
|
||||||
|
CreateResponseProto.newBuilder().build();
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public CreateResponseProto create(RpcController controller,
|
public CreateResponseProto create(RpcController controller,
|
||||||
CreateRequestProto req) throws ServiceException {
|
CreateRequestProto req) throws ServiceException {
|
||||||
@ -252,19 +258,22 @@ public CreateResponseProto create(RpcController controller,
|
|||||||
} catch (IOException e) {
|
} catch (IOException e) {
|
||||||
throw new ServiceException(e);
|
throw new ServiceException(e);
|
||||||
}
|
}
|
||||||
return CreateResponseProto.newBuilder().build();
|
return VOID_CREATE_RESPONSE;
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static final AppendResponseProto NULL_APPEND_RESPONSE =
|
||||||
|
AppendResponseProto.newBuilder().build();
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public AppendResponseProto append(RpcController controller,
|
public AppendResponseProto append(RpcController controller,
|
||||||
AppendRequestProto req) throws ServiceException {
|
AppendRequestProto req) throws ServiceException {
|
||||||
try {
|
try {
|
||||||
return AppendResponseProto
|
LocatedBlock result = server.append(req.getSrc(), req.getClientName());
|
||||||
.newBuilder()
|
if (result != null) {
|
||||||
.setBlock(
|
return AppendResponseProto.newBuilder()
|
||||||
PBHelper.convert(server.append(req.getSrc(), req.getClientName())))
|
.setBlock(PBHelper.convert(result)).build();
|
||||||
.build();
|
}
|
||||||
|
return NULL_APPEND_RESPONSE;
|
||||||
} catch (IOException e) {
|
} catch (IOException e) {
|
||||||
throw new ServiceException(e);
|
throw new ServiceException(e);
|
||||||
}
|
}
|
||||||
@ -274,18 +283,16 @@ public AppendResponseProto append(RpcController controller,
|
|||||||
public SetReplicationResponseProto setReplication(RpcController controller,
|
public SetReplicationResponseProto setReplication(RpcController controller,
|
||||||
SetReplicationRequestProto req) throws ServiceException {
|
SetReplicationRequestProto req) throws ServiceException {
|
||||||
try {
|
try {
|
||||||
return SetReplicationResponseProto
|
boolean result =
|
||||||
.newBuilder()
|
server.setReplication(req.getSrc(), (short) req.getReplication());
|
||||||
.setResult(
|
return SetReplicationResponseProto.newBuilder().setResult(result).build();
|
||||||
server.setReplication(req.getSrc(), (short) req.getReplication()))
|
|
||||||
.build();
|
|
||||||
} catch (IOException e) {
|
} catch (IOException e) {
|
||||||
throw new ServiceException(e);
|
throw new ServiceException(e);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
static final SetPermissionResponseProto SET_PERM_RESPONSE =
|
static final SetPermissionResponseProto VOID_SET_PERM_RESPONSE =
|
||||||
SetPermissionResponseProto.newBuilder().build();
|
SetPermissionResponseProto.newBuilder().build();
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
@ -296,24 +303,26 @@ public SetPermissionResponseProto setPermission(RpcController controller,
|
|||||||
} catch (IOException e) {
|
} catch (IOException e) {
|
||||||
throw new ServiceException(e);
|
throw new ServiceException(e);
|
||||||
}
|
}
|
||||||
return SET_PERM_RESPONSE;
|
return VOID_SET_PERM_RESPONSE;
|
||||||
}
|
}
|
||||||
|
|
||||||
static final SetOwnerResponseProto SET_OWNER_RESPONSE =
|
static final SetOwnerResponseProto VOID_SET_OWNER_RESPONSE =
|
||||||
SetOwnerResponseProto.newBuilder().build();
|
SetOwnerResponseProto.newBuilder().build();
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public SetOwnerResponseProto setOwner(RpcController controller,
|
public SetOwnerResponseProto setOwner(RpcController controller,
|
||||||
SetOwnerRequestProto req) throws ServiceException {
|
SetOwnerRequestProto req) throws ServiceException {
|
||||||
try {
|
try {
|
||||||
server.setOwner(req.getSrc(), req.getUsername(), req.getGroupname());
|
server.setOwner(req.getSrc(),
|
||||||
|
req.hasUsername() ? req.getUsername() : null,
|
||||||
|
req.hasGroupname() ? req.getGroupname() : null);
|
||||||
} catch (IOException e) {
|
} catch (IOException e) {
|
||||||
throw new ServiceException(e);
|
throw new ServiceException(e);
|
||||||
}
|
}
|
||||||
return SET_OWNER_RESPONSE;
|
return VOID_SET_OWNER_RESPONSE;
|
||||||
}
|
}
|
||||||
|
|
||||||
static final AbandonBlockResponseProto ABD_BLOCK_RESPONSE =
|
static final AbandonBlockResponseProto VOID_ADD_BLOCK_RESPONSE =
|
||||||
AbandonBlockResponseProto.newBuilder().build();
|
AbandonBlockResponseProto.newBuilder().build();
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
@ -325,20 +334,22 @@ public AbandonBlockResponseProto abandonBlock(RpcController controller,
|
|||||||
} catch (IOException e) {
|
} catch (IOException e) {
|
||||||
throw new ServiceException(e);
|
throw new ServiceException(e);
|
||||||
}
|
}
|
||||||
return ABD_BLOCK_RESPONSE;
|
return VOID_ADD_BLOCK_RESPONSE;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public AddBlockResponseProto addBlock(RpcController controller,
|
public AddBlockResponseProto addBlock(RpcController controller,
|
||||||
AddBlockRequestProto req) throws ServiceException {
|
AddBlockRequestProto req) throws ServiceException {
|
||||||
|
|
||||||
try {
|
try {
|
||||||
return AddBlockResponseProto.newBuilder().setBlock(
|
List<DatanodeInfoProto> excl = req.getExcludeNodesList();
|
||||||
PBHelper.convert(
|
LocatedBlock result = server.addBlock(req.getSrc(), req.getClientName(),
|
||||||
server.addBlock(req.getSrc(), req.getClientName(),
|
|
||||||
req.hasPrevious() ? PBHelper.convert(req.getPrevious()) : null,
|
req.hasPrevious() ? PBHelper.convert(req.getPrevious()) : null,
|
||||||
PBHelper.convert(
|
(excl == null ||
|
||||||
(DatanodeInfoProto[]) req.getExcludeNodesList().toArray()))))
|
excl.size() == 0) ? null :
|
||||||
.build();
|
PBHelper.convert(excl.toArray(new DatanodeInfoProto[excl.size()])));
|
||||||
|
return AddBlockResponseProto.newBuilder().setBlock(
|
||||||
|
PBHelper.convert(result)).build();
|
||||||
} catch (IOException e) {
|
} catch (IOException e) {
|
||||||
throw new ServiceException(e);
|
throw new ServiceException(e);
|
||||||
}
|
}
|
||||||
@ -349,15 +360,17 @@ public GetAdditionalDatanodeResponseProto getAdditionalDatanode(
|
|||||||
RpcController controller, GetAdditionalDatanodeRequestProto req)
|
RpcController controller, GetAdditionalDatanodeRequestProto req)
|
||||||
throws ServiceException {
|
throws ServiceException {
|
||||||
try {
|
try {
|
||||||
|
List<DatanodeInfoProto> existingList = req.getExistingsList();
|
||||||
|
List<DatanodeInfoProto> excludesList = req.getExcludesList();
|
||||||
|
LocatedBlock result = server.getAdditionalDatanode(
|
||||||
|
req.getSrc(), PBHelper.convert(req.getBlk()),
|
||||||
|
PBHelper.convert(existingList.toArray(
|
||||||
|
new DatanodeInfoProto[existingList.size()])),
|
||||||
|
PBHelper.convert(excludesList.toArray(
|
||||||
|
new DatanodeInfoProto[excludesList.size()])),
|
||||||
|
req.getNumAdditionalNodes(), req.getClientName());
|
||||||
return GetAdditionalDatanodeResponseProto.newBuilder().setBlock(
|
return GetAdditionalDatanodeResponseProto.newBuilder().setBlock(
|
||||||
PBHelper.convert(
|
PBHelper.convert(result))
|
||||||
server.getAdditionalDatanode(req.getSrc(),
|
|
||||||
PBHelper.convert(req.getBlk()),
|
|
||||||
PBHelper.convert((DatanodeInfoProto[]) req.getExistingsList()
|
|
||||||
.toArray()), PBHelper
|
|
||||||
.convert((DatanodeInfoProto[]) req.getExcludesList()
|
|
||||||
.toArray()), req.getNumAdditionalNodes(), req
|
|
||||||
.getClientName())))
|
|
||||||
.build();
|
.build();
|
||||||
} catch (IOException e) {
|
} catch (IOException e) {
|
||||||
throw new ServiceException(e);
|
throw new ServiceException(e);
|
||||||
@ -368,10 +381,10 @@ public GetAdditionalDatanodeResponseProto getAdditionalDatanode(
|
|||||||
public CompleteResponseProto complete(RpcController controller,
|
public CompleteResponseProto complete(RpcController controller,
|
||||||
CompleteRequestProto req) throws ServiceException {
|
CompleteRequestProto req) throws ServiceException {
|
||||||
try {
|
try {
|
||||||
return CompleteResponseProto.newBuilder().setResult(
|
boolean result =
|
||||||
server.complete(req.getSrc(), req.getClientName(),
|
server.complete(req.getSrc(), req.getClientName(),
|
||||||
PBHelper.convert(req.getLast())))
|
req.hasLast() ? PBHelper.convert(req.getLast()) : null);
|
||||||
.build();
|
return CompleteResponseProto.newBuilder().setResult(result).build();
|
||||||
} catch (IOException e) {
|
} catch (IOException e) {
|
||||||
throw new ServiceException(e);
|
throw new ServiceException(e);
|
||||||
}
|
}
|
||||||
@ -384,8 +397,9 @@ public CompleteResponseProto complete(RpcController controller,
|
|||||||
public ReportBadBlocksResponseProto reportBadBlocks(RpcController controller,
|
public ReportBadBlocksResponseProto reportBadBlocks(RpcController controller,
|
||||||
ReportBadBlocksRequestProto req) throws ServiceException {
|
ReportBadBlocksRequestProto req) throws ServiceException {
|
||||||
try {
|
try {
|
||||||
|
List<LocatedBlockProto> bl = req.getBlocksList();
|
||||||
server.reportBadBlocks(PBHelper.convertLocatedBlock(
|
server.reportBadBlocks(PBHelper.convertLocatedBlock(
|
||||||
(LocatedBlockProto[]) req.getBlocksList().toArray()));
|
bl.toArray(new LocatedBlockProto[bl.size()])));
|
||||||
} catch (IOException e) {
|
} catch (IOException e) {
|
||||||
throw new ServiceException(e);
|
throw new ServiceException(e);
|
||||||
}
|
}
|
||||||
@ -399,7 +413,8 @@ public ReportBadBlocksResponseProto reportBadBlocks(RpcController controller,
|
|||||||
public ConcatResponseProto concat(RpcController controller,
|
public ConcatResponseProto concat(RpcController controller,
|
||||||
ConcatRequestProto req) throws ServiceException {
|
ConcatRequestProto req) throws ServiceException {
|
||||||
try {
|
try {
|
||||||
server.concat(req.getTrg(), (String[])req.getSrcsList().toArray());
|
List<String> srcs = req.getSrcsList();
|
||||||
|
server.concat(req.getTrg(), srcs.toArray(new String[srcs.size()]));
|
||||||
} catch (IOException e) {
|
} catch (IOException e) {
|
||||||
throw new ServiceException(e);
|
throw new ServiceException(e);
|
||||||
}
|
}
|
||||||
@ -456,14 +471,21 @@ public MkdirsResponseProto mkdirs(RpcController controller,
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static final GetListingResponseProto NULL_GETLISTING_RESPONSE =
|
||||||
|
GetListingResponseProto.newBuilder().build();
|
||||||
@Override
|
@Override
|
||||||
public GetListingResponseProto getListing(RpcController controller,
|
public GetListingResponseProto getListing(RpcController controller,
|
||||||
GetListingRequestProto req) throws ServiceException {
|
GetListingRequestProto req) throws ServiceException {
|
||||||
try {
|
try {
|
||||||
DirectoryListingProto result = PBHelper.convert(server.getListing(
|
DirectoryListing result = server.getListing(
|
||||||
req.getSrc(), req.getStartAfter().toByteArray(),
|
req.getSrc(), req.getStartAfter().toByteArray(),
|
||||||
req.getNeedLocation()));
|
req.getNeedLocation());
|
||||||
return GetListingResponseProto.newBuilder().setDirList(result).build();
|
if (result !=null) {
|
||||||
|
return GetListingResponseProto.newBuilder().setDirList(
|
||||||
|
PBHelper.convert(result)).build();
|
||||||
|
} else {
|
||||||
|
return NULL_GETLISTING_RESPONSE;
|
||||||
|
}
|
||||||
} catch (IOException e) {
|
} catch (IOException e) {
|
||||||
throw new ServiceException(e);
|
throw new ServiceException(e);
|
||||||
}
|
}
|
||||||
@ -494,6 +516,19 @@ public RecoverLeaseResponseProto recoverLease(RpcController controller,
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public RestoreFailedStorageResponseProto restoreFailedStorage(
|
||||||
|
RpcController controller, RestoreFailedStorageRequestProto req)
|
||||||
|
throws ServiceException {
|
||||||
|
try {
|
||||||
|
boolean result = server.restoreFailedStorage(req.getArg());
|
||||||
|
return RestoreFailedStorageResponseProto.newBuilder().setResult(result)
|
||||||
|
.build();
|
||||||
|
} catch (IOException e) {
|
||||||
|
throw new ServiceException(e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public GetFsStatsResponseProto getFsStats(RpcController controller,
|
public GetFsStatsResponseProto getFsStats(RpcController controller,
|
||||||
GetFsStatusRequestProto req) throws ServiceException {
|
GetFsStatusRequestProto req) throws ServiceException {
|
||||||
@ -557,19 +592,6 @@ public SaveNamespaceResponseProto saveNamespace(RpcController controller,
|
|||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
|
||||||
public RestoreFailedStorageResponseProto restoreFailedStorage(
|
|
||||||
RpcController controller, RestoreFailedStorageRequestProto req)
|
|
||||||
throws ServiceException {
|
|
||||||
try {
|
|
||||||
boolean result = server.restoreFailedStorage(req.getArg());
|
|
||||||
return RestoreFailedStorageResponseProto.newBuilder().setResult(result)
|
|
||||||
.build();
|
|
||||||
} catch (IOException e) {
|
|
||||||
throw new ServiceException(e);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
static final RefreshNodesResponseProto VOID_REFRESHNODES_RESPONSE =
|
static final RefreshNodesResponseProto VOID_REFRESHNODES_RESPONSE =
|
||||||
RefreshNodesResponseProto.newBuilder().build();
|
RefreshNodesResponseProto.newBuilder().build();
|
||||||
|
|
||||||
@ -622,9 +644,10 @@ public ListCorruptFileBlocksResponseProto listCorruptFileBlocks(
|
|||||||
RpcController controller, ListCorruptFileBlocksRequestProto req)
|
RpcController controller, ListCorruptFileBlocksRequestProto req)
|
||||||
throws ServiceException {
|
throws ServiceException {
|
||||||
try {
|
try {
|
||||||
CorruptFileBlocksProto result = PBHelper.convert(server
|
CorruptFileBlocks result = server.listCorruptFileBlocks(
|
||||||
.listCorruptFileBlocks(req.getPath(), req.getCookie()));
|
req.getPath(), req.hasCookie() ? req.getCookie(): null);
|
||||||
return ListCorruptFileBlocksResponseProto.newBuilder().setCorrupt(result)
|
return ListCorruptFileBlocksResponseProto.newBuilder()
|
||||||
|
.setCorrupt(PBHelper.convert(result))
|
||||||
.build();
|
.build();
|
||||||
} catch (IOException e) {
|
} catch (IOException e) {
|
||||||
throw new ServiceException(e);
|
throw new ServiceException(e);
|
||||||
@ -646,29 +669,40 @@ public MetaSaveResponseProto metaSave(RpcController controller,
|
|||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static final GetFileInfoResponseProto NULL_GETFILEINFO_RESPONSE =
|
||||||
|
GetFileInfoResponseProto.newBuilder().build();
|
||||||
@Override
|
@Override
|
||||||
public GetFileInfoResponseProto getFileInfo(RpcController controller,
|
public GetFileInfoResponseProto getFileInfo(RpcController controller,
|
||||||
GetFileInfoRequestProto req) throws ServiceException {
|
GetFileInfoRequestProto req) throws ServiceException {
|
||||||
try {
|
try {
|
||||||
HdfsFileStatus res = server.getFileInfo(req.getSrc());
|
HdfsFileStatus result = server.getFileInfo(req.getSrc());
|
||||||
GetFileInfoResponseProto.Builder builder =
|
|
||||||
GetFileInfoResponseProto.newBuilder();
|
if (result != null) {
|
||||||
if (res != null) {
|
return GetFileInfoResponseProto.newBuilder().setFs(
|
||||||
builder.setFs(PBHelper.convert(res));
|
PBHelper.convert(result)).build();
|
||||||
}
|
}
|
||||||
return builder.build();
|
return NULL_GETFILEINFO_RESPONSE;
|
||||||
} catch (IOException e) {
|
} catch (IOException e) {
|
||||||
throw new ServiceException(e);
|
throw new ServiceException(e);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static final GetFileLinkInfoResponseProto NULL_GETFILELINKINFO_RESPONSE =
|
||||||
|
GetFileLinkInfoResponseProto.newBuilder().build();
|
||||||
@Override
|
@Override
|
||||||
public GetFileLinkInfoResponseProto getFileLinkInfo(RpcController controller,
|
public GetFileLinkInfoResponseProto getFileLinkInfo(RpcController controller,
|
||||||
GetFileLinkInfoRequestProto req) throws ServiceException {
|
GetFileLinkInfoRequestProto req) throws ServiceException {
|
||||||
try {
|
try {
|
||||||
HdfsFileStatusProto result =
|
HdfsFileStatus result = server.getFileLinkInfo(req.getSrc());
|
||||||
PBHelper.convert(server.getFileLinkInfo(req.getSrc()));
|
if (result != null) {
|
||||||
return GetFileLinkInfoResponseProto.newBuilder().setFs(result).build();
|
System.out.println("got non null result for getFileLinkInfo for " + req.getSrc());
|
||||||
|
return GetFileLinkInfoResponseProto.newBuilder().setFs(
|
||||||
|
PBHelper.convert(result)).build();
|
||||||
|
} else {
|
||||||
|
System.out.println("got null result for getFileLinkInfo for " + req.getSrc());
|
||||||
|
return NULL_GETFILELINKINFO_RESPONSE;
|
||||||
|
}
|
||||||
|
|
||||||
} catch (IOException e) {
|
} catch (IOException e) {
|
||||||
throw new ServiceException(e);
|
throw new ServiceException(e);
|
||||||
}
|
}
|
||||||
@ -679,10 +713,9 @@ public GetContentSummaryResponseProto getContentSummary(
|
|||||||
RpcController controller, GetContentSummaryRequestProto req)
|
RpcController controller, GetContentSummaryRequestProto req)
|
||||||
throws ServiceException {
|
throws ServiceException {
|
||||||
try {
|
try {
|
||||||
ContentSummaryProto result =
|
ContentSummary result = server.getContentSummary(req.getPath());
|
||||||
PBHelper.convert(server.getContentSummary(req.getPath()));
|
return GetContentSummaryResponseProto.newBuilder()
|
||||||
return
|
.setSummary(PBHelper.convert(result)).build();
|
||||||
GetContentSummaryResponseProto.newBuilder().setSummary(result).build();
|
|
||||||
} catch (IOException e) {
|
} catch (IOException e) {
|
||||||
throw new ServiceException(e);
|
throw new ServiceException(e);
|
||||||
}
|
}
|
||||||
@ -780,10 +813,11 @@ public UpdateBlockForPipelineResponseProto updateBlockForPipeline(
|
|||||||
public UpdatePipelineResponseProto updatePipeline(RpcController controller,
|
public UpdatePipelineResponseProto updatePipeline(RpcController controller,
|
||||||
UpdatePipelineRequestProto req) throws ServiceException {
|
UpdatePipelineRequestProto req) throws ServiceException {
|
||||||
try {
|
try {
|
||||||
|
List<DatanodeIDProto> newNodes = req.getNewNodesList();
|
||||||
server
|
server
|
||||||
.updatePipeline(req.getClientName(), PBHelper.convert(req
|
.updatePipeline(req.getClientName(), PBHelper.convert(req
|
||||||
.getOldBlock()), PBHelper.convert(req.getNewBlock()), PBHelper
|
.getOldBlock()), PBHelper.convert(req.getNewBlock()), PBHelper
|
||||||
.convert((DatanodeIDProto[]) req.getNewNodesList().toArray()));
|
.convert(newNodes.toArray(new DatanodeIDProto[newNodes.size()])));
|
||||||
return VOID_UPDATEPIPELINE_RESPONSE;
|
return VOID_UPDATEPIPELINE_RESPONSE;
|
||||||
} catch (IOException e) {
|
} catch (IOException e) {
|
||||||
throw new ServiceException(e);
|
throw new ServiceException(e);
|
||||||
|
@ -76,6 +76,7 @@
|
|||||||
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.AbandonBlockRequestProto;
|
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.AbandonBlockRequestProto;
|
||||||
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.AddBlockRequestProto;
|
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.AddBlockRequestProto;
|
||||||
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.AppendRequestProto;
|
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.AppendRequestProto;
|
||||||
|
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.AppendResponseProto;
|
||||||
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.CancelDelegationTokenRequestProto;
|
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.CancelDelegationTokenRequestProto;
|
||||||
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.CompleteRequestProto;
|
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.CompleteRequestProto;
|
||||||
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ConcatRequestProto;
|
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ConcatRequestProto;
|
||||||
@ -95,9 +96,11 @@
|
|||||||
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetFileInfoRequestProto;
|
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetFileInfoRequestProto;
|
||||||
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetFileInfoResponseProto;
|
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetFileInfoResponseProto;
|
||||||
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetFileLinkInfoRequestProto;
|
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetFileLinkInfoRequestProto;
|
||||||
|
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetFileLinkInfoResponseProto;
|
||||||
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetFsStatusRequestProto;
|
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetFsStatusRequestProto;
|
||||||
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetLinkTargetRequestProto;
|
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetLinkTargetRequestProto;
|
||||||
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetListingRequestProto;
|
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetListingRequestProto;
|
||||||
|
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetListingResponseProto;
|
||||||
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetPreferredBlockSizeRequestProto;
|
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetPreferredBlockSizeRequestProto;
|
||||||
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetServerDefaultsRequestProto;
|
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetServerDefaultsRequestProto;
|
||||||
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ListCorruptFileBlocksRequestProto;
|
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ListCorruptFileBlocksRequestProto;
|
||||||
@ -121,6 +124,8 @@
|
|||||||
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.SetTimesRequestProto;
|
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.SetTimesRequestProto;
|
||||||
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.UpdateBlockForPipelineRequestProto;
|
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.UpdateBlockForPipelineRequestProto;
|
||||||
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.UpdatePipelineRequestProto;
|
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.UpdatePipelineRequestProto;
|
||||||
|
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.DirectoryListingProto;
|
||||||
|
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.HdfsFileStatusProto;
|
||||||
|
|
||||||
import com.google.protobuf.ByteString;
|
import com.google.protobuf.ByteString;
|
||||||
import com.google.protobuf.ServiceException;
|
import com.google.protobuf.ServiceException;
|
||||||
@ -263,7 +268,8 @@ public LocatedBlock append(String src, String clientName)
|
|||||||
.setClientName(clientName)
|
.setClientName(clientName)
|
||||||
.build();
|
.build();
|
||||||
try {
|
try {
|
||||||
return PBHelper.convert(rpcProxy.append(null, req).getBlock());
|
AppendResponseProto res = rpcProxy.append(null, req);
|
||||||
|
return res.hasBlock() ? PBHelper.convert(res.getBlock()) : null;
|
||||||
} catch (ServiceException e) {
|
} catch (ServiceException e) {
|
||||||
throw ProtobufHelper.getRemoteException(e);
|
throw ProtobufHelper.getRemoteException(e);
|
||||||
}
|
}
|
||||||
@ -304,13 +310,14 @@ public void setPermission(String src, FsPermission permission)
|
|||||||
public void setOwner(String src, String username, String groupname)
|
public void setOwner(String src, String username, String groupname)
|
||||||
throws AccessControlException, FileNotFoundException, SafeModeException,
|
throws AccessControlException, FileNotFoundException, SafeModeException,
|
||||||
UnresolvedLinkException, IOException {
|
UnresolvedLinkException, IOException {
|
||||||
SetOwnerRequestProto req = SetOwnerRequestProto.newBuilder()
|
SetOwnerRequestProto.Builder req = SetOwnerRequestProto.newBuilder()
|
||||||
.setSrc(src)
|
.setSrc(src);
|
||||||
.setUsername(username)
|
if (username != null)
|
||||||
.setGroupname(groupname)
|
req.setUsername(username);
|
||||||
.build();
|
if (groupname != null)
|
||||||
|
req.setGroupname(groupname);
|
||||||
try {
|
try {
|
||||||
rpcProxy.setOwner(null, req);
|
rpcProxy.setOwner(null, req.build());
|
||||||
} catch (ServiceException e) {
|
} catch (ServiceException e) {
|
||||||
throw ProtobufHelper.getRemoteException(e);
|
throw ProtobufHelper.getRemoteException(e);
|
||||||
}
|
}
|
||||||
@ -335,15 +342,14 @@ public LocatedBlock addBlock(String src, String clientName,
|
|||||||
throws AccessControlException, FileNotFoundException,
|
throws AccessControlException, FileNotFoundException,
|
||||||
NotReplicatedYetException, SafeModeException, UnresolvedLinkException,
|
NotReplicatedYetException, SafeModeException, UnresolvedLinkException,
|
||||||
IOException {
|
IOException {
|
||||||
AddBlockRequestProto.Builder builder = AddBlockRequestProto.newBuilder();
|
AddBlockRequestProto.Builder req = AddBlockRequestProto.newBuilder().setSrc(src)
|
||||||
builder.setSrc(src)
|
.setClientName(clientName);
|
||||||
.setClientName(clientName)
|
if (previous != null)
|
||||||
.addAllExcludeNodes(Arrays.asList(PBHelper.convert(excludeNodes)));
|
req.setPrevious(PBHelper.convert(previous));
|
||||||
if (previous != null) {
|
if (excludeNodes != null)
|
||||||
builder.setPrevious(PBHelper.convert(previous));
|
req.addAllExcludeNodes(Arrays.asList(PBHelper.convert(excludeNodes)));
|
||||||
}
|
|
||||||
try {
|
try {
|
||||||
return PBHelper.convert(rpcProxy.addBlock(null, builder.build()).getBlock());
|
return PBHelper.convert(rpcProxy.addBlock(null, req.build()).getBlock());
|
||||||
} catch (ServiceException e) {
|
} catch (ServiceException e) {
|
||||||
throw ProtobufHelper.getRemoteException(e);
|
throw ProtobufHelper.getRemoteException(e);
|
||||||
}
|
}
|
||||||
@ -376,13 +382,13 @@ public LocatedBlock getAdditionalDatanode(String src, ExtendedBlock blk,
|
|||||||
public boolean complete(String src, String clientName, ExtendedBlock last)
|
public boolean complete(String src, String clientName, ExtendedBlock last)
|
||||||
throws AccessControlException, FileNotFoundException, SafeModeException,
|
throws AccessControlException, FileNotFoundException, SafeModeException,
|
||||||
UnresolvedLinkException, IOException {
|
UnresolvedLinkException, IOException {
|
||||||
CompleteRequestProto req = CompleteRequestProto.newBuilder()
|
CompleteRequestProto.Builder req = CompleteRequestProto.newBuilder()
|
||||||
.setSrc(src)
|
.setSrc(src)
|
||||||
.setClientName(clientName)
|
.setClientName(clientName);
|
||||||
.setLast(PBHelper.convert(last))
|
if (last != null)
|
||||||
.build();
|
req.setLast(PBHelper.convert(last));
|
||||||
try {
|
try {
|
||||||
return rpcProxy.complete(null, req).getResult();
|
return rpcProxy.complete(null, req.build()).getResult();
|
||||||
} catch (ServiceException e) {
|
} catch (ServiceException e) {
|
||||||
throw ProtobufHelper.getRemoteException(e);
|
throw ProtobufHelper.getRemoteException(e);
|
||||||
}
|
}
|
||||||
@ -493,7 +499,12 @@ public DirectoryListing getListing(String src, byte[] startAfter,
|
|||||||
.setStartAfter(ByteString.copyFrom(startAfter))
|
.setStartAfter(ByteString.copyFrom(startAfter))
|
||||||
.setNeedLocation(needLocation).build();
|
.setNeedLocation(needLocation).build();
|
||||||
try {
|
try {
|
||||||
return PBHelper.convert(rpcProxy.getListing(null, req).getDirList());
|
GetListingResponseProto result = rpcProxy.getListing(null, req);
|
||||||
|
|
||||||
|
if (result.hasDirList()) {
|
||||||
|
return PBHelper.convert(result.getDirList());
|
||||||
|
}
|
||||||
|
return null;
|
||||||
} catch (ServiceException e) {
|
} catch (ServiceException e) {
|
||||||
throw ProtobufHelper.getRemoteException(e);
|
throw ProtobufHelper.getRemoteException(e);
|
||||||
}
|
}
|
||||||
@ -635,11 +646,13 @@ public UpgradeStatusReport distributedUpgradeProgress(UpgradeAction action)
|
|||||||
@Override
|
@Override
|
||||||
public CorruptFileBlocks listCorruptFileBlocks(String path, String cookie)
|
public CorruptFileBlocks listCorruptFileBlocks(String path, String cookie)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
ListCorruptFileBlocksRequestProto req = ListCorruptFileBlocksRequestProto
|
ListCorruptFileBlocksRequestProto.Builder req =
|
||||||
.newBuilder().setPath(path).setCookie(cookie).build();
|
ListCorruptFileBlocksRequestProto.newBuilder().setPath(path);
|
||||||
|
if (cookie != null)
|
||||||
|
req.setCookie(cookie);
|
||||||
try {
|
try {
|
||||||
return PBHelper.convert(
|
return PBHelper.convert(
|
||||||
rpcProxy.listCorruptFileBlocks(null, req).getCorrupt());
|
rpcProxy.listCorruptFileBlocks(null, req.build()).getCorrupt());
|
||||||
} catch (ServiceException e) {
|
} catch (ServiceException e) {
|
||||||
throw ProtobufHelper.getRemoteException(e);
|
throw ProtobufHelper.getRemoteException(e);
|
||||||
}
|
}
|
||||||
@ -676,7 +689,9 @@ public HdfsFileStatus getFileLinkInfo(String src)
|
|||||||
GetFileLinkInfoRequestProto req = GetFileLinkInfoRequestProto.newBuilder()
|
GetFileLinkInfoRequestProto req = GetFileLinkInfoRequestProto.newBuilder()
|
||||||
.setSrc(src).build();
|
.setSrc(src).build();
|
||||||
try {
|
try {
|
||||||
return PBHelper.convert(rpcProxy.getFileLinkInfo(null, req).getFs());
|
GetFileLinkInfoResponseProto result = rpcProxy.getFileLinkInfo(null, req);
|
||||||
|
return result.hasFs() ?
|
||||||
|
PBHelper.convert(rpcProxy.getFileLinkInfo(null, req).getFs()) : null;
|
||||||
} catch (ServiceException e) {
|
} catch (ServiceException e) {
|
||||||
throw ProtobufHelper.getRemoteException(e);
|
throw ProtobufHelper.getRemoteException(e);
|
||||||
}
|
}
|
||||||
|
@ -974,6 +974,13 @@ public static EnumSetWritable<CreateFlag> convert(int flag) {
|
|||||||
if ((flag & CreateFlagProto.APPEND_VALUE) == CreateFlagProto.APPEND_VALUE) {
|
if ((flag & CreateFlagProto.APPEND_VALUE) == CreateFlagProto.APPEND_VALUE) {
|
||||||
result.add(CreateFlag.APPEND);
|
result.add(CreateFlag.APPEND);
|
||||||
}
|
}
|
||||||
|
if ((flag & CreateFlagProto.CREATE_VALUE) == CreateFlagProto.CREATE_VALUE) {
|
||||||
|
result.add(CreateFlag.CREATE);
|
||||||
|
}
|
||||||
|
if ((flag & CreateFlagProto.OVERWRITE_VALUE)
|
||||||
|
== CreateFlagProto.OVERWRITE_VALUE) {
|
||||||
|
result.add(CreateFlag.OVERWRITE);
|
||||||
|
}
|
||||||
return new EnumSetWritable<CreateFlag>(result);
|
return new EnumSetWritable<CreateFlag>(result);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -1005,7 +1012,7 @@ public static HdfsFileStatus convert(HdfsFileStatusProto fs) {
|
|||||||
public static HdfsFileStatusProto convert(HdfsFileStatus fs) {
|
public static HdfsFileStatusProto convert(HdfsFileStatus fs) {
|
||||||
if (fs == null)
|
if (fs == null)
|
||||||
return null;
|
return null;
|
||||||
FileType fType = FileType.IS_DIR;;
|
FileType fType = FileType.IS_FILE;
|
||||||
if (fs.isDir()) {
|
if (fs.isDir()) {
|
||||||
fType = FileType.IS_DIR;
|
fType = FileType.IS_DIR;
|
||||||
} else if (fs.isSymlink()) {
|
} else if (fs.isSymlink()) {
|
||||||
@ -1024,8 +1031,7 @@ public static HdfsFileStatusProto convert(HdfsFileStatus fs) {
|
|||||||
setOwner(fs.getOwner()).
|
setOwner(fs.getOwner()).
|
||||||
setGroup(fs.getGroup()).
|
setGroup(fs.getGroup()).
|
||||||
setPath(ByteString.copyFrom(fs.getLocalNameInBytes()));
|
setPath(ByteString.copyFrom(fs.getLocalNameInBytes()));
|
||||||
|
if (fs.isSymlink()) {
|
||||||
if (fs.getSymlink() != null) {
|
|
||||||
builder.setSymlink(ByteString.copyFrom(fs.getSymlinkInBytes()));
|
builder.setSymlink(ByteString.copyFrom(fs.getSymlinkInBytes()));
|
||||||
}
|
}
|
||||||
if (fs instanceof HdfsLocatedFileStatus) {
|
if (fs instanceof HdfsLocatedFileStatus) {
|
||||||
@ -1052,7 +1058,7 @@ public static HdfsFileStatus[] convert(HdfsFileStatusProto[] fs) {
|
|||||||
final int len = fs.length;
|
final int len = fs.length;
|
||||||
HdfsFileStatus[] result = new HdfsFileStatus[len];
|
HdfsFileStatus[] result = new HdfsFileStatus[len];
|
||||||
for (int i = 0; i < len; ++i) {
|
for (int i = 0; i < len; ++i) {
|
||||||
PBHelper.convert(fs[i]);
|
result[i] = PBHelper.convert(fs[i]);
|
||||||
}
|
}
|
||||||
return result;
|
return result;
|
||||||
}
|
}
|
||||||
@ -1060,9 +1066,11 @@ public static HdfsFileStatus[] convert(HdfsFileStatusProto[] fs) {
|
|||||||
public static DirectoryListing convert(DirectoryListingProto dl) {
|
public static DirectoryListing convert(DirectoryListingProto dl) {
|
||||||
if (dl == null)
|
if (dl == null)
|
||||||
return null;
|
return null;
|
||||||
|
List<HdfsFileStatusProto> partList = dl.getPartialListingList();
|
||||||
return new DirectoryListing(
|
return new DirectoryListing(
|
||||||
PBHelper.convert((HdfsFileStatusProto[])
|
partList.isEmpty() ? new HdfsFileStatus[0]
|
||||||
dl.getPartialListingList().toArray()),
|
: PBHelper.convert(
|
||||||
|
partList.toArray(new HdfsFileStatusProto[partList.size()])),
|
||||||
dl.getRemainingEntries());
|
dl.getRemainingEntries());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -58,14 +58,15 @@
|
|||||||
import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
|
import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
|
||||||
import org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction;
|
import org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction;
|
||||||
import org.apache.hadoop.hdfs.protocol.HdfsConstants.UpgradeAction;
|
import org.apache.hadoop.hdfs.protocol.HdfsConstants.UpgradeAction;
|
||||||
|
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ClientNamenodeProtocol;
|
||||||
import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.NamenodeProtocolService;
|
import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.NamenodeProtocolService;
|
||||||
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.DatanodeProtocolService;
|
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.DatanodeProtocolService;
|
||||||
import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolPB;
|
import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolPB;
|
||||||
import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolServerSideTranslatorPB;
|
import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolServerSideTranslatorPB;
|
||||||
import org.apache.hadoop.hdfs.protocolPB.NamenodeProtocolPB;
|
import org.apache.hadoop.hdfs.protocolPB.NamenodeProtocolPB;
|
||||||
import org.apache.hadoop.hdfs.protocolPB.NamenodeProtocolServerSideTranslatorPB;
|
import org.apache.hadoop.hdfs.protocolPB.NamenodeProtocolServerSideTranslatorPB;
|
||||||
import org.apache.hadoop.hdfs.protocolR23Compatible.ClientNamenodeWireProtocol;
|
import org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolPB;
|
||||||
import org.apache.hadoop.hdfs.protocolR23Compatible.ClientNamenodeProtocolServerSideTranslatorR23;
|
import org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB;
|
||||||
import org.apache.hadoop.hdfs.security.token.block.ExportedBlockKeys;
|
import org.apache.hadoop.hdfs.security.token.block.ExportedBlockKeys;
|
||||||
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
|
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
|
||||||
import org.apache.hadoop.hdfs.server.common.IncorrectVersionException;
|
import org.apache.hadoop.hdfs.server.common.IncorrectVersionException;
|
||||||
@ -89,6 +90,7 @@
|
|||||||
import org.apache.hadoop.hdfs.server.protocol.UpgradeCommand;
|
import org.apache.hadoop.hdfs.server.protocol.UpgradeCommand;
|
||||||
import org.apache.hadoop.io.EnumSetWritable;
|
import org.apache.hadoop.io.EnumSetWritable;
|
||||||
import org.apache.hadoop.io.Text;
|
import org.apache.hadoop.io.Text;
|
||||||
|
import org.apache.hadoop.ipc.ProtobufRpcEngine;
|
||||||
import org.apache.hadoop.ipc.ProtocolSignature;
|
import org.apache.hadoop.ipc.ProtocolSignature;
|
||||||
import org.apache.hadoop.ipc.RPC;
|
import org.apache.hadoop.ipc.RPC;
|
||||||
import org.apache.hadoop.ipc.Server;
|
import org.apache.hadoop.ipc.Server;
|
||||||
@ -141,9 +143,13 @@ public NameNodeRpcServer(Configuration conf, NameNode nn)
|
|||||||
conf.getInt(DFS_DATANODE_HANDLER_COUNT_KEY,
|
conf.getInt(DFS_DATANODE_HANDLER_COUNT_KEY,
|
||||||
DFS_DATANODE_HANDLER_COUNT_DEFAULT);
|
DFS_DATANODE_HANDLER_COUNT_DEFAULT);
|
||||||
InetSocketAddress socAddr = nn.getRpcServerAddress(conf);
|
InetSocketAddress socAddr = nn.getRpcServerAddress(conf);
|
||||||
ClientNamenodeProtocolServerSideTranslatorR23
|
RPC.setProtocolEngine(conf, ClientNamenodeProtocolPB.class,
|
||||||
|
ProtobufRpcEngine.class);
|
||||||
|
ClientNamenodeProtocolServerSideTranslatorPB
|
||||||
clientProtocolServerTranslator =
|
clientProtocolServerTranslator =
|
||||||
new ClientNamenodeProtocolServerSideTranslatorR23(this);
|
new ClientNamenodeProtocolServerSideTranslatorPB(this);
|
||||||
|
BlockingService clientNNPbService = ClientNamenodeProtocol.
|
||||||
|
newReflectiveBlockingService(clientProtocolServerTranslator);
|
||||||
|
|
||||||
DatanodeProtocolServerSideTranslatorPB dnProtoPbTranslator =
|
DatanodeProtocolServerSideTranslatorPB dnProtoPbTranslator =
|
||||||
new DatanodeProtocolServerSideTranslatorPB(this);
|
new DatanodeProtocolServerSideTranslatorPB(this);
|
||||||
@ -152,7 +158,7 @@ public NameNodeRpcServer(Configuration conf, NameNode nn)
|
|||||||
|
|
||||||
NamenodeProtocolServerSideTranslatorPB namenodeProtocolXlator =
|
NamenodeProtocolServerSideTranslatorPB namenodeProtocolXlator =
|
||||||
new NamenodeProtocolServerSideTranslatorPB(this);
|
new NamenodeProtocolServerSideTranslatorPB(this);
|
||||||
BlockingService service = NamenodeProtocolService
|
BlockingService NNPbService = NamenodeProtocolService
|
||||||
.newReflectiveBlockingService(namenodeProtocolXlator);
|
.newReflectiveBlockingService(namenodeProtocolXlator);
|
||||||
|
|
||||||
InetSocketAddress dnSocketAddr = nn.getServiceRpcServerAddress(conf);
|
InetSocketAddress dnSocketAddr = nn.getServiceRpcServerAddress(conf);
|
||||||
@ -162,8 +168,8 @@ public NameNodeRpcServer(Configuration conf, NameNode nn)
|
|||||||
DFS_NAMENODE_SERVICE_HANDLER_COUNT_DEFAULT);
|
DFS_NAMENODE_SERVICE_HANDLER_COUNT_DEFAULT);
|
||||||
// Add all the RPC protocols that the namenode implements
|
// Add all the RPC protocols that the namenode implements
|
||||||
this.serviceRpcServer =
|
this.serviceRpcServer =
|
||||||
RPC.getServer(org.apache.hadoop.hdfs.protocolR23Compatible.
|
RPC.getServer(org.apache.hadoop.hdfs.protocolPB.
|
||||||
ClientNamenodeWireProtocol.class, clientProtocolServerTranslator,
|
ClientNamenodeProtocolPB.class, clientNNPbService,
|
||||||
dnSocketAddr.getHostName(), dnSocketAddr.getPort(),
|
dnSocketAddr.getHostName(), dnSocketAddr.getPort(),
|
||||||
serviceHandlerCount,
|
serviceHandlerCount,
|
||||||
false, conf, namesystem.getDelegationTokenSecretManager());
|
false, conf, namesystem.getDelegationTokenSecretManager());
|
||||||
@ -173,7 +179,7 @@ public NameNodeRpcServer(Configuration conf, NameNode nn)
|
|||||||
RefreshUserMappingsProtocol.class, this);
|
RefreshUserMappingsProtocol.class, this);
|
||||||
this.serviceRpcServer.addProtocol(RpcKind.RPC_WRITABLE,
|
this.serviceRpcServer.addProtocol(RpcKind.RPC_WRITABLE,
|
||||||
GetUserMappingsProtocol.class, this);
|
GetUserMappingsProtocol.class, this);
|
||||||
DFSUtil.addPBProtocol(conf, NamenodeProtocolPB.class, service,
|
DFSUtil.addPBProtocol(conf, NamenodeProtocolPB.class, NNPbService,
|
||||||
serviceRpcServer);
|
serviceRpcServer);
|
||||||
DFSUtil.addPBProtocol(conf, DatanodeProtocolPB.class, dnProtoPbService,
|
DFSUtil.addPBProtocol(conf, DatanodeProtocolPB.class, dnProtoPbService,
|
||||||
serviceRpcServer);
|
serviceRpcServer);
|
||||||
@ -186,9 +192,8 @@ public NameNodeRpcServer(Configuration conf, NameNode nn)
|
|||||||
}
|
}
|
||||||
// Add all the RPC protocols that the namenode implements
|
// Add all the RPC protocols that the namenode implements
|
||||||
this.clientRpcServer = RPC.getServer(
|
this.clientRpcServer = RPC.getServer(
|
||||||
org.apache.hadoop.hdfs.protocolR23Compatible.
|
org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolPB.class,
|
||||||
ClientNamenodeWireProtocol.class,
|
clientNNPbService, socAddr.getHostName(),
|
||||||
clientProtocolServerTranslator, socAddr.getHostName(),
|
|
||||||
socAddr.getPort(), handlerCount, false, conf,
|
socAddr.getPort(), handlerCount, false, conf,
|
||||||
namesystem.getDelegationTokenSecretManager());
|
namesystem.getDelegationTokenSecretManager());
|
||||||
this.clientRpcServer.addProtocol(RpcKind.RPC_WRITABLE,
|
this.clientRpcServer.addProtocol(RpcKind.RPC_WRITABLE,
|
||||||
@ -197,7 +202,7 @@ public NameNodeRpcServer(Configuration conf, NameNode nn)
|
|||||||
RefreshUserMappingsProtocol.class, this);
|
RefreshUserMappingsProtocol.class, this);
|
||||||
this.clientRpcServer.addProtocol(RpcKind.RPC_WRITABLE,
|
this.clientRpcServer.addProtocol(RpcKind.RPC_WRITABLE,
|
||||||
GetUserMappingsProtocol.class, this);
|
GetUserMappingsProtocol.class, this);
|
||||||
DFSUtil.addPBProtocol(conf, NamenodeProtocolPB.class, service,
|
DFSUtil.addPBProtocol(conf, NamenodeProtocolPB.class, NNPbService,
|
||||||
clientRpcServer);
|
clientRpcServer);
|
||||||
DFSUtil.addPBProtocol(conf, DatanodeProtocolPB.class, dnProtoPbService,
|
DFSUtil.addPBProtocol(conf, DatanodeProtocolPB.class, dnProtoPbService,
|
||||||
clientRpcServer);
|
clientRpcServer);
|
||||||
@ -259,7 +264,7 @@ public long getProtocolVersion(String protocol,
|
|||||||
long clientVersion) throws IOException {
|
long clientVersion) throws IOException {
|
||||||
if (protocol.equals(ClientProtocol.class.getName())) {
|
if (protocol.equals(ClientProtocol.class.getName())) {
|
||||||
throw new IOException("Old Namenode Client protocol is not supported:" +
|
throw new IOException("Old Namenode Client protocol is not supported:" +
|
||||||
protocol + "Switch your clientside to " + ClientNamenodeWireProtocol.class);
|
protocol + "Switch your clientside to " + ClientNamenodeProtocol.class);
|
||||||
} else if (protocol.equals(DatanodeProtocol.class.getName())){
|
} else if (protocol.equals(DatanodeProtocol.class.getName())){
|
||||||
return DatanodeProtocol.versionID;
|
return DatanodeProtocol.versionID;
|
||||||
} else if (protocol.equals(NamenodeProtocol.class.getName())){
|
} else if (protocol.equals(NamenodeProtocol.class.getName())){
|
||||||
|
@ -74,7 +74,7 @@ message AppendRequestProto {
|
|||||||
}
|
}
|
||||||
|
|
||||||
message AppendResponseProto {
|
message AppendResponseProto {
|
||||||
required LocatedBlockProto block = 1;
|
optional LocatedBlockProto block = 1;
|
||||||
}
|
}
|
||||||
|
|
||||||
message SetReplicationRequestProto {
|
message SetReplicationRequestProto {
|
||||||
@ -96,8 +96,8 @@ message SetPermissionResponseProto { // void response
|
|||||||
|
|
||||||
message SetOwnerRequestProto {
|
message SetOwnerRequestProto {
|
||||||
required string src = 1;
|
required string src = 1;
|
||||||
required string username = 2;
|
optional string username = 2;
|
||||||
required string groupname = 3;
|
optional string groupname = 3;
|
||||||
}
|
}
|
||||||
|
|
||||||
message SetOwnerResponseProto { // void response
|
message SetOwnerResponseProto { // void response
|
||||||
@ -139,7 +139,7 @@ message GetAdditionalDatanodeResponseProto {
|
|||||||
message CompleteRequestProto {
|
message CompleteRequestProto {
|
||||||
required string src = 1;
|
required string src = 1;
|
||||||
required string clientName = 2;
|
required string clientName = 2;
|
||||||
required ExtendedBlockProto last = 3;
|
optional ExtendedBlockProto last = 3;
|
||||||
}
|
}
|
||||||
|
|
||||||
message CompleteResponseProto {
|
message CompleteResponseProto {
|
||||||
@ -204,7 +204,7 @@ message GetListingRequestProto {
|
|||||||
required bool needLocation = 3;
|
required bool needLocation = 3;
|
||||||
}
|
}
|
||||||
message GetListingResponseProto {
|
message GetListingResponseProto {
|
||||||
required DirectoryListingProto dirList = 1;
|
optional DirectoryListingProto dirList = 1;
|
||||||
}
|
}
|
||||||
|
|
||||||
message RenewLeaseRequestProto {
|
message RenewLeaseRequestProto {
|
||||||
@ -311,7 +311,7 @@ message DistributedUpgradeProgressResponseProto {
|
|||||||
|
|
||||||
message ListCorruptFileBlocksRequestProto {
|
message ListCorruptFileBlocksRequestProto {
|
||||||
required string path = 1;
|
required string path = 1;
|
||||||
required string cookie = 2;
|
optional string cookie = 2;
|
||||||
}
|
}
|
||||||
|
|
||||||
message ListCorruptFileBlocksResponseProto {
|
message ListCorruptFileBlocksResponseProto {
|
||||||
@ -338,7 +338,7 @@ message GetFileLinkInfoRequestProto {
|
|||||||
}
|
}
|
||||||
|
|
||||||
message GetFileLinkInfoResponseProto {
|
message GetFileLinkInfoResponseProto {
|
||||||
required HdfsFileStatusProto fs = 1;
|
optional HdfsFileStatusProto fs = 1;
|
||||||
}
|
}
|
||||||
|
|
||||||
message GetContentSummaryRequestProto {
|
message GetContentSummaryRequestProto {
|
||||||
|
Loading…
Reference in New Issue
Block a user