diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/audit/SCMAction.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/audit/SCMAction.java
new file mode 100644
index 0000000000..950a7833a5
--- /dev/null
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/audit/SCMAction.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.ozone.audit;
+
+/**
+ * Enum to define Audit Action types for SCM.
+ */
+public enum SCMAction implements AuditAction {
+
+ GET_VERSION,
+ REGISTER,
+ SEND_HEARTBEAT,
+ GET_SCM_INFO,
+ ALLOCATE_BLOCK,
+ DELETE_KEY_BLOCK,
+ ALLOCATE_CONTAINER,
+ GET_CONTAINER,
+ GET_CONTAINER_WITH_PIPELINE,
+ LIST_CONTAINER,
+ LIST_PIPELINE,
+ CLOSE_PIPELINE,
+ DELETE_CONTAINER,
+ IN_CHILL_MODE,
+ FORCE_EXIT_CHILL_MODE;
+
+ @Override
+ public String getAction() {
+ return this.toString();
+ }
+
+}
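For context (not part of the patch): `SCMAction.getAction()` simply returns the enum constant's name, which becomes the `op` field of an `AuditMessage` built exactly as the server changes below do. A minimal sketch using only the audit types this patch relies on (the user/IP values are samples; the servers pull them from RPC thread-locals):

```java
import org.apache.hadoop.ozone.audit.AuditEventStatus;
import org.apache.hadoop.ozone.audit.AuditMessage;
import org.apache.hadoop.ozone.audit.SCMAction;

// getAction() yields "ALLOCATE_BLOCK" here; it becomes the audit "op" field.
AuditMessage msg = new AuditMessage.Builder()
    .setUser("scmadmin")     // sample value only
    .atIp("127.0.0.1")       // sample value only
    .forOperation(SCMAction.ALLOCATE_BLOCK.getAction())
    .withParams(null)
    .withResult(AuditEventStatus.SUCCESS.toString())
    .withException(null)
    .build();
```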
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/BlockGroup.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/BlockGroup.java
index 2f6030fdc8..1925c22aa2 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/BlockGroup.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/BlockGroup.java
@@ -72,6 +72,14 @@ public static Builder newBuilder() {
return new Builder();
}
+ @Override
+ public String toString() {
+ return "BlockGroup[" +
+ "groupID='" + groupID + '\'' +
+ ", blockIDs=" + blockIDs +
+ ']';
+ }
+
/**
* BlockGroup instance builder.
*/
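The new `toString()` exists so the `keyBlockToDelete` audit parameter added in SCMBlockProtocolServer below is human-readable. A rough illustration of the rendered form; the builder method names `setKeyName`/`addAllBlockIDs` are assumed from the existing `BlockGroup.Builder`, which this hunk does not show:

```java
import java.util.Arrays;
import java.util.List;
import org.apache.hadoop.hdds.client.BlockID;
import org.apache.hadoop.ozone.common.BlockGroup;

// Illustration of the string logged below as "keyBlockToDelete".
List<BlockID> blocks =
    Arrays.asList(new BlockID(1L, 100L), new BlockID(1L, 101L));
BlockGroup group = BlockGroup.newBuilder()
    .setKeyName("/vol1/bucket1/key1")
    .addAllBlockIDs(blocks)
    .build();
System.out.println(group);
// -> BlockGroup[groupID='/vol1/bucket1/key1', blockIDs=[...]]
// (the blockIDs element format is whatever BlockID#toString renders)
```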
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMBlockProtocolServer.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMBlockProtocolServer.java
index 3bb284e8d0..26e3b13a50 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMBlockProtocolServer.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMBlockProtocolServer.java
@@ -21,6 +21,7 @@
*/
package org.apache.hadoop.hdds.scm.server;
+import com.google.common.collect.Maps;
import com.google.protobuf.BlockingService;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
@@ -35,6 +36,14 @@
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.ipc.ProtobufRpcEngine;
import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.ipc.Server;
+import org.apache.hadoop.ozone.audit.AuditAction;
+import org.apache.hadoop.ozone.audit.AuditEventStatus;
+import org.apache.hadoop.ozone.audit.AuditLogger;
+import org.apache.hadoop.ozone.audit.AuditLoggerType;
+import org.apache.hadoop.ozone.audit.AuditMessage;
+import org.apache.hadoop.ozone.audit.Auditor;
+import org.apache.hadoop.ozone.audit.SCMAction;
import org.apache.hadoop.ozone.common.BlockGroup;
import org.apache.hadoop.hdds.client.BlockID;
import org.apache.hadoop.ozone.common.DeleteBlockGroupResult;
@@ -47,6 +56,7 @@
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.List;
+import java.util.Map;
import static org.apache.hadoop.hdds.scm.ScmConfigKeys
.OZONE_SCM_BLOCK_CLIENT_ADDRESS_KEY;
@@ -62,10 +72,14 @@
* SCM block protocol is the protocol used by Namenode and OzoneManager to get
* blocks from the SCM.
*/
-public class SCMBlockProtocolServer implements ScmBlockLocationProtocol {
+public class SCMBlockProtocolServer implements
+ ScmBlockLocationProtocol, Auditor {
private static final Logger LOG =
LoggerFactory.getLogger(SCMBlockProtocolServer.class);
+ private static final AuditLogger AUDIT =
+ new AuditLogger(AuditLoggerType.SCMLOGGER);
+
private final StorageContainerManager scm;
private final OzoneConfiguration conf;
private final RPC.Server blockRpcServer;
@@ -140,7 +154,27 @@ public void join() throws InterruptedException {
public AllocatedBlock allocateBlock(long size, HddsProtos.ReplicationType
type, HddsProtos.ReplicationFactor factor, String owner) throws
IOException {
- return scm.getScmBlockManager().allocateBlock(size, type, factor, owner);
+ Map<String, String> auditMap = Maps.newHashMap();
+ auditMap.put("size", String.valueOf(size));
+ auditMap.put("type", type.name());
+ auditMap.put("factor", factor.name());
+ auditMap.put("owner", owner);
+ boolean auditSuccess = true;
+ try {
+ return scm.getScmBlockManager().allocateBlock(size, type, factor, owner);
+ } catch (Exception ex) {
+ auditSuccess = false;
+ AUDIT.logWriteFailure(
+ buildAuditMessageForFailure(SCMAction.ALLOCATE_BLOCK, auditMap, ex)
+ );
+ throw ex;
+ } finally {
+ if(auditSuccess) {
+ AUDIT.logWriteSuccess(
+ buildAuditMessageForSuccess(SCMAction.ALLOCATE_BLOCK, auditMap)
+ );
+ }
+ }
}
/**
@@ -155,17 +189,26 @@ public List<DeleteBlockGroupResult> deleteKeyBlocks(
LOG.info("SCM is informed by OM to delete {} blocks", keyBlocksInfoList
.size());
List<DeleteBlockGroupResult> results = new ArrayList<>();
+ Map<String, String> auditMap = Maps.newHashMap();
for (BlockGroup keyBlocks : keyBlocksInfoList) {
ScmBlockLocationProtocolProtos.DeleteScmBlockResult.Result resultCode;
try {
// We delete blocks in an atomic operation to prevent getting
// into state like only a partial of blocks are deleted,
// which will leave key in an inconsistent state.
+ auditMap.put("keyBlockToDelete", keyBlocks.toString());
scm.getScmBlockManager().deleteBlocks(keyBlocks.getBlockIDList());
resultCode = ScmBlockLocationProtocolProtos.DeleteScmBlockResult
.Result.success;
+ AUDIT.logWriteSuccess(
+ buildAuditMessageForSuccess(SCMAction.DELETE_KEY_BLOCK, auditMap)
+ );
} catch (SCMException scmEx) {
LOG.warn("Fail to delete block: {}", keyBlocks.getGroupID(), scmEx);
+ AUDIT.logWriteFailure(
+ buildAuditMessageForFailure(SCMAction.DELETE_KEY_BLOCK, auditMap,
+ scmEx)
+ );
switch (scmEx.getResult()) {
case CHILL_MODE_EXCEPTION:
resultCode = ScmBlockLocationProtocolProtos.DeleteScmBlockResult
@@ -182,6 +225,10 @@ public List deleteKeyBlocks(
} catch (IOException ex) {
LOG.warn("Fail to delete blocks for object key: {}", keyBlocks
.getGroupID(), ex);
+ AUDIT.logWriteFailure(
+ buildAuditMessageForFailure(SCMAction.DELETE_KEY_BLOCK, auditMap,
+ ex)
+ );
resultCode = ScmBlockLocationProtocolProtos.DeleteScmBlockResult
.Result.unknownFailure;
}
@@ -197,10 +244,55 @@ public List deleteKeyBlocks(
@Override
public ScmInfo getScmInfo() throws IOException {
- ScmInfo.Builder builder =
- new ScmInfo.Builder()
- .setClusterId(scm.getScmStorage().getClusterID())
- .setScmId(scm.getScmStorage().getScmId());
- return builder.build();
+ boolean auditSuccess = true;
+ try{
+ ScmInfo.Builder builder =
+ new ScmInfo.Builder()
+ .setClusterId(scm.getScmStorage().getClusterID())
+ .setScmId(scm.getScmStorage().getScmId());
+ return builder.build();
+ } catch (Exception ex) {
+ auditSuccess = false;
+ AUDIT.logReadFailure(
+ buildAuditMessageForFailure(SCMAction.GET_SCM_INFO, null, ex)
+ );
+ throw ex;
+ } finally {
+ if(auditSuccess) {
+ AUDIT.logReadSuccess(
+ buildAuditMessageForSuccess(SCMAction.GET_SCM_INFO, null)
+ );
+ }
+ }
+ }
+
+ @Override
+ public AuditMessage buildAuditMessageForSuccess(
+ AuditAction op, Map<String, String> auditMap) {
+ return new AuditMessage.Builder()
+ .setUser((Server.getRemoteUser() == null) ? null :
+ Server.getRemoteUser().getUserName())
+ .atIp((Server.getRemoteIp() == null) ? null :
+ Server.getRemoteIp().getHostAddress())
+ .forOperation(op.getAction())
+ .withParams(auditMap)
+ .withResult(AuditEventStatus.SUCCESS.toString())
+ .withException(null)
+ .build();
+ }
+
+ @Override
+ public AuditMessage buildAuditMessageForFailure(AuditAction op, Map<String, String> auditMap, Throwable throwable) {
+ return new AuditMessage.Builder()
+ .setUser((Server.getRemoteUser() == null) ? null :
+ Server.getRemoteUser().getUserName())
+ .atIp((Server.getRemoteIp() == null) ? null :
+ Server.getRemoteIp().getHostAddress())
+ .forOperation(op.getAction())
+ .withParams(auditMap)
+ .withResult(AuditEventStatus.FAILURE.toString())
+ .withException(throwable)
+ .build();
}
}
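A note on the shape of these changes: `allocateBlock` and `getScmInfo` repeat one template — run the operation, log a failure and rethrow on exception, otherwise log success in `finally`. A sketch of the same template factored into a private helper inside the class (not part of the patch; `IoSupplier` is a hypothetical functional interface, and the patch inlines the pattern instead, since each RPC method declares `throws IOException`):

```java
import java.io.IOException;
import java.util.Map;

@FunctionalInterface
interface IoSupplier<T> {
  T get() throws IOException;
}

// Hypothetical helper, behaviourally equivalent to the inlined pattern above.
private <T> T auditedWrite(SCMAction action, Map<String, String> auditMap,
    IoSupplier<T> op) throws IOException {
  try {
    T result = op.get();
    AUDIT.logWriteSuccess(buildAuditMessageForSuccess(action, auditMap));
    return result;
  } catch (IOException | RuntimeException ex) {
    AUDIT.logWriteFailure(buildAuditMessageForFailure(action, auditMap, ex));
    throw ex;
  }
}

// Usage inside allocateBlock would then be:
//   return auditedWrite(SCMAction.ALLOCATE_BLOCK, auditMap,
//       () -> scm.getScmBlockManager().allocateBlock(size, type, factor,
//           owner));
```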
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMClientProtocolServer.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMClientProtocolServer.java
index d80d6e215d..998512c369 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMClientProtocolServer.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMClientProtocolServer.java
@@ -23,6 +23,7 @@
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
+import com.google.common.collect.Maps;
import com.google.protobuf.BlockingService;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
@@ -52,6 +53,14 @@
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.ipc.ProtobufRpcEngine;
import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.ipc.Server;
+import org.apache.hadoop.ozone.audit.AuditAction;
+import org.apache.hadoop.ozone.audit.AuditEventStatus;
+import org.apache.hadoop.ozone.audit.AuditLogger;
+import org.apache.hadoop.ozone.audit.AuditLoggerType;
+import org.apache.hadoop.ozone.audit.AuditMessage;
+import org.apache.hadoop.ozone.audit.Auditor;
+import org.apache.hadoop.ozone.audit.SCMAction;
import org.apache.hadoop.ozone.protocolPB
.StorageContainerLocationProtocolServerSideTranslatorPB;
import org.apache.hadoop.security.UserGroupInformation;
@@ -62,6 +71,7 @@
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.List;
+import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import java.util.stream.Collectors;
@@ -84,9 +94,11 @@
* The RPC server that listens to requests from clients.
*/
public class SCMClientProtocolServer implements
- StorageContainerLocationProtocol, EventHandler<Boolean> {
+ StorageContainerLocationProtocol, EventHandler<Boolean>, Auditor {
private static final Logger LOG =
LoggerFactory.getLogger(SCMClientProtocolServer.class);
+ private static final AuditLogger AUDIT =
+ new AuditLogger(AuditLoggerType.SCMLOGGER);
private final RPC.Server clientRpcServer;
private final InetSocketAddress clientRpcAddress;
private final StorageContainerManager scm;
@@ -177,46 +189,84 @@ public ContainerWithPipeline allocateContainer(HddsProtos.ReplicationType
@Override
public ContainerInfo getContainer(long containerID) throws IOException {
String remoteUser = getRpcRemoteUsername();
+ boolean auditSuccess = true;
+ Map<String, String> auditMap = Maps.newHashMap();
+ auditMap.put("containerID", String.valueOf(containerID));
getScm().checkAdminAccess(remoteUser);
- return scm.getContainerManager()
- .getContainer(ContainerID.valueof(containerID));
+ try {
+ return scm.getContainerManager()
+ .getContainer(ContainerID.valueof(containerID));
+ } catch (IOException ex) {
+ auditSuccess = false;
+ AUDIT.logReadFailure(
+ buildAuditMessageForFailure(SCMAction.GET_CONTAINER, auditMap, ex)
+ );
+ throw ex;
+ } finally {
+ if(auditSuccess) {
+ AUDIT.logReadSuccess(
+ buildAuditMessageForSuccess(SCMAction.GET_CONTAINER, auditMap)
+ );
+ }
+ }
+
}
@Override
public ContainerWithPipeline getContainerWithPipeline(long containerID)
throws IOException {
- if (chillModePrecheck.isInChillMode()) {
- ContainerInfo contInfo = scm.getContainerManager()
- .getContainer(ContainerID.valueof(containerID));
- if (contInfo.isOpen()) {
- if (!hasRequiredReplicas(contInfo)) {
- throw new SCMException("Open container " + containerID + " doesn't"
- + " have enough replicas to service this operation in "
- + "Chill mode.", ResultCodes.CHILL_MODE_EXCEPTION);
+ Map<String, String> auditMap = Maps.newHashMap();
+ auditMap.put("containerID", String.valueOf(containerID));
+ boolean auditSuccess = true;
+ try {
+ if (chillModePrecheck.isInChillMode()) {
+ ContainerInfo contInfo = scm.getContainerManager()
+ .getContainer(ContainerID.valueof(containerID));
+ if (contInfo.isOpen()) {
+ if (!hasRequiredReplicas(contInfo)) {
+ throw new SCMException("Open container " + containerID + " doesn't"
+ + " have enough replicas to service this operation in "
+ + "Chill mode.", ResultCodes.CHILL_MODE_EXCEPTION);
+ }
}
}
+ getScm().checkAdminAccess(null);
+
+ final ContainerID id = ContainerID.valueof(containerID);
+ final ContainerInfo container = scm.getContainerManager().
+ getContainer(id);
+ final Pipeline pipeline;
+
+ if (container.isOpen()) {
+ // Ratis pipeline
+ pipeline = scm.getPipelineManager()
+ .getPipeline(container.getPipelineID());
+ } else {
+ pipeline = scm.getPipelineManager().createPipeline(
+ HddsProtos.ReplicationType.STAND_ALONE,
+ container.getReplicationFactor(),
+ scm.getContainerManager()
+ .getContainerReplicas(id).stream()
+ .map(ContainerReplica::getDatanodeDetails)
+ .collect(Collectors.toList()));
+ }
+
+ return new ContainerWithPipeline(container, pipeline);
+ } catch (IOException ex) {
+ auditSuccess = false;
+ AUDIT.logReadFailure(
+ buildAuditMessageForFailure(SCMAction.GET_CONTAINER_WITH_PIPELINE,
+ auditMap, ex)
+ );
+ throw ex;
+ } finally {
+ if(auditSuccess) {
+ AUDIT.logReadSuccess(
+ buildAuditMessageForSuccess(SCMAction.GET_CONTAINER_WITH_PIPELINE,
+ auditMap)
+ );
+ }
}
- getScm().checkAdminAccess(null);
-
- final ContainerID id = ContainerID.valueof(containerID);
- final ContainerInfo container = scm.getContainerManager().getContainer(id);
- final Pipeline pipeline;
-
- if (container.isOpen()) {
- // Ratis pipeline
- pipeline = scm.getPipelineManager()
- .getPipeline(container.getPipelineID());
- } else {
- pipeline = scm.getPipelineManager().createPipeline(
- HddsProtos.ReplicationType.STAND_ALONE,
- container.getReplicationFactor(),
- scm.getContainerManager()
- .getContainerReplicas(id).stream()
- .map(ContainerReplica::getDatanodeDetails)
- .collect(Collectors.toList()));
- }
-
- return new ContainerWithPipeline(container, pipeline);
}
/**
@@ -238,16 +288,51 @@ private boolean hasRequiredReplicas(ContainerInfo contInfo) {
@Override
public List<ContainerInfo> listContainer(long startContainerID,
int count) throws IOException {
- return scm.getContainerManager().
- listContainer(ContainerID.valueof(startContainerID), count);
+ boolean auditSuccess = true;
+ Map<String, String> auditMap = Maps.newHashMap();
+ auditMap.put("startContainerID", String.valueOf(startContainerID));
+ auditMap.put("count", String.valueOf(count));
+ try {
+ return scm.getContainerManager().
+ listContainer(ContainerID.valueof(startContainerID), count);
+ } catch (Exception ex) {
+ auditSuccess = false;
+ AUDIT.logReadFailure(
+ buildAuditMessageForFailure(SCMAction.LIST_CONTAINER, auditMap, ex));
+ throw ex;
+ } finally {
+ if(auditSuccess) {
+ AUDIT.logReadSuccess(
+ buildAuditMessageForSuccess(SCMAction.LIST_CONTAINER, auditMap));
+ }
+ }
+
}
@Override
public void deleteContainer(long containerID) throws IOException {
String remoteUser = getRpcRemoteUsername();
- getScm().checkAdminAccess(remoteUser);
- scm.getContainerManager().deleteContainer(ContainerID.valueof(containerID));
-
+ boolean auditSuccess = true;
+ Map<String, String> auditMap = Maps.newHashMap();
+ auditMap.put("containerID", String.valueOf(containerID));
+ auditMap.put("remoteUser", remoteUser);
+ try {
+ getScm().checkAdminAccess(remoteUser);
+ scm.getContainerManager().deleteContainer(
+ ContainerID.valueof(containerID));
+ } catch (Exception ex) {
+ auditSuccess = false;
+ AUDIT.logWriteFailure(
+ buildAuditMessageForFailure(SCMAction.DELETE_CONTAINER, auditMap, ex)
+ );
+ throw ex;
+ } finally {
+ if(auditSuccess) {
+ AUDIT.logWriteSuccess(
+ buildAuditMessageForSuccess(SCMAction.DELETE_CONTAINER, auditMap)
+ );
+ }
+ }
}
@Override
@@ -311,26 +396,48 @@ public Pipeline createReplicationPipeline(HddsProtos.ReplicationType type,
@Override
public List<Pipeline> listPipelines() {
+ AUDIT.logReadSuccess(
+ buildAuditMessageForSuccess(SCMAction.LIST_PIPELINE, null));
return scm.getPipelineManager().getPipelines();
}
@Override
public void closePipeline(HddsProtos.PipelineID pipelineID)
throws IOException {
+ Map<String, String> auditMap = Maps.newHashMap();
+ auditMap.put("pipelineID", pipelineID.getId());
PipelineManager pipelineManager = scm.getPipelineManager();
Pipeline pipeline =
pipelineManager.getPipeline(PipelineID.getFromProtobuf(pipelineID));
RatisPipelineUtils
.finalizeAndDestroyPipeline(pipelineManager, pipeline, conf, false);
+ AUDIT.logWriteSuccess(
+ buildAuditMessageForSuccess(SCMAction.CLOSE_PIPELINE, auditMap)
+ );
}
@Override
public ScmInfo getScmInfo() throws IOException {
- ScmInfo.Builder builder =
- new ScmInfo.Builder()
- .setClusterId(scm.getScmStorage().getClusterID())
- .setScmId(scm.getScmStorage().getScmId());
- return builder.build();
+ boolean auditSuccess = true;
+ try{
+ ScmInfo.Builder builder =
+ new ScmInfo.Builder()
+ .setClusterId(scm.getScmStorage().getClusterID())
+ .setScmId(scm.getScmStorage().getScmId());
+ return builder.build();
+ } catch (Exception ex) {
+ auditSuccess = false;
+ AUDIT.logReadFailure(
+ buildAuditMessageForFailure(SCMAction.GET_SCM_INFO, null, ex)
+ );
+ throw ex;
+ } finally {
+ if(auditSuccess) {
+ AUDIT.logReadSuccess(
+ buildAuditMessageForSuccess(SCMAction.GET_SCM_INFO, null)
+ );
+ }
+ }
}
/**
@@ -341,6 +448,9 @@ public ScmInfo getScmInfo() throws IOException {
*/
@Override
public boolean inChillMode() throws IOException {
+ AUDIT.logReadSuccess(
+ buildAuditMessageForSuccess(SCMAction.IN_CHILL_MODE, null)
+ );
return scm.isInChillMode();
}
@@ -352,6 +462,9 @@ public boolean inChillMode() throws IOException {
*/
@Override
public boolean forceExitChillMode() throws IOException {
+ AUDIT.logWriteSuccess(
+ buildAuditMessageForSuccess(SCMAction.FORCE_EXIT_CHILL_MODE, null)
+ );
return scm.exitChillMode();
}
@@ -409,4 +522,34 @@ private Set<DatanodeDetails> queryNodeState(HddsProtos.NodeState nodeState) {
}
return returnSet;
}
+
+ @Override
+ public AuditMessage buildAuditMessageForSuccess(
+ AuditAction op, Map<String, String> auditMap) {
+ return new AuditMessage.Builder()
+ .setUser((Server.getRemoteUser() == null) ? null :
+ Server.getRemoteUser().getUserName())
+ .atIp((Server.getRemoteIp() == null) ? null :
+ Server.getRemoteIp().getHostAddress())
+ .forOperation(op.getAction())
+ .withParams(auditMap)
+ .withResult(AuditEventStatus.SUCCESS.toString())
+ .withException(null)
+ .build();
+ }
+
+ @Override
+ public AuditMessage buildAuditMessageForFailure(AuditAction op, Map<String, String> auditMap, Throwable throwable) {
+ return new AuditMessage.Builder()
+ .setUser((Server.getRemoteUser() == null) ? null :
+ Server.getRemoteUser().getUserName())
+ .atIp((Server.getRemoteIp() == null) ? null :
+ Server.getRemoteIp().getHostAddress())
+ .forOperation(op.getAction())
+ .withParams(auditMap)
+ .withResult(AuditEventStatus.FAILURE.toString())
+ .withException(throwable)
+ .build();
+ }
}
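`buildAuditMessageForSuccess` and `buildAuditMessageForFailure` are now duplicated verbatim across SCMBlockProtocolServer, SCMClientProtocolServer, and (below) SCMDatanodeProtocolServer; all three derive user and IP from the same RPC `Server` thread-locals. A hedged sketch of hoisting that duplication into one shared helper (hypothetical class, not in the patch):

```java
import java.util.Map;
import org.apache.hadoop.ipc.Server;
import org.apache.hadoop.ozone.audit.AuditAction;
import org.apache.hadoop.ozone.audit.AuditEventStatus;
import org.apache.hadoop.ozone.audit.AuditMessage;

// Hypothetical shared helper; each Auditor implementation could delegate
// here instead of repeating the builder chain.
final class ScmAuditMessages {
  private ScmAuditMessages() { }

  static AuditMessage build(AuditAction op, Map<String, String> auditMap,
      AuditEventStatus status, Throwable throwable) {
    return new AuditMessage.Builder()
        .setUser(Server.getRemoteUser() == null ? null
            : Server.getRemoteUser().getUserName())
        .atIp(Server.getRemoteIp() == null ? null
            : Server.getRemoteIp().getHostAddress())
        .forOperation(op.getAction())
        .withParams(auditMap)
        .withResult(status.toString())
        .withException(throwable)
        .build();
  }
}
```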
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeProtocolServer.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeProtocolServer.java
index 77ef9c849f..6a3552ef63 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeProtocolServer.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeProtocolServer.java
@@ -23,6 +23,7 @@
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
+import com.google.common.collect.Maps;
import com.google.protobuf.BlockingService;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
@@ -75,6 +76,14 @@
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.ipc.ProtobufRpcEngine;
import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.ipc.Server;
+import org.apache.hadoop.ozone.audit.AuditAction;
+import org.apache.hadoop.ozone.audit.AuditEventStatus;
+import org.apache.hadoop.ozone.audit.AuditLogger;
+import org.apache.hadoop.ozone.audit.AuditLoggerType;
+import org.apache.hadoop.ozone.audit.AuditMessage;
+import org.apache.hadoop.ozone.audit.Auditor;
+import org.apache.hadoop.ozone.audit.SCMAction;
import org.apache.hadoop.ozone.protocol.StorageContainerDatanodeProtocol;
import org.apache.hadoop.ozone.protocol.commands.CloseContainerCommand;
import org.apache.hadoop.ozone.protocol.commands.DeleteBlocksCommand;
@@ -91,6 +100,7 @@
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.List;
+import java.util.Map;
import java.util.stream.Collectors;
import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_DATANODE_ADDRESS_KEY;
@@ -106,11 +116,14 @@
* Protocol Handler for Datanode Protocol.
*/
public class SCMDatanodeProtocolServer implements
- StorageContainerDatanodeProtocol {
+ StorageContainerDatanodeProtocol, Auditor {
private static final Logger LOG = LoggerFactory.getLogger(
SCMDatanodeProtocolServer.class);
+ private static final AuditLogger AUDIT =
+ new AuditLogger(AuditLoggerType.SCMLOGGER);
+
/**
* The RPC server that listens to requests from DataNodes.
*/
@@ -179,8 +192,21 @@ public InetSocketAddress getDatanodeRpcAddress() {
public SCMVersionResponseProto getVersion(SCMVersionRequestProto
versionRequest)
throws IOException {
- return scm.getScmNodeManager().getVersion(versionRequest)
- .getProtobufMessage();
+ boolean auditSuccess = true;
+ try {
+ return scm.getScmNodeManager().getVersion(versionRequest)
+ .getProtobufMessage();
+ } catch (Exception ex) {
+ auditSuccess = false;
+ AUDIT.logReadFailure(
+ buildAuditMessageForFailure(SCMAction.GET_VERSION, null, ex));
+ throw ex;
+ } finally {
+ if(auditSuccess) {
+ AUDIT.logReadSuccess(
+ buildAuditMessageForSuccess(SCMAction.GET_VERSION, null));
+ }
+ }
}
@Override
@@ -192,6 +218,10 @@ public SCMRegisteredResponseProto register(
throws IOException {
DatanodeDetails datanodeDetails = DatanodeDetails
.getFromProtoBuf(datanodeDetailsProto);
+ boolean auditSuccess = true;
+ Map<String, String> auditMap = Maps.newHashMap();
+ auditMap.put("datanodeDetails", datanodeDetails.toString());
+
// TODO : Return the list of Nodes that forms the SCM HA.
RegisteredCommand registeredCommand = scm.getScmNodeManager()
.register(datanodeDetails, nodeReport, pipelineReportsProto);
@@ -207,7 +237,19 @@ public SCMRegisteredResponseProto register(
new PipelineReportFromDatanode(datanodeDetails,
pipelineReportsProto));
}
- return getRegisteredResponse(registeredCommand);
+ try {
+ return getRegisteredResponse(registeredCommand);
+ } catch (Exception ex) {
+ auditSuccess = false;
+ AUDIT.logWriteFailure(
+ buildAuditMessageForFailure(SCMAction.REGISTER, auditMap, ex));
+ throw ex;
+ } finally {
+ if(auditSuccess) {
+ AUDIT.logWriteSuccess(
+ buildAuditMessageForSuccess(SCMAction.REGISTER, auditMap));
+ }
+ }
}
@VisibleForTesting
@@ -229,9 +271,27 @@ public SCMHeartbeatResponseProto sendHeartbeat(
for (SCMCommand cmd : heartbeatDispatcher.dispatch(heartbeat)) {
cmdResponses.add(getCommandResponse(cmd));
}
- return SCMHeartbeatResponseProto.newBuilder()
- .setDatanodeUUID(heartbeat.getDatanodeDetails().getUuid())
- .addAllCommands(cmdResponses).build();
+ boolean auditSuccess = true;
+ Map<String, String> auditMap = Maps.newHashMap();
+ auditMap.put("datanodeUUID", heartbeat.getDatanodeDetails().getUuid());
+ auditMap.put("command", flatten(cmdResponses.toString()));
+ try {
+ return SCMHeartbeatResponseProto.newBuilder()
+ .setDatanodeUUID(heartbeat.getDatanodeDetails().getUuid())
+ .addAllCommands(cmdResponses).build();
+ } catch (Exception ex) {
+ auditSuccess = false;
+ AUDIT.logWriteFailure(
+ buildAuditMessageForFailure(SCMAction.SEND_HEARTBEAT, auditMap, ex)
+ );
+ throw ex;
+ } finally {
+ if(auditSuccess) {
+ AUDIT.logWriteSuccess(
+ buildAuditMessageForSuccess(SCMAction.SEND_HEARTBEAT, auditMap)
+ );
+ }
+ }
}
/**
@@ -302,6 +362,43 @@ public void stop() {
IOUtils.cleanupWithLogger(LOG, scm.getScmNodeManager());
}
+ @Override
+ public AuditMessage buildAuditMessageForSuccess(
+ AuditAction op, Map<String, String> auditMap) {
+ return new AuditMessage.Builder()
+ .setUser((Server.getRemoteUser() == null) ? null :
+ Server.getRemoteUser().getUserName())
+ .atIp((Server.getRemoteIp() == null) ? null :
+ Server.getRemoteIp().getHostAddress())
+ .forOperation(op.getAction())
+ .withParams(auditMap)
+ .withResult(AuditEventStatus.SUCCESS.toString())
+ .withException(null)
+ .build();
+ }
+
+ @Override
+ public AuditMessage buildAuditMessageForFailure(AuditAction op, Map<String, String> auditMap, Throwable throwable) {
+ return new AuditMessage.Builder()
+ .setUser((Server.getRemoteUser() == null) ? null :
+ Server.getRemoteUser().getUserName())
+ .atIp((Server.getRemoteIp() == null) ? null :
+ Server.getRemoteIp().getHostAddress())
+ .forOperation(op.getAction())
+ .withParams(auditMap)
+ .withResult(AuditEventStatus.FAILURE.toString())
+ .withException(throwable)
+ .build();
+ }
+
+ private static String flatten(String input) {
+ return input
+ .replaceAll(System.lineSeparator(), " ")
+ .trim()
+ .replaceAll(" +", " ");
+ }
+
/**
* Wrapper class for events with the datanode origin.
*/
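`flatten` normalizes the multi-line `cmdResponses.toString()` into a single audit-friendly line: line separators become spaces, the result is trimmed, and runs of spaces collapse to one. A small illustration (the protobuf text is a made-up sample):

```java
String raw = "commandType: closeContainerCommand" + System.lineSeparator()
    + "  closeContainerCommandProto {" + System.lineSeparator()
    + "    containerID: 12" + System.lineSeparator()
    + "  }";
System.out.println(flatten(raw));
// -> commandType: closeContainerCommand closeContainerCommandProto { containerID: 12 }
```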
diff --git a/hadoop-ozone/common/src/main/bin/ozone b/hadoop-ozone/common/src/main/bin/ozone
index e034082725..bea01f0532 100755
--- a/hadoop-ozone/common/src/main/bin/ozone
+++ b/hadoop-ozone/common/src/main/bin/ozone
@@ -115,6 +115,7 @@ function ozonecmd_case
HADOOP_SUBCMD_SUPPORTDAEMONIZATION="true"
HADOOP_CLASSNAME='org.apache.hadoop.hdds.scm.server.StorageContainerManager'
hadoop_debug "Appending HDFS_STORAGECONTAINERMANAGER_OPTS onto HADOOP_OPTS"
+ HDFS_STORAGECONTAINERMANAGER_OPTS="${HDFS_STORAGECONTAINERMANAGER_OPTS} -Dlog4j.configurationFile=${HADOOP_CONF_DIR}/scm-audit-log4j2.properties"
HADOOP_OPTS="${HADOOP_OPTS} ${HDFS_STORAGECONTAINERMANAGER_OPTS}"
OZONE_RUN_ARTIFACT_NAME="hadoop-hdds-server-scm"
;;
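This appends `-Dlog4j.configurationFile` to the SCM daemon's JVM options so that its Log4j2 instance reads the audit configuration installed below, without affecting the other Ozone daemons. An illustrative check from inside the SCM process:

```java
import java.nio.file.Files;
import java.nio.file.Paths;

// The launcher line above should make this resolve to
// ${HADOOP_CONF_DIR}/scm-audit-log4j2.properties.
String cfg = System.getProperty("log4j.configurationFile");
System.out.println("Log4j2 config: " + cfg
    + (cfg != null && Files.exists(Paths.get(cfg)) ? " (found)" : " (missing)"));
```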
diff --git a/hadoop-ozone/dist/dev-support/bin/dist-layout-stitching b/hadoop-ozone/dist/dev-support/bin/dist-layout-stitching
index 55927a78e7..66181fd4db 100755
--- a/hadoop-ozone/dist/dev-support/bin/dist-layout-stitching
+++ b/hadoop-ozone/dist/dev-support/bin/dist-layout-stitching
@@ -87,6 +87,7 @@ run mkdir -p ./libexec
run cp -r "${ROOT}/hadoop-common-project/hadoop-common/src/main/conf" "etc/hadoop"
run cp "${ROOT}/hadoop-ozone/dist/src/main/conf/om-audit-log4j2.properties" "etc/hadoop"
run cp "${ROOT}/hadoop-ozone/dist/src/main/conf/dn-audit-log4j2.properties" "etc/hadoop"
+run cp "${ROOT}/hadoop-ozone/dist/src/main/conf/scm-audit-log4j2.properties" "etc/hadoop"
run cp "${ROOT}/hadoop-ozone/dist/src/main/conf/ozone-site.xml" "etc/hadoop"
run cp -f "${ROOT}/hadoop-ozone/dist/src/main/conf/log4j.properties" "etc/hadoop"
run cp "${ROOT}/hadoop-common-project/hadoop-common/src/main/bin/hadoop" "bin/"
diff --git a/hadoop-ozone/dist/src/main/conf/scm-audit-log4j2.properties b/hadoop-ozone/dist/src/main/conf/scm-audit-log4j2.properties
new file mode 100644
index 0000000000..3f81561cc4
--- /dev/null
+++ b/hadoop-ozone/dist/src/main/conf/scm-audit-log4j2.properties
@@ -0,0 +1,90 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with this
+# work for additional information regarding copyright ownership. The ASF
+# licenses this file to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations under
+# the License.
+#
+name=PropertiesConfig
+
+# Checks for config change periodically and reloads
+monitorInterval=30
+
+filter=read,write
+# filter.read.onMatch=DENY avoids logging all READ events
+# filter.read.onMatch=ACCEPT permits logging all READ events
+# The above two settings ignore the log levels in configuration
+# filter.read.onMatch=NEUTRAL permits logging of only those READ events
+# which are attempted at log level equal or greater than log level specified
+# in the configuration
+filter.read.type=MarkerFilter
+filter.read.marker=READ
+filter.read.onMatch=DENY
+filter.read.onMismatch=NEUTRAL
+
+# filter.write.onMatch=DENY avoids logging all WRITE events
+# filter.write.onMatch=ACCEPT permits logging all WRITE events
+# The above two settings ignore the log levels in configuration
+# filter.write.onMatch=NEUTRAL permits logging of only those WRITE events
+# which are attempted at log level equal or greater than log level specified
+# in the configuration
+filter.write.type=MarkerFilter
+filter.write.marker=WRITE
+filter.write.onMatch=NEUTRAL
+filter.write.onMismatch=NEUTRAL
+
+# Log Levels are organized from most specific to least:
+# OFF (most specific, no logging)
+# FATAL (most specific, little data)
+# ERROR
+# WARN
+# INFO
+# DEBUG
+# TRACE (least specific, a lot of data)
+# ALL (least specific, all data)
+
+# Uncomment following section to enable logging to console appender also
+#appenders=console, rolling
+#appender.console.type=Console
+#appender.console.name=STDOUT
+#appender.console.layout.type=PatternLayout
+#appender.console.layout.pattern=%d{DEFAULT} | %-5level | %c{1} | %msg | %throwable{3} %n
+
+# Comment this line when using both console and rolling appenders
+appenders=rolling
+
+#Rolling File Appender with size & time thresholds.
+#Rolling is triggered when either threshold is breached.
+#The rolled over file is compressed by default
+#Time interval is specified in seconds 86400s=1 day
+appender.rolling.type=RollingFile
+appender.rolling.name=RollingFile
+appender.rolling.fileName=${sys:hadoop.log.dir}/scm-audit-${hostName}.log
+appender.rolling.filePattern=${sys:hadoop.log.dir}/scm-audit-${hostName}-%d{yyyy-MM-dd-HH-mm-ss}-%i.log.gz
+appender.rolling.layout.type=PatternLayout
+appender.rolling.layout.pattern=%d{DEFAULT} | %-5level | %c{1} | %msg | %throwable{3} %n
+appender.rolling.policies.type=Policies
+appender.rolling.policies.time.type=TimeBasedTriggeringPolicy
+appender.rolling.policies.time.interval=86400
+appender.rolling.policies.size.type=SizeBasedTriggeringPolicy
+appender.rolling.policies.size.size=64MB
+
+loggers=audit
+logger.audit.type=AsyncLogger
+logger.audit.name=SCMAudit
+logger.audit.level=INFO
+logger.audit.appenderRefs=rolling
+logger.audit.appenderRef.file.ref=RollingFile
+
+rootLogger.level=INFO
+#rootLogger.appenderRefs=stdout
+#rootLogger.appenderRef.stdout.ref=STDOUT
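With these defaults, READ-marked events are dropped (`filter.read.onMatch=DENY`) while WRITE-marked events pass at INFO, so read paths such as IN_CHILL_MODE and GET_SCM_INFO stay out of the log unless an operator flips the read filter. Note that SEND_HEARTBEAT successes are write-marked in the server changes above, so every heartbeat lands in the log under this configuration. An illustrative probe of the filtering (run with `-Dlog4j.configurationFile=` pointing at this file):

```java
import org.apache.hadoop.ozone.audit.AuditEventStatus;
import org.apache.hadoop.ozone.audit.AuditLogger;
import org.apache.hadoop.ozone.audit.AuditLoggerType;
import org.apache.hadoop.ozone.audit.AuditMessage;
import org.apache.hadoop.ozone.audit.SCMAction;

public final class ScmAuditConfigProbe {
  private static AuditMessage msg(SCMAction action) {
    return new AuditMessage.Builder()
        .setUser("scmadmin").atIp("127.0.0.1")   // sample values only
        .forOperation(action.getAction())
        .withParams(null)
        .withResult(AuditEventStatus.SUCCESS.toString())
        .withException(null)
        .build();
  }

  public static void main(String[] args) {
    AuditLogger audit = new AuditLogger(AuditLoggerType.SCMLOGGER);
    audit.logReadSuccess(msg(SCMAction.IN_CHILL_MODE));          // dropped
    audit.logWriteSuccess(msg(SCMAction.FORCE_EXIT_CHILL_MODE)); // logged
  }
}
```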