From 6347b2253d1b912d1d8d89a4b3d470c596f2c596 Mon Sep 17 00:00:00 2001 From: Hanisha Koneru Date: Mon, 22 Jan 2018 16:02:32 -0800 Subject: [PATCH] HDFS-13023. Journal Sync does not work on a secure cluster. Contributed by Bharat Viswanadham. --- .../src/main/conf/hadoop-policy.xml | 8 ++ .../hadoop/fs/CommonConfigurationKeys.java | 2 + .../ClientDatanodeProtocolTranslatorPB.java | 1 + hadoop-hdfs-project/hadoop-hdfs/pom.xml | 1 + .../hadoop/hdfs/HDFSPolicyProvider.java | 4 + .../protocol/InterQJournalProtocol.java | 54 +++++++++++ .../protocolPB/InterQJournalProtocolPB.java | 40 ++++++++ ...JournalProtocolServerSideTranslatorPB.java | 64 +++++++++++++ .../InterQJournalProtocolTranslatorPB.java | 96 +++++++++++++++++++ .../qjournal/server/JournalNodeRpcServer.java | 38 +++++++- .../qjournal/server/JournalNodeSyncer.java | 57 +++++------ .../main/proto/InterQJournalProtocol.proto | 50 ++++++++++ 12 files changed, 386 insertions(+), 29 deletions(-) create mode 100644 hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocol/InterQJournalProtocol.java create mode 100644 hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocolPB/InterQJournalProtocolPB.java create mode 100644 hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocolPB/InterQJournalProtocolServerSideTranslatorPB.java create mode 100644 hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocolPB/InterQJournalProtocolTranslatorPB.java create mode 100644 hadoop-hdfs-project/hadoop-hdfs/src/main/proto/InterQJournalProtocol.proto diff --git a/hadoop-common-project/hadoop-common/src/main/conf/hadoop-policy.xml b/hadoop-common-project/hadoop-common/src/main/conf/hadoop-policy.xml index d282c5841c..cf3dd1f4ec 100644 --- a/hadoop-common-project/hadoop-common/src/main/conf/hadoop-policy.xml +++ b/hadoop-common-project/hadoop-common/src/main/conf/hadoop-policy.xml @@ -123,6 +123,14 @@ JNs when using the QuorumJournalManager for edit logs. 
+ &lt;property&gt; + &lt;name&gt;security.interqjournal.service.protocol.acl&lt;/name&gt; + &lt;value&gt;*&lt;/value&gt; + &lt;description&gt;ACL for InterQJournalProtocol, used by the JN to + communicate with other JNs + &lt;/description&gt; + &lt;/property&gt; + &lt;property&gt; &lt;name&gt;security.mrhs.client.protocol.acl&lt;/name&gt; &lt;value&gt;*&lt;/value&gt; diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeys.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeys.java index 0da4bbd086..ed15fa43ea 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeys.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeys.java @@ -224,6 +224,8 @@ public class CommonConfigurationKeys extends CommonConfigurationKeysPublic { SECURITY_NAMENODE_PROTOCOL_ACL = "security.namenode.protocol.acl"; public static final String SECURITY_QJOURNAL_SERVICE_PROTOCOL_ACL = "security.qjournal.service.protocol.acl"; + public static final String SECURITY_INTERQJOURNAL_SERVICE_PROTOCOL_ACL = + "security.interqjournal.service.protocol.acl"; public static final String HADOOP_SECURITY_TOKEN_SERVICE_USE_IP = "hadoop.security.token.service.use_ip"; public static final boolean HADOOP_SECURITY_TOKEN_SERVICE_USE_IP_DEFAULT = diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientDatanodeProtocolTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientDatanodeProtocolTranslatorPB.java index 084c594d11..a9622bae7c 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientDatanodeProtocolTranslatorPB.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientDatanodeProtocolTranslatorPB.java @@ -1,3 +1,4 @@ + /** * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements.
See the NOTICE file diff --git a/hadoop-hdfs-project/hadoop-hdfs/pom.xml b/hadoop-hdfs-project/hadoop-hdfs/pom.xml index fe51071e3a..2094f23cf4 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/pom.xml +++ b/hadoop-hdfs-project/hadoop-hdfs/pom.xml @@ -346,6 +346,7 @@ http://maven.apache.org/xsd/maven-4.0.0.xsd"> FederationProtocol.proto RouterProtocol.proto AliasMapProtocol.proto + InterQJournalProtocol.proto diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/HDFSPolicyProvider.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/HDFSPolicyProvider.java index edc72c5e8f..e999375775 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/HDFSPolicyProvider.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/HDFSPolicyProvider.java @@ -24,6 +24,7 @@ import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol; import org.apache.hadoop.hdfs.protocol.ClientProtocol; import org.apache.hadoop.hdfs.protocol.ReconfigurationProtocol; +import org.apache.hadoop.hdfs.qjournal.protocol.InterQJournalProtocol; import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocol; import org.apache.hadoop.hdfs.server.protocol.DatanodeLifelineProtocol; import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol; @@ -57,6 +58,9 @@ public class HDFSPolicyProvider extends PolicyProvider { NamenodeProtocol.class), new Service(CommonConfigurationKeys.SECURITY_QJOURNAL_SERVICE_PROTOCOL_ACL, QJournalProtocol.class), + new Service( + CommonConfigurationKeys.SECURITY_INTERQJOURNAL_SERVICE_PROTOCOL_ACL, + InterQJournalProtocol.class), new Service(CommonConfigurationKeys.SECURITY_HA_SERVICE_PROTOCOL_ACL, HAServiceProtocol.class), new Service(CommonConfigurationKeys.SECURITY_ZKFC_PROTOCOL_ACL, diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocol/InterQJournalProtocol.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocol/InterQJournalProtocol.java new file mode 100644 index 0000000000..94caeba788 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocol/InterQJournalProtocol.java @@ -0,0 +1,54 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hdfs.qjournal.protocol; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.hdfs.DFSConfigKeys; +import org.apache.hadoop.hdfs.qjournal.server.JournalNode; +import org.apache.hadoop.hdfs.qjournal.protocol.InterQJournalProtocolProtos.GetEditLogManifestFromJournalResponseProto; +import org.apache.hadoop.security.KerberosInfo; + +import java.io.IOException; + +/** + * Protocol used to communicate between {@link JournalNode}s for journal sync. + * + * This is responsible for sending the edit log manifest. + */ + +@KerberosInfo( + serverPrincipal = DFSConfigKeys.DFS_JOURNALNODE_KERBEROS_PRINCIPAL_KEY, + clientPrincipal = DFSConfigKeys.DFS_JOURNALNODE_KERBEROS_PRINCIPAL_KEY) +@InterfaceAudience.Private +public interface InterQJournalProtocol { + + long versionID = 1L; + + /** + * @param jid the journal from which to enumerate edits + * @param sinceTxId the first transaction which the client cares about + * @param inProgressOk whether or not to check the in-progress edit log + * segment + * @return a list of edit log segments since the given transaction ID. + */ + GetEditLogManifestFromJournalResponseProto getEditLogManifestFromJournal( + String jid, String nameServiceId, long sinceTxId, boolean inProgressOk) + throws IOException; + +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocolPB/InterQJournalProtocolPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocolPB/InterQJournalProtocolPB.java new file mode 100644 index 0000000000..5fdb8eeb11 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocolPB/InterQJournalProtocolPB.java @@ -0,0 +1,40 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hdfs.qjournal.protocolPB; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.hdfs.DFSConfigKeys; +import org.apache.hadoop.hdfs.qjournal.protocol.InterQJournalProtocolProtos.InterQJournalProtocolService; +import org.apache.hadoop.ipc.ProtocolInfo; +import org.apache.hadoop.security.KerberosInfo; +/** + * Protocol used to communicate between journal nodes for journal sync. + * Note: This extends the protocol buffer service-based interface to + * add annotations required for security.
+ */ +@KerberosInfo( + serverPrincipal = DFSConfigKeys.DFS_JOURNALNODE_KERBEROS_PRINCIPAL_KEY, + clientPrincipal = DFSConfigKeys.DFS_JOURNALNODE_KERBEROS_PRINCIPAL_KEY) +@ProtocolInfo(protocolName = + "org.apache.hadoop.hdfs.qjournal.protocol.InterQJournalProtocol", + protocolVersion = 1) +@InterfaceAudience.Private +public interface InterQJournalProtocolPB extends + InterQJournalProtocolService.BlockingInterface { +} \ No newline at end of file diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocolPB/InterQJournalProtocolServerSideTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocolPB/InterQJournalProtocolServerSideTranslatorPB.java new file mode 100644 index 0000000000..15d6387926 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocolPB/InterQJournalProtocolServerSideTranslatorPB.java @@ -0,0 +1,64 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +package org.apache.hadoop.hdfs.qjournal.protocolPB; + +import com.google.protobuf.RpcController; +import com.google.protobuf.ServiceException; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.hdfs.qjournal.protocol.InterQJournalProtocol; +import org.apache.hadoop.hdfs.qjournal.protocol.InterQJournalProtocolProtos.GetEditLogManifestFromJournalRequestProto; +import org.apache.hadoop.hdfs.qjournal.protocol.InterQJournalProtocolProtos.GetEditLogManifestFromJournalResponseProto; + +import java.io.IOException; + +/** + * Implementation for protobuf service that forwards requests + * received on {@link InterQJournalProtocolPB} to the + * {@link InterQJournalProtocol} server implementation. + */ +@InterfaceAudience.Private +public class InterQJournalProtocolServerSideTranslatorPB implements + InterQJournalProtocolPB{ + + /* Server side implementation to delegate the requests to. */ + private final InterQJournalProtocol impl; + + public InterQJournalProtocolServerSideTranslatorPB(InterQJournalProtocol + impl) { + this.impl = impl; + } + + @Override + public GetEditLogManifestFromJournalResponseProto + getEditLogManifestFromJournal(RpcController controller, + GetEditLogManifestFromJournalRequestProto + request) throws ServiceException { + try { + return impl.getEditLogManifestFromJournal( + request.getJid().getIdentifier(), + request.hasNameServiceId() ? 
request.getNameServiceId() : null, + request.getSinceTxId(), + request.getInProgressOk()); + } catch (IOException e) { + throw new ServiceException(e); + } + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocolPB/InterQJournalProtocolTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocolPB/InterQJournalProtocolTranslatorPB.java new file mode 100644 index 0000000000..cdccfca5e8 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocolPB/InterQJournalProtocolTranslatorPB.java @@ -0,0 +1,96 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +package org.apache.hadoop.hdfs.qjournal.protocolPB; + +import com.google.protobuf.RpcController; +import com.google.protobuf.ServiceException; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.hdfs.qjournal.protocol.InterQJournalProtocol; +import org.apache.hadoop.hdfs.qjournal.protocol.InterQJournalProtocolProtos.GetEditLogManifestFromJournalResponseProto; +import org.apache.hadoop.hdfs.qjournal.protocol.InterQJournalProtocolProtos.GetEditLogManifestFromJournalRequestProto; +import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos; +import org.apache.hadoop.ipc.ProtobufHelper; +import org.apache.hadoop.ipc.ProtocolMetaInterface; +import org.apache.hadoop.ipc.RPC; +import org.apache.hadoop.ipc.RpcClientUtil; + +import java.io.Closeable; +import java.io.IOException; + +/** + * This class is the client side translator to translate the requests made on + * {@link InterQJournalProtocol} interfaces to the RPC server implementing + * {@link InterQJournalProtocolPB}. + */ +@InterfaceAudience.Private +@InterfaceStability.Stable +public class InterQJournalProtocolTranslatorPB implements ProtocolMetaInterface, + InterQJournalProtocol, Closeable { + + /* RpcController is not used and hence is set to null. 
*/ + private final static RpcController NULL_CONTROLLER = null; + private final InterQJournalProtocolPB rpcProxy; + + public InterQJournalProtocolTranslatorPB(InterQJournalProtocolPB rpcProxy) { + this.rpcProxy = rpcProxy; + } + + @Override + public void close() { + RPC.stopProxy(rpcProxy); + } + + + @Override + public GetEditLogManifestFromJournalResponseProto + getEditLogManifestFromJournal(String jid, String nameServiceId, + long sinceTxId, boolean inProgressOk) + throws IOException { + try { + GetEditLogManifestFromJournalRequestProto.Builder req; + req = GetEditLogManifestFromJournalRequestProto.newBuilder() + .setJid(convertJournalId(jid)) + .setSinceTxId(sinceTxId) + .setInProgressOk(inProgressOk); + if (nameServiceId !=null) { + req.setNameServiceId(nameServiceId); + } + return rpcProxy.getEditLogManifestFromJournal(NULL_CONTROLLER, + req.build() + ); + } catch (ServiceException e) { + throw ProtobufHelper.getRemoteException(e); + } + } + + private QJournalProtocolProtos.JournalIdProto convertJournalId(String jid) { + return QJournalProtocolProtos.JournalIdProto.newBuilder() + .setIdentifier(jid) + .build(); + } + + @Override + public boolean isMethodSupported(String methodName) throws IOException { + return RpcClientUtil.isMethodSupported(rpcProxy, + InterQJournalProtocolPB.class, RPC.RpcKind.RPC_PROTOCOL_BUFFER, + RPC.getProtocolVersion(InterQJournalProtocolPB.class), methodName); + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalNodeRpcServer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalNodeRpcServer.java index 748a51c65c..6cb933b5e4 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalNodeRpcServer.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalNodeRpcServer.java @@ -26,8 +26,12 @@ import org.apache.hadoop.fs.CommonConfigurationKeys; import org.apache.hadoop.fs.CommonConfigurationKeysPublic; import org.apache.hadoop.hdfs.DFSConfigKeys; +import org.apache.hadoop.hdfs.DFSUtil; import org.apache.hadoop.hdfs.HDFSPolicyProvider; import org.apache.hadoop.hdfs.protocolPB.PBHelper; +import org.apache.hadoop.hdfs.qjournal.protocol.InterQJournalProtocol; +import org.apache.hadoop.hdfs.qjournal.protocol.InterQJournalProtocolProtos.InterQJournalProtocolService; +import org.apache.hadoop.hdfs.qjournal.protocol.InterQJournalProtocolProtos.GetEditLogManifestFromJournalResponseProto; import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocol; import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetEditLogManifestResponseProto; import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetJournalStateResponseProto; @@ -36,6 +40,8 @@ import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.QJournalProtocolService; import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.SegmentStateProto; import org.apache.hadoop.hdfs.qjournal.protocol.RequestInfo; +import org.apache.hadoop.hdfs.qjournal.protocolPB.InterQJournalProtocolPB; +import org.apache.hadoop.hdfs.qjournal.protocolPB.InterQJournalProtocolServerSideTranslatorPB; import org.apache.hadoop.hdfs.qjournal.protocolPB.QJournalProtocolPB; import org.apache.hadoop.hdfs.qjournal.protocolPB.QJournalProtocolServerSideTranslatorPB; import org.apache.hadoop.hdfs.server.common.StorageInfo; @@ -52,7 +58,8 @@ @InterfaceAudience.Private @VisibleForTesting -public class 
JournalNodeRpcServer implements QJournalProtocol { +public class JournalNodeRpcServer implements QJournalProtocol, + InterQJournalProtocol { private static final int HANDLER_COUNT = 5; private final JournalNode jn; private Server server; @@ -84,6 +91,19 @@ public class JournalNodeRpcServer implements QJournalProtocol { .setVerbose(false) .build(); + + //Adding InterQJournalProtocolPB to server + InterQJournalProtocolServerSideTranslatorPB + qJournalProtocolServerSideTranslatorPB = new + InterQJournalProtocolServerSideTranslatorPB(this); + + BlockingService interQJournalProtocolService = InterQJournalProtocolService + .newReflectiveBlockingService(qJournalProtocolServerSideTranslatorPB); + + DFSUtil.addPBProtocol(confCopy, InterQJournalProtocolPB.class, + interQJournalProtocolService, server); + + // set service-level authorization security policy if (confCopy.getBoolean( CommonConfigurationKeys.HADOOP_SECURITY_AUTHORIZATION, false)) { @@ -263,4 +283,20 @@ public Long getJournalCTime(String journalId, String nameServiceId) throws IOException { return jn.getJournalCTime(journalId, nameServiceId); } + + @SuppressWarnings("deprecation") + @Override + public GetEditLogManifestFromJournalResponseProto + getEditLogManifestFromJournal(String jid, String nameServiceId, + long sinceTxId, boolean inProgressOk) + throws IOException { + RemoteEditLogManifest manifest = jn.getOrCreateJournal(jid, nameServiceId) + .getEditLogManifest(sinceTxId, inProgressOk); + + return GetEditLogManifestFromJournalResponseProto.newBuilder() + .setManifest(PBHelper.convert(manifest)) + .setHttpPort(jn.getBoundHttpAddress().getPort()) + .setFromURL(jn.getHttpServerURI()) + .build(); + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalNodeSyncer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalNodeSyncer.java index 490b3ea452..fa47b143cc 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalNodeSyncer.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalNodeSyncer.java @@ -20,7 +20,6 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.Lists; import com.google.common.collect.Sets; -import com.google.protobuf.ServiceException; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileUtil; @@ -28,19 +27,17 @@ import org.apache.hadoop.hdfs.DFSUtilClient; import org.apache.hadoop.hdfs.protocolPB.PBHelper; -import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos; -import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos - .JournalIdProto; -import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos - .GetEditLogManifestRequestProto; -import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos - .GetEditLogManifestResponseProto; -import org.apache.hadoop.hdfs.qjournal.protocolPB.QJournalProtocolPB; +import org.apache.hadoop.hdfs.qjournal.protocol.InterQJournalProtocol; +import org.apache.hadoop.hdfs.qjournal.protocol.InterQJournalProtocolProtos.GetEditLogManifestFromJournalResponseProto; +import org.apache.hadoop.hdfs.qjournal.protocolPB.InterQJournalProtocolPB; +import org.apache.hadoop.hdfs.qjournal.protocolPB.InterQJournalProtocolTranslatorPB; import org.apache.hadoop.hdfs.server.common.Util; import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo; import 
org.apache.hadoop.hdfs.server.protocol.RemoteEditLog; import org.apache.hadoop.hdfs.util.DataTransferThrottler; +import org.apache.hadoop.ipc.ProtobufRpcEngine; import org.apache.hadoop.ipc.RPC; +import org.apache.hadoop.security.SecurityUtil; import org.apache.hadoop.util.Daemon; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -52,6 +49,7 @@ import java.net.URI; import java.net.URISyntaxException; import java.net.URL; +import java.security.PrivilegedExceptionAction; import java.util.Collection; import java.util.HashSet; import java.util.List; @@ -69,7 +67,6 @@ public class JournalNodeSyncer { private final Journal journal; private final String jid; private String nameServiceId; - private final JournalIdProto jidProto; private final JNStorage jnStorage; private final Configuration conf; private volatile Daemon syncJournalDaemon; @@ -90,7 +87,6 @@ public class JournalNodeSyncer { this.journal = journal; this.jid = jid; this.nameServiceId = nameServiceId; - this.jidProto = convertJournalId(this.jid); this.jnStorage = journal.getStorage(); this.conf = conf; journalSyncInterval = conf.getLong( @@ -235,7 +231,7 @@ private void syncWithJournalAtIndex(int index) { LOG.info("Syncing Journal " + jn.getBoundIpcAddress().getAddress() + ":" + jn.getBoundIpcAddress().getPort() + " with " + otherJNProxies.get(index) + ", journal id: " + jid); - final QJournalProtocolPB jnProxy = otherJNProxies.get(index).jnProxy; + final InterQJournalProtocol jnProxy = otherJNProxies.get(index).jnProxy; if (jnProxy == null) { LOG.error("JournalNode Proxy not found."); return; @@ -249,13 +245,11 @@ private void syncWithJournalAtIndex(int index) { return; } - GetEditLogManifestResponseProto editLogManifest; + GetEditLogManifestFromJournalResponseProto editLogManifest; try { - editLogManifest = jnProxy.getEditLogManifest(null, - GetEditLogManifestRequestProto.newBuilder().setJid(jidProto) - .setSinceTxId(0) - .setInProgressOk(false).build()); - } catch (ServiceException e) { + editLogManifest = jnProxy.getEditLogManifestFromJournal(jid, + nameServiceId, 0, false); + } catch (IOException e) { LOG.error("Could not sync with Journal at " + otherJNProxies.get(journalNodeIndexForSync), e); return; @@ -323,14 +317,8 @@ private List getJournalAddrList(String uriStr) throws Sets.newHashSet(jn.getBoundIpcAddress())); } - private JournalIdProto convertJournalId(String journalId) { - return QJournalProtocolProtos.JournalIdProto.newBuilder() - .setIdentifier(journalId) - .build(); - } - private void getMissingLogSegments(List thisJournalEditLogs, - GetEditLogManifestResponseProto response, + GetEditLogManifestFromJournalResponseProto response, JournalNodeProxy remoteJNproxy) { List otherJournalEditLogs = PBHelper.convert( @@ -497,13 +485,26 @@ private static DataTransferThrottler getThrottler(Configuration conf) { private class JournalNodeProxy { private final InetSocketAddress jnAddr; - private final QJournalProtocolPB jnProxy; + private final InterQJournalProtocol jnProxy; private URL httpServerUrl; JournalNodeProxy(InetSocketAddress jnAddr) throws IOException { + final Configuration confCopy = new Configuration(conf); this.jnAddr = jnAddr; - this.jnProxy = RPC.getProxy(QJournalProtocolPB.class, - RPC.getProtocolVersion(QJournalProtocolPB.class), jnAddr, conf); + this.jnProxy = SecurityUtil.doAsLoginUser( + new PrivilegedExceptionAction() { + @Override + public InterQJournalProtocol run() throws IOException { + RPC.setProtocolEngine(confCopy, InterQJournalProtocolPB.class, + ProtobufRpcEngine.class); + 
InterQJournalProtocolPB interQJournalProtocolPB = RPC.getProxy( + InterQJournalProtocolPB.class, + RPC.getProtocolVersion(InterQJournalProtocolPB.class), + jnAddr, confCopy); + return new InterQJournalProtocolTranslatorPB( + interQJournalProtocolPB); + } + }); } @Override diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/InterQJournalProtocol.proto b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/InterQJournalProtocol.proto new file mode 100644 index 0000000000..8fe9e6941a --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/InterQJournalProtocol.proto @@ -0,0 +1,50 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * These .proto interfaces are private and stable. + * Please see http://wiki.apache.org/hadoop/Compatibility + * for what changes are allowed for a *stable* .proto interface. + */ + +option java_package = "org.apache.hadoop.hdfs.qjournal.protocol"; +option java_outer_classname = "InterQJournalProtocolProtos"; +option java_generic_services = true; +option java_generate_equals_and_hash = true; +package hadoop.hdfs.qjournal; + +import "HdfsServer.proto"; +import "QJournalProtocol.proto"; + +message GetEditLogManifestFromJournalRequestProto { + required JournalIdProto jid = 1; + required uint64 sinceTxId = 2; // Transaction ID + optional bool inProgressOk = 3 [default = false]; + optional string nameServiceId = 4; +} + +message GetEditLogManifestFromJournalResponseProto { + required RemoteEditLogManifestProto manifest = 1; + required uint32 httpPort = 2; + optional string fromURL = 3; +} + +service InterQJournalProtocolService { + rpc getEditLogManifestFromJournal(GetEditLogManifestFromJournalRequestProto) + returns (GetEditLogManifestFromJournalResponseProto); +} \ No newline at end of file
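
With service-level authorization enabled (hadoop.security.authorization set to true), the JournalNode RPC server above checks incoming InterQJournalProtocol calls against the new security.interqjournal.service.protocol.acl key, which this patch adds to hadoop-policy.xml with a default of "*" (allow all callers). The snippet below is an illustrative hadoop-policy.xml override that restricts journal-to-journal sync to the identities that actually run the JournalNodes; the user name "hdfs" and group name "journalnodes" are placeholders rather than defaults, and should be replaced with whatever principals a given deployment uses. JournalNodes apply the policy when their RPC server starts, so plan on restarting them after changing this ACL.

  <property>
    <name>security.interqjournal.service.protocol.acl</name>
    <!-- ACL format: comma-separated users, a space, then comma-separated
         groups. "hdfs" and "journalnodes" are example names only. -->
    <value>hdfs journalnodes</value>
  </property>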