HDFS-2581. Implement protobuf service for JournalProtocol. Contributed Suresh Srinivas.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1210657 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Suresh Srinivas 2011-12-05 21:52:23 +00:00
parent d18e5b3844
commit c256f8266d
8 changed files with 516 additions and 30 deletions

View File

@ -13,6 +13,8 @@ Trunk (unreleased changes)
HDFS-2519. Add protobuf service for DatanodeProtocol. (suresh) HDFS-2519. Add protobuf service for DatanodeProtocol. (suresh)
HDFS-2581. Implement protobuf service for JournalProtocol. (suresh)
IMPROVEMENTS IMPROVEMENTS
HADOOP-7524 Change RPC to allow multiple protocols including multuple HADOOP-7524 Change RPC to allow multiple protocols including multuple

View File

@ -0,0 +1,53 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.protocolPB;
import java.io.IOException;
import org.apache.hadoop.hdfs.protocol.proto.JournalProtocolProtos.JournalProtocolService;
import org.apache.hadoop.hdfs.protocolR23Compatible.ProtocolSignatureWritable;
import org.apache.hadoop.security.KerberosInfo;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.ipc.ProtocolInfo;
import org.apache.hadoop.ipc.VersionedProtocol;
/**
* Protocol used to journal edits to a remote node. Currently,
* this is used to publish edits from the NameNode to a BackupNode.
*
* Note: This extends the protocolbuffer service based interface to
* add annotations required for security.
*/
@KerberosInfo(
serverPrincipal = DFSConfigKeys.DFS_NAMENODE_USER_NAME_KEY,
clientPrincipal = DFSConfigKeys.DFS_NAMENODE_USER_NAME_KEY)
@ProtocolInfo(protocolName =
"org.apache.hadoop.hdfs.server.protocol.JournalProtocol",
protocolVersion = 1)
@InterfaceAudience.Private
public interface JournalProtocolPB extends
JournalProtocolService.BlockingInterface, VersionedProtocol {
/**
* This method is defined to get the protocol signature using
* the R23 protocol - hence we have added the suffix of 2 the method name
* to avoid conflict.
*/
public ProtocolSignatureWritable getProtocolSignature2(String protocol,
long clientVersion, int clientMethodsHash) throws IOException;
}

View File

@ -0,0 +1,121 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.protocolPB;
import java.io.IOException;
import org.apache.hadoop.hdfs.protocol.proto.JournalProtocolProtos.JournalRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.JournalProtocolProtos.JournalResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.JournalProtocolProtos.StartLogSegmentRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.JournalProtocolProtos.StartLogSegmentResponseProto;
import org.apache.hadoop.hdfs.protocolR23Compatible.ProtocolSignatureWritable;
import org.apache.hadoop.hdfs.server.protocol.JournalProtocol;
import org.apache.hadoop.ipc.ProtocolSignature;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.VersionedProtocol;
import com.google.protobuf.RpcController;
import com.google.protobuf.ServiceException;
/**
* Implementation for protobuf service that forwards requests
* received on {@link JournalProtocolPB} to the
* {@link JournalProtocol} server implementation.
*/
public class JournalProtocolServerSideTranslatorPB implements JournalProtocolPB {
/** Server side implementation to delegate the requests to */
private final JournalProtocol impl;
public JournalProtocolServerSideTranslatorPB(JournalProtocol impl) {
this.impl = impl;
}
/** @see JournalProtocol#journal */
@Override
public JournalResponseProto journal(RpcController unused,
JournalRequestProto req) throws ServiceException {
try {
impl.journal(PBHelper.convert(req.getRegistration()),
req.getFirstTxnId(), req.getNumTxns(), req.getRecords()
.toByteArray());
} catch (IOException e) {
throw new ServiceException(e);
}
return JournalResponseProto.newBuilder().build();
}
/** @see JournalProtocol#startLogSegment */
@Override
public StartLogSegmentResponseProto startLogSegment(RpcController controller,
StartLogSegmentRequestProto req) throws ServiceException {
try {
impl.startLogSegment(PBHelper.convert(req.getRegistration()),
req.getTxid());
} catch (IOException e) {
throw new ServiceException(e);
}
return StartLogSegmentResponseProto.newBuilder().build();
}
/** @see VersionedProtocol#getProtocolVersion */
@Override
public long getProtocolVersion(String protocol, long clientVersion)
throws IOException {
return RPC.getProtocolVersion(JournalProtocolPB.class);
}
/**
* The client side will redirect getProtocolSignature to
* getProtocolSignature2.
*
* However the RPC layer below on the Server side will call getProtocolVersion
* and possibly in the future getProtocolSignature. Hence we still implement
* it even though the end client will never call this method.
*
* @see VersionedProtocol#getProtocolSignature(String, long, int)
*/
@Override
public ProtocolSignature getProtocolSignature(String protocol,
long clientVersion, int clientMethodsHash) throws IOException {
/**
* Don't forward this to the server. The protocol version and signature is
* that of {@link JournalProtocol}
*/
if (!protocol.equals(RPC.getProtocolName(JournalProtocolPB.class))) {
throw new IOException("Namenode Serverside implements " +
RPC.getProtocolName(JournalProtocolPB.class) +
". The following requested protocol is unknown: " + protocol);
}
return ProtocolSignature.getProtocolSignature(clientMethodsHash,
RPC.getProtocolVersion(JournalProtocolPB.class),
JournalProtocolPB.class);
}
@Override
public ProtocolSignatureWritable getProtocolSignature2(String protocol,
long clientVersion, int clientMethodsHash) throws IOException {
/**
* Don't forward this to the server. The protocol version and signature is
* that of {@link JournalPBProtocol}
*/
return ProtocolSignatureWritable.convert(
this.getProtocolSignature(protocol, clientVersion, clientMethodsHash));
}
}

View File

@ -0,0 +1,106 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.protocolPB;
import java.io.Closeable;
import java.io.IOException;
import java.net.InetSocketAddress;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.protocol.proto.JournalProtocolProtos.JournalRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.JournalProtocolProtos.StartLogSegmentRequestProto;
import org.apache.hadoop.hdfs.protocolR23Compatible.ProtocolSignatureWritable;
import org.apache.hadoop.hdfs.server.protocol.JournalProtocol;
import org.apache.hadoop.hdfs.server.protocol.NamenodeRegistration;
import org.apache.hadoop.ipc.ProtobufRpcEngine;
import org.apache.hadoop.ipc.ProtobufHelper;
import org.apache.hadoop.ipc.ProtocolSignature;
import org.apache.hadoop.ipc.RPC;
import com.google.protobuf.RpcController;
import com.google.protobuf.ServiceException;
/**
* This class is the client side translator to translate the requests made on
* {@link JournalProtocol} interfaces to the RPC server implementing
* {@link JournalProtocolPB}.
*/
@InterfaceAudience.Private
@InterfaceStability.Stable
public class JournalProtocolTranslatorPB implements JournalProtocol, Closeable {
/** RpcController is not used and hence is set to null */
private final static RpcController NULL_CONTROLLER = null;
private final JournalProtocolPB rpcProxy;
public JournalProtocolTranslatorPB(InetSocketAddress nameNodeAddr,
Configuration conf) throws IOException {
RPC.setProtocolEngine(conf, JournalProtocolPB.class, ProtobufRpcEngine.class);
rpcProxy = RPC.getProxy(JournalProtocolPB.class,
JournalProtocol.versionID, nameNodeAddr, conf);
}
@Override
public void close() {
RPC.stopProxy(rpcProxy);
}
@Override
public long getProtocolVersion(String protocolName, long clientVersion)
throws IOException {
return 0;
}
@Override
public ProtocolSignature getProtocolSignature(String protocol,
long clientVersion, int clientMethodsHash) throws IOException {
return ProtocolSignatureWritable.convert(rpcProxy.getProtocolSignature2(
protocol, clientVersion, clientMethodsHash));
}
@Override
public void journal(NamenodeRegistration reg, long firstTxnId,
int numTxns, byte[] records) throws IOException {
JournalRequestProto req = JournalRequestProto.newBuilder()
.setRegistration(PBHelper.convert(reg))
.setFirstTxnId(firstTxnId)
.setNumTxns(numTxns)
.setRecords(PBHelper.getByteString(records))
.build();
try {
rpcProxy.journal(NULL_CONTROLLER, req);
} catch (ServiceException e) {
throw ProtobufHelper.getRemoteException(e);
}
}
@Override
public void startLogSegment(NamenodeRegistration registration, long txid)
throws IOException {
StartLogSegmentRequestProto req = StartLogSegmentRequestProto.newBuilder()
.setRegistration(PBHelper.convert(registration))
.setTxid(txid)
.build();
try {
rpcProxy.startLogSegment(NULL_CONTROLLER, req);
} catch (ServiceException e) {
throw ProtobufHelper.getRemoteException(e);
}
}
}

View File

@ -0,0 +1,92 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.protocolPB;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.NamenodeRegistrationProto;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.NamenodeRegistrationProto.NamenodeRoleProto;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.StorageInfoProto;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NamenodeRole;
import org.apache.hadoop.hdfs.server.common.StorageInfo;
import org.apache.hadoop.hdfs.server.protocol.NamenodeRegistration;
import com.google.protobuf.ByteString;
/**
* Utilities for converting protobuf classes to and from
* implementation classes.
*/
class PBHelper {
private PBHelper() {
/** Hidden constructor */
}
public static ByteString getByteString(byte[] bytes) {
return ByteString.copyFrom(bytes);
}
public static NamenodeRole convert(NamenodeRoleProto role) {
switch (role) {
case NAMENODE:
return NamenodeRole.NAMENODE;
case BACKUP:
return NamenodeRole.BACKUP;
case CHECKPOINT:
return NamenodeRole.CHECKPOINT;
}
return null;
}
public static NamenodeRoleProto convert(NamenodeRole role) {
switch (role) {
case NAMENODE:
return NamenodeRoleProto.NAMENODE;
case BACKUP:
return NamenodeRoleProto.BACKUP;
case CHECKPOINT:
return NamenodeRoleProto.CHECKPOINT;
}
return null;
}
public static StorageInfoProto convert(StorageInfo info) {
return StorageInfoProto.newBuilder().setClusterID(info.getClusterID())
.setCTime(info.getCTime())
.setLayoutVersion(info.getLayoutVersion())
.setNamespceID(info.getNamespaceID())
.build();
}
public static StorageInfo convert(StorageInfoProto info) {
return new StorageInfo(info.getLayoutVersion(), info.getNamespceID(),
info.getClusterID(), info.getCTime());
}
public static NamenodeRegistrationProto convert(NamenodeRegistration reg) {
return NamenodeRegistrationProto.newBuilder()
.setHttpAddress(reg.getHttpAddress())
.setRole(convert(reg.getRole()))
.setRpcAddress(reg.getAddress())
.setStorageInfo(convert((StorageInfo) reg)).build();
}
public static NamenodeRegistration convert(NamenodeRegistrationProto reg) {
return new NamenodeRegistration(reg.getRpcAddress(), reg.getHttpAddress(),
convert(reg.getStorageInfo()), convert(reg.getRole()));
}
}

View File

@ -0,0 +1,62 @@
<!DOCTYPE HTML PUBLIC "-//W3C//DTD HTML 4.01 Transitional//EN">
<html>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<head>
<title>Protocol Buffers based data types for NN protocols</title>
</head>
<body>
<p>
The Protocol Buffers data types for NN protocols that use PB go in this package.
</p>
<h1>Steps to add a new protocol</h1>
<hr/>
<ol>
<li>Define the protobuf service for the protocol in &lt;ProtocolName&gt;.proto class.
<ul>
<li>This file should include both the protobuf service definition and the types
used for request and response. For example see - NamenodeProtocol.proto
<li>The naming convention for the protobuf service is &lt;ProtocolName&gt;Service.
Example: NamenodeProtocolService.
<li>Every RPC method takes a request and returns a response. The request
naming convention is &lt;MethodName&gt;RequestProto. The response naming convention
is &lt;MethodName&gt;ResponseProto.
</ul>
<li>Generate java files from the proto file using protoc tool.
<li>Define server side interface that extends BlockingInterface from the
generated files (Example: NamenodeProtocolService.BlockingInterface)
and VersionedProtocol. See NamenodePBProtocol.java for example.
<li>Define client side translator to translate the client protocol to
protobuf. See NamenodeProtocolTranslator.
<li>Define server side implementation that implements the server side interface.
This implementation receives the protobuf requests and delegates it to the
server side implementation. See NamenodePBProtocolImpl for example.
<li>Make changes to register this protocol at the server. See the other
protocols on how this is done.
</ol>
<h1>Steps to make changes to the existing protocol in a compatible way</h1>
<hr/>
<ol>
<li>Adding new methods is a compatible change.</li>
<li>When modifying an existing method, do not change the required parameters
to optional or optional parameters to required. Only add optional parameters
to the request and response.</li>
<li>When modifying an existing type, do not change the required parameters
to optional or optional parameters to require and optional parameters to
required. Only add optional parameters to the request and response.</li>
</ol>

View File

@ -1,29 +0,0 @@
<!DOCTYPE HTML PUBLIC "-//W3C//DTD HTML 4.01 Transitional//EN">
<html>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<head>
<title>Protocol Buffers based data types for NN protocols</title>
</head>
<body>
<p>
The Protocol Buffers data types for NN protocols that use
PB go in this package.
</p>

View File

@ -0,0 +1,79 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.protocolPB;
import static junit.framework.Assert.*;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.NamenodeRegistrationProto;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.NamenodeRegistrationProto.NamenodeRoleProto;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.StorageInfoProto;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NamenodeRole;
import org.apache.hadoop.hdfs.server.common.StorageInfo;
import org.apache.hadoop.hdfs.server.protocol.NamenodeRegistration;
import org.junit.Test;
/**
* Tests for {@link PBHelper}
*/
public class TestPBHelper {
@Test
public void testConvertNamenodeRole() {
assertEquals(NamenodeRoleProto.BACKUP,
PBHelper.convert(NamenodeRole.BACKUP));
assertEquals(NamenodeRoleProto.CHECKPOINT,
PBHelper.convert(NamenodeRole.CHECKPOINT));
assertEquals(NamenodeRoleProto.NAMENODE,
PBHelper.convert(NamenodeRole.NAMENODE));
assertEquals(NamenodeRole.BACKUP,
PBHelper.convert(NamenodeRoleProto.BACKUP));
assertEquals(NamenodeRole.CHECKPOINT,
PBHelper.convert(NamenodeRoleProto.CHECKPOINT));
assertEquals(NamenodeRole.NAMENODE,
PBHelper.convert(NamenodeRoleProto.NAMENODE));
}
@Test
public void testConvertStoragInfo() {
StorageInfo info = new StorageInfo(1, 2, "cid", 3);
StorageInfoProto infoProto = PBHelper.convert(info);
StorageInfo info2 = PBHelper.convert(infoProto);
assertEquals(info.getClusterID(), info2.getClusterID());
assertEquals(info.getCTime(), info2.getCTime());
assertEquals(info.getLayoutVersion(), info2.getLayoutVersion());
assertEquals(info.getNamespaceID(), info2.getNamespaceID());
}
@Test
public void testConvertNamenodeRegistration() {
StorageInfo info = new StorageInfo(1, 2, "cid", 3);
NamenodeRegistration reg = new NamenodeRegistration("address:999",
"http:1000", info, NamenodeRole.NAMENODE);
NamenodeRegistrationProto regProto = PBHelper.convert(reg);
NamenodeRegistration reg2 = PBHelper.convert(regProto);
assertEquals(reg.getAddress(), reg2.getAddress());
assertEquals(reg.getClusterID(), reg2.getClusterID());
assertEquals(reg.getCTime(), reg2.getCTime());
assertEquals(reg.getHttpAddress(), reg2.getHttpAddress());
assertEquals(reg.getLayoutVersion(), reg2.getLayoutVersion());
assertEquals(reg.getNamespaceID(), reg2.getNamespaceID());
assertEquals(reg.getRegistrationID(), reg2.getRegistrationID());
assertEquals(reg.getRole(), reg2.getRole());
assertEquals(reg.getVersion(), reg2.getVersion());
}
}