HDFS-3204. Minor modification to JournalProtocol.proto to make it generic. Contributed by Suresh Srinivas.
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1310134 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
80bbefafaf
commit
861c872541
@ -62,6 +62,9 @@ Trunk (unreleased changes)
|
||||
HDFS-3178. Add states and state handler for journal synchronization in
|
||||
JournalService. (szetszwo)
|
||||
|
||||
HDFS-3204. Minor modification to JournalProtocol.proto to make
|
||||
it generic. (suresh)
|
||||
|
||||
OPTIMIZATIONS
|
||||
|
||||
HDFS-2834. Add a ByteBuffer-based read API to DFSInputStream.
|
||||
|
@ -48,7 +48,7 @@ public JournalProtocolServerSideTranslatorPB(JournalProtocol impl) {
|
||||
public JournalResponseProto journal(RpcController unused,
|
||||
JournalRequestProto req) throws ServiceException {
|
||||
try {
|
||||
impl.journal(PBHelper.convert(req.getRegistration()),
|
||||
impl.journal(PBHelper.convert(req.getJournalInfo()),
|
||||
req.getFirstTxnId(), req.getNumTxns(), req.getRecords()
|
||||
.toByteArray());
|
||||
} catch (IOException e) {
|
||||
@ -62,7 +62,7 @@ public JournalResponseProto journal(RpcController unused,
|
||||
public StartLogSegmentResponseProto startLogSegment(RpcController controller,
|
||||
StartLogSegmentRequestProto req) throws ServiceException {
|
||||
try {
|
||||
impl.startLogSegment(PBHelper.convert(req.getRegistration()),
|
||||
impl.startLogSegment(PBHelper.convert(req.getJournalInfo()),
|
||||
req.getTxid());
|
||||
} catch (IOException e) {
|
||||
throw new ServiceException(e);
|
||||
|
@ -24,12 +24,10 @@
|
||||
import org.apache.hadoop.classification.InterfaceStability;
|
||||
import org.apache.hadoop.hdfs.protocol.proto.JournalProtocolProtos.JournalRequestProto;
|
||||
import org.apache.hadoop.hdfs.protocol.proto.JournalProtocolProtos.StartLogSegmentRequestProto;
|
||||
import org.apache.hadoop.hdfs.protocolR23Compatible.ProtocolSignatureWritable;
|
||||
import org.apache.hadoop.hdfs.server.protocol.JournalProtocol;
|
||||
import org.apache.hadoop.hdfs.server.protocol.NamenodeRegistration;
|
||||
import org.apache.hadoop.ipc.ProtobufHelper;
|
||||
import org.apache.hadoop.ipc.ProtocolMetaInterface;
|
||||
import org.apache.hadoop.ipc.ProtocolSignature;
|
||||
import org.apache.hadoop.ipc.RPC;
|
||||
import org.apache.hadoop.ipc.RpcClientUtil;
|
||||
import org.apache.hadoop.ipc.RpcPayloadHeader.RpcKind;
|
||||
@ -63,7 +61,7 @@ public void close() {
|
||||
public void journal(NamenodeRegistration reg, long firstTxnId,
|
||||
int numTxns, byte[] records) throws IOException {
|
||||
JournalRequestProto req = JournalRequestProto.newBuilder()
|
||||
.setRegistration(PBHelper.convert(reg))
|
||||
.setJournalInfo(PBHelper.convertToJournalInfo(reg))
|
||||
.setFirstTxnId(firstTxnId)
|
||||
.setNumTxns(numTxns)
|
||||
.setRecords(PBHelper.getByteString(records))
|
||||
@ -79,7 +77,7 @@ public void journal(NamenodeRegistration reg, long firstTxnId,
|
||||
public void startLogSegment(NamenodeRegistration registration, long txid)
|
||||
throws IOException {
|
||||
StartLogSegmentRequestProto req = StartLogSegmentRequestProto.newBuilder()
|
||||
.setRegistration(PBHelper.convert(registration))
|
||||
.setJournalInfo(PBHelper.convertToJournalInfo(registration))
|
||||
.setTxid(txid)
|
||||
.build();
|
||||
try {
|
||||
|
@ -95,6 +95,7 @@
|
||||
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.ReplicaStateProto;
|
||||
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.StorageInfoProto;
|
||||
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.UpgradeStatusReportProto;
|
||||
import org.apache.hadoop.hdfs.protocol.proto.JournalProtocolProtos.JournalInfoProto;
|
||||
import org.apache.hadoop.hdfs.security.token.block.BlockKey;
|
||||
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
|
||||
import org.apache.hadoop.hdfs.security.token.block.ExportedBlockKeys;
|
||||
@ -127,7 +128,6 @@
|
||||
import org.apache.hadoop.hdfs.server.protocol.RemoteEditLog;
|
||||
import org.apache.hadoop.hdfs.server.protocol.RemoteEditLogManifest;
|
||||
import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations.BlockWithLocations;
|
||||
import org.apache.hadoop.hdfs.server.protocol.StorageReport;
|
||||
import org.apache.hadoop.hdfs.server.protocol.UpgradeCommand;
|
||||
import org.apache.hadoop.io.EnumSetWritable;
|
||||
import org.apache.hadoop.io.Text;
|
||||
@ -1346,4 +1346,26 @@ public static StorageReportProto convert(StorageReport r) {
|
||||
.setDfsUsed(r.getDfsUsed()).setRemaining(r.getRemaining())
|
||||
.setStorageID(r.getStorageID()).build();
|
||||
}
|
||||
|
||||
public static NamenodeRegistration convert(JournalInfoProto info) {
|
||||
int lv = info.hasLayoutVersion() ? info.getLayoutVersion() : 0;
|
||||
int nsID = info.hasNamespaceID() ? info.getNamespaceID() : 0;
|
||||
StorageInfo storage = new StorageInfo(lv, nsID, info.getClusterID(), 0);
|
||||
|
||||
// Note that the role is always {@link NamenodeRole#NAMENODE} as this
|
||||
// conversion happens for messages from Namenode to Journal receivers.
|
||||
// Addresses in the registration are unused.
|
||||
return new NamenodeRegistration("", "", storage, NamenodeRole.NAMENODE);
|
||||
}
|
||||
|
||||
/**
|
||||
* Method used for converting {@link JournalInfoProto} sent from Namenode
|
||||
* to Journal receivers to {@link NamenodeRegistration}.
|
||||
*/
|
||||
public static JournalInfoProto convertToJournalInfo(
|
||||
NamenodeRegistration reg) {
|
||||
return JournalInfoProto.newBuilder().setClusterID(reg.getClusterID())
|
||||
.setLayoutVersion(reg.getLayoutVersion())
|
||||
.setNamespaceID(reg.getNamespaceID()).build();
|
||||
}
|
||||
}
|
||||
|
@ -331,8 +331,7 @@ private boolean tryConvergeJournalSpool() throws IOException {
|
||||
*/
|
||||
private synchronized void setState(BNState newState) {
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("State transition " + bnState + " -> " + newState,
|
||||
new Exception("trace"));
|
||||
LOG.debug("State transition " + bnState + " -> " + newState);
|
||||
}
|
||||
bnState = newState;
|
||||
}
|
||||
|
@ -27,17 +27,15 @@
|
||||
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
||||
import org.apache.hadoop.hdfs.DFSUtil;
|
||||
import org.apache.hadoop.hdfs.NameNodeProxies;
|
||||
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
|
||||
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
|
||||
import org.apache.hadoop.hdfs.protocol.UnregisteredNodeException;
|
||||
import org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction;
|
||||
import org.apache.hadoop.hdfs.protocol.proto.JournalProtocolProtos.JournalProtocolService;
|
||||
import org.apache.hadoop.hdfs.protocolPB.JournalProtocolPB;
|
||||
import org.apache.hadoop.hdfs.protocolPB.JournalProtocolServerSideTranslatorPB;
|
||||
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NamenodeRole;
|
||||
import org.apache.hadoop.hdfs.server.common.Storage;
|
||||
import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations;
|
||||
import org.apache.hadoop.hdfs.server.protocol.JournalProtocol;
|
||||
import org.apache.hadoop.hdfs.server.protocol.NamenodeCommand;
|
||||
import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol;
|
||||
import org.apache.hadoop.hdfs.server.protocol.NamenodeRegistration;
|
||||
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
|
||||
@ -225,8 +223,6 @@ public boolean setSafeMode(SafeModeAction action) throws IOException {
|
||||
|
||||
static class BackupNodeRpcServer extends NameNodeRpcServer implements
|
||||
JournalProtocol {
|
||||
private final String nnRpcAddress;
|
||||
|
||||
private BackupNodeRpcServer(Configuration conf, BackupNode nn)
|
||||
throws IOException {
|
||||
super(conf, nn);
|
||||
@ -236,8 +232,31 @@ private BackupNodeRpcServer(Configuration conf, BackupNode nn)
|
||||
.newReflectiveBlockingService(journalProtocolTranslator);
|
||||
DFSUtil.addPBProtocol(conf, JournalProtocolPB.class, service,
|
||||
this.clientRpcServer);
|
||||
nnRpcAddress = nn.nnRpcAddress;
|
||||
}
|
||||
|
||||
/**
|
||||
* Verifies a journal request
|
||||
* @param nodeReg node registration
|
||||
* @throws UnregisteredNodeException if the registration is invalid
|
||||
*/
|
||||
void verifyJournalRequest(NamenodeRegistration reg) throws IOException {
|
||||
verifyVersion(reg.getLayoutVersion());
|
||||
String errorMsg = null;
|
||||
int expectedNamespaceID = namesystem.getNamespaceInfo().getNamespaceID();
|
||||
if (reg.getNamespaceID() != expectedNamespaceID) {
|
||||
errorMsg = "Invalid namespaceID in journal request - expected " + expectedNamespaceID
|
||||
+ " actual " + reg.getNamespaceID();
|
||||
LOG.warn(errorMsg);
|
||||
throw new UnregisteredNodeException(reg);
|
||||
}
|
||||
if (!reg.getClusterID().equals(namesystem.getClusterId())) {
|
||||
errorMsg = "Invalid clusterId in journal request - expected "
|
||||
+ reg.getClusterID() + " actual " + namesystem.getClusterId();
|
||||
LOG.warn(errorMsg);
|
||||
throw new UnregisteredNodeException(reg);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/////////////////////////////////////////////////////
|
||||
// BackupNodeProtocol implementation for backup node.
|
||||
@ -246,8 +265,7 @@ private BackupNodeRpcServer(Configuration conf, BackupNode nn)
|
||||
public void startLogSegment(NamenodeRegistration registration, long txid)
|
||||
throws IOException {
|
||||
namesystem.checkOperation(OperationCategory.JOURNAL);
|
||||
verifyRequest(registration);
|
||||
|
||||
verifyJournalRequest(registration);
|
||||
getBNImage().namenodeStartedLogSegment(txid);
|
||||
}
|
||||
|
||||
@ -256,10 +274,7 @@ public void journal(NamenodeRegistration nnReg,
|
||||
long firstTxId, int numTxns,
|
||||
byte[] records) throws IOException {
|
||||
namesystem.checkOperation(OperationCategory.JOURNAL);
|
||||
verifyRequest(nnReg);
|
||||
if(!nnRpcAddress.equals(nnReg.getAddress()))
|
||||
throw new IOException("Journal request from unexpected name-node: "
|
||||
+ nnReg.getAddress() + " expecting " + nnRpcAddress);
|
||||
verifyJournalRequest(nnReg);
|
||||
getBNImage().journal(firstTxId, numTxns, records);
|
||||
}
|
||||
|
||||
|
@ -27,16 +27,25 @@ option java_generate_equals_and_hash = true;
|
||||
import "hdfs.proto";
|
||||
|
||||
/**
|
||||
* registration - the registration info of the active NameNode
|
||||
* firstTxnId - the first txid in the rolled edit log
|
||||
* Journal information used by the journal receiver to identify a journal.
|
||||
*/
|
||||
message JournalInfoProto {
|
||||
required string clusterID = 1; // ID of the cluster
|
||||
optional uint32 layoutVersion = 2; // Layout version
|
||||
optional uint32 namespaceID = 3; // Namespace ID
|
||||
}
|
||||
|
||||
/**
|
||||
* JournalInfo - the information about the journal
|
||||
* firstTxnId - the first txid in the journal records
|
||||
* numTxns - Number of transactions in editlog
|
||||
* records - bytes containing serialized journal records
|
||||
*/
|
||||
message JournalRequestProto {
|
||||
required NamenodeRegistrationProto registration = 1; // Registration info
|
||||
required uint64 firstTxnId = 2; // Transaction ID
|
||||
required uint32 numTxns = 3; // Transaction ID
|
||||
required bytes records = 4; // Journal record
|
||||
required JournalInfoProto journalInfo = 1;
|
||||
required uint64 firstTxnId = 2;
|
||||
required uint32 numTxns = 3;
|
||||
required bytes records = 4;
|
||||
}
|
||||
|
||||
/**
|
||||
@ -46,12 +55,12 @@ message JournalResponseProto {
|
||||
}
|
||||
|
||||
/**
|
||||
* registration - the registration info of the active NameNode
|
||||
* JournalInfo - the information about the journal
|
||||
* txid - first txid in the new log
|
||||
*/
|
||||
message StartLogSegmentRequestProto {
|
||||
required NamenodeRegistrationProto registration = 1; // Registration info
|
||||
required uint64 txid = 2; // Transaction ID
|
||||
required JournalInfoProto journalInfo = 1;
|
||||
required uint64 txid = 2;
|
||||
}
|
||||
|
||||
/**
|
||||
|
Loading…
Reference in New Issue
Block a user