From 94b368f29fb5286253f4e5cac2d30b61cb62a7e5 Mon Sep 17 00:00:00 2001
From: Xiaoyu Yao
Date: Mon, 17 Dec 2018 15:40:22 -0800
Subject: [PATCH] HDDS-99. Adding SCM Audit log. Contributed by Dinesh Chitlangia.

---
 .../apache/hadoop/ozone/audit/SCMAction.java  |  45 ++++
 .../hadoop/ozone/common/BlockGroup.java       |   8 +
 .../scm/server/SCMBlockProtocolServer.java    | 106 +++++++-
 .../scm/server/SCMClientProtocolServer.java   | 227 ++++++++++++++----
 .../scm/server/SCMDatanodeProtocolServer.java | 111 ++++++++-
 hadoop-ozone/common/src/main/bin/ozone        |   1 +
 .../dev-support/bin/dist-layout-stitching     |   1 +
 .../src/main/conf/scm-audit-log4j2.properties |  90 +++++++
 8 files changed, 533 insertions(+), 56 deletions(-)
 create mode 100644 hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/audit/SCMAction.java
 create mode 100644 hadoop-ozone/dist/src/main/conf/scm-audit-log4j2.properties

diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/audit/SCMAction.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/audit/SCMAction.java
new file mode 100644
index 0000000000..950a7833a5
--- /dev/null
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/audit/SCMAction.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.ozone.audit;
+
+/**
+ * Enum to define Audit Action types for SCM.
+ */
+public enum SCMAction implements AuditAction {
+
+  GET_VERSION,
+  REGISTER,
+  SEND_HEARTBEAT,
+  GET_SCM_INFO,
+  ALLOCATE_BLOCK,
+  DELETE_KEY_BLOCK,
+  ALLOCATE_CONTAINER,
+  GET_CONTAINER,
+  GET_CONTAINER_WITH_PIPELINE,
+  LIST_CONTAINER,
+  LIST_PIPELINE,
+  CLOSE_PIPELINE,
+  DELETE_CONTAINER,
+  IN_CHILL_MODE,
+  FORCE_EXIT_CHILL_MODE;
+
+  @Override
+  public String getAction() {
+    return this.toString();
+  }
+
+}
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/BlockGroup.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/BlockGroup.java
index 2f6030fdc8..1925c22aa2 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/BlockGroup.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/BlockGroup.java
@@ -72,6 +72,14 @@ public static Builder newBuilder() {
     return new Builder();
   }
 
+  @Override
+  public String toString() {
+    return "BlockGroup[" +
+        "groupID='" + groupID + '\'' +
+        ", blockIDs=" + blockIDs +
+        ']';
+  }
+
   /**
    * BlockGroup instance builder.
    */
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMBlockProtocolServer.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMBlockProtocolServer.java
index 3bb284e8d0..26e3b13a50 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMBlockProtocolServer.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMBlockProtocolServer.java
@@ -21,6 +21,7 @@
  */
 package org.apache.hadoop.hdds.scm.server;
 
+import com.google.common.collect.Maps;
 import com.google.protobuf.BlockingService;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
@@ -35,6 +36,14 @@
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.ipc.ProtobufRpcEngine;
 import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.ipc.Server;
+import org.apache.hadoop.ozone.audit.AuditAction;
+import org.apache.hadoop.ozone.audit.AuditEventStatus;
+import org.apache.hadoop.ozone.audit.AuditLogger;
+import org.apache.hadoop.ozone.audit.AuditLoggerType;
+import org.apache.hadoop.ozone.audit.AuditMessage;
+import org.apache.hadoop.ozone.audit.Auditor;
+import org.apache.hadoop.ozone.audit.SCMAction;
 import org.apache.hadoop.ozone.common.BlockGroup;
 import org.apache.hadoop.hdds.client.BlockID;
 import org.apache.hadoop.ozone.common.DeleteBlockGroupResult;
@@ -47,6 +56,7 @@
 import java.net.InetSocketAddress;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Map;
 
 import static org.apache.hadoop.hdds.scm.ScmConfigKeys
     .OZONE_SCM_BLOCK_CLIENT_ADDRESS_KEY;
@@ -62,10 +72,14 @@
  * SCM block protocol is the protocol used by Namenode and OzoneManager to get
  * blocks from the SCM.
  */
-public class SCMBlockProtocolServer implements ScmBlockLocationProtocol {
+public class SCMBlockProtocolServer implements
+    ScmBlockLocationProtocol, Auditor {
   private static final Logger LOG =
       LoggerFactory.getLogger(SCMBlockProtocolServer.class);
 
+  private static final AuditLogger AUDIT =
+      new AuditLogger(AuditLoggerType.SCMLOGGER);
+
   private final StorageContainerManager scm;
   private final OzoneConfiguration conf;
   private final RPC.Server blockRpcServer;
@@ -140,7 +154,27 @@ public void join() throws InterruptedException {
   public AllocatedBlock allocateBlock(long size, HddsProtos.ReplicationType
       type, HddsProtos.ReplicationFactor factor, String owner)
       throws IOException {
-    return scm.getScmBlockManager().allocateBlock(size, type, factor, owner);
+    Map<String, String> auditMap = Maps.newHashMap();
+    auditMap.put("size", String.valueOf(size));
+    auditMap.put("type", type.name());
+    auditMap.put("factor", factor.name());
+    auditMap.put("owner", owner);
+    boolean auditSuccess = true;
+    try {
+      return scm.getScmBlockManager().allocateBlock(size, type, factor, owner);
+    } catch (Exception ex) {
+      auditSuccess = false;
+      AUDIT.logWriteFailure(
+          buildAuditMessageForFailure(SCMAction.ALLOCATE_BLOCK, auditMap, ex)
+      );
+      throw ex;
+    } finally {
+      if(auditSuccess) {
+        AUDIT.logWriteSuccess(
+            buildAuditMessageForSuccess(SCMAction.ALLOCATE_BLOCK, auditMap)
+        );
+      }
+    }
   }
 
   /**
@@ -155,17 +189,26 @@ public List deleteKeyBlocks(
     LOG.info("SCM is informed by OM to delete {} blocks", keyBlocksInfoList
         .size());
     List<DeleteBlockGroupResult> results = new ArrayList<>();
+    Map<String, String> auditMap = Maps.newHashMap();
     for (BlockGroup keyBlocks : keyBlocksInfoList) {
       ScmBlockLocationProtocolProtos.DeleteScmBlockResult.Result resultCode;
       try {
         // We delete blocks in an atomic operation to prevent getting
         // into state like only a partial of blocks are deleted,
         // which will leave key in an inconsistent state.
+        auditMap.put("keyBlockToDelete", keyBlocks.toString());
         scm.getScmBlockManager().deleteBlocks(keyBlocks.getBlockIDList());
         resultCode = ScmBlockLocationProtocolProtos.DeleteScmBlockResult
             .Result.success;
+        AUDIT.logWriteSuccess(
+            buildAuditMessageForSuccess(SCMAction.DELETE_KEY_BLOCK, auditMap)
+        );
       } catch (SCMException scmEx) {
         LOG.warn("Fail to delete block: {}", keyBlocks.getGroupID(), scmEx);
+        AUDIT.logWriteFailure(
+            buildAuditMessageForFailure(SCMAction.DELETE_KEY_BLOCK, auditMap,
+                scmEx)
+        );
         switch (scmEx.getResult()) {
         case CHILL_MODE_EXCEPTION:
           resultCode = ScmBlockLocationProtocolProtos.DeleteScmBlockResult
@@ -182,6 +225,10 @@ public List deleteKeyBlocks(
       } catch (IOException ex) {
         LOG.warn("Fail to delete blocks for object key: {}", keyBlocks
            .getGroupID(), ex);
+        AUDIT.logWriteFailure(
+            buildAuditMessageForFailure(SCMAction.DELETE_KEY_BLOCK, auditMap,
+                ex)
+        );
         resultCode = ScmBlockLocationProtocolProtos.DeleteScmBlockResult
             .Result.unknownFailure;
       }
@@ -197,10 +244,55 @@ public List deleteKeyBlocks(
 
   @Override
   public ScmInfo getScmInfo() throws IOException {
-    ScmInfo.Builder builder =
-        new ScmInfo.Builder()
-        .setClusterId(scm.getScmStorage().getClusterID())
-        .setScmId(scm.getScmStorage().getScmId());
-    return builder.build();
+    boolean auditSuccess = true;
+    try{
+      ScmInfo.Builder builder =
+          new ScmInfo.Builder()
+              .setClusterId(scm.getScmStorage().getClusterID())
+              .setScmId(scm.getScmStorage().getScmId());
+      return builder.build();
+    } catch (Exception ex) {
+      auditSuccess = false;
+      AUDIT.logReadFailure(
+          buildAuditMessageForFailure(SCMAction.GET_SCM_INFO, null, ex)
+      );
+      throw ex;
+    } finally {
+      if(auditSuccess) {
+        AUDIT.logReadSuccess(
+            buildAuditMessageForSuccess(SCMAction.GET_SCM_INFO, null)
+        );
+      }
+    }
   }
+
+  @Override
+  public AuditMessage buildAuditMessageForSuccess(
+      AuditAction op, Map<String, String> auditMap) {
+    return new AuditMessage.Builder()
+        .setUser((Server.getRemoteUser() == null) ? null :
+            Server.getRemoteUser().getUserName())
+        .atIp((Server.getRemoteIp() == null) ? null :
+            Server.getRemoteIp().getHostAddress())
+        .forOperation(op.getAction())
+        .withParams(auditMap)
+        .withResult(AuditEventStatus.SUCCESS.toString())
+        .withException(null)
+        .build();
+  }
+
+  @Override
+  public AuditMessage buildAuditMessageForFailure(AuditAction op, Map<String,
+      String> auditMap, Throwable throwable) {
+    return new AuditMessage.Builder()
+        .setUser((Server.getRemoteUser() == null) ? null :
+            Server.getRemoteUser().getUserName())
+        .atIp((Server.getRemoteIp() == null) ? null :
+            Server.getRemoteIp().getHostAddress())
+        .forOperation(op.getAction())
+        .withParams(auditMap)
+        .withResult(AuditEventStatus.FAILURE.toString())
+        .withException(throwable)
+        .build();
+  }
 }
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMClientProtocolServer.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMClientProtocolServer.java
index d80d6e215d..998512c369 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMClientProtocolServer.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMClientProtocolServer.java
@@ -23,6 +23,7 @@
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
+import com.google.common.collect.Maps;
 import com.google.protobuf.BlockingService;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
@@ -52,6 +53,14 @@
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.ipc.ProtobufRpcEngine;
 import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.ipc.Server;
+import org.apache.hadoop.ozone.audit.AuditAction;
+import org.apache.hadoop.ozone.audit.AuditEventStatus;
+import org.apache.hadoop.ozone.audit.AuditLogger;
+import org.apache.hadoop.ozone.audit.AuditLoggerType;
+import org.apache.hadoop.ozone.audit.AuditMessage;
+import org.apache.hadoop.ozone.audit.Auditor;
+import org.apache.hadoop.ozone.audit.SCMAction;
 import org.apache.hadoop.ozone.protocolPB
     .StorageContainerLocationProtocolServerSideTranslatorPB;
 import org.apache.hadoop.security.UserGroupInformation;
@@ -62,6 +71,7 @@
 import java.net.InetSocketAddress;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 import java.util.TreeSet;
 import java.util.stream.Collectors;
@@ -84,9 +94,11 @@
  * The RPC server that listens to requests from clients.
  */
 public class SCMClientProtocolServer implements
-    StorageContainerLocationProtocol, EventHandler {
+    StorageContainerLocationProtocol, EventHandler, Auditor {
   private static final Logger LOG =
       LoggerFactory.getLogger(SCMClientProtocolServer.class);
+  private static final AuditLogger AUDIT =
+      new AuditLogger(AuditLoggerType.SCMLOGGER);
   private final RPC.Server clientRpcServer;
   private final InetSocketAddress clientRpcAddress;
   private final StorageContainerManager scm;
@@ -177,46 +189,84 @@ public ContainerWithPipeline allocateContainer(HddsProtos.ReplicationType
   @Override
   public ContainerInfo getContainer(long containerID) throws IOException {
     String remoteUser = getRpcRemoteUsername();
+    boolean auditSuccess = true;
+    Map<String, String> auditMap = Maps.newHashMap();
+    auditMap.put("containerID", String.valueOf(containerID));
     getScm().checkAdminAccess(remoteUser);
-    return scm.getContainerManager()
-        .getContainer(ContainerID.valueof(containerID));
+    try {
+      return scm.getContainerManager()
+          .getContainer(ContainerID.valueof(containerID));
+    } catch (IOException ex) {
+      auditSuccess = false;
+      AUDIT.logReadFailure(
+          buildAuditMessageForFailure(SCMAction.GET_CONTAINER, auditMap, ex)
+      );
+      throw ex;
+    } finally {
+      if(auditSuccess) {
+        AUDIT.logReadSuccess(
+            buildAuditMessageForSuccess(SCMAction.GET_CONTAINER, auditMap)
+        );
+      }
+    }
+
   }
 
   @Override
   public ContainerWithPipeline getContainerWithPipeline(long containerID)
       throws IOException {
-    if (chillModePrecheck.isInChillMode()) {
-      ContainerInfo contInfo = scm.getContainerManager()
-          .getContainer(ContainerID.valueof(containerID));
-      if (contInfo.isOpen()) {
-        if (!hasRequiredReplicas(contInfo)) {
-          throw new SCMException("Open container " + containerID + " doesn't"
-              + " have enough replicas to service this operation in "
-              + "Chill mode.", ResultCodes.CHILL_MODE_EXCEPTION);
+    Map<String, String> auditMap = Maps.newHashMap();
+    auditMap.put("containerID", String.valueOf(containerID));
+    boolean auditSuccess = true;
+    try {
+      if (chillModePrecheck.isInChillMode()) {
+        ContainerInfo contInfo = scm.getContainerManager()
+          .getContainer(ContainerID.valueof(containerID));
+        if (contInfo.isOpen()) {
+          if (!hasRequiredReplicas(contInfo)) {
+            throw new SCMException("Open container " + containerID + " doesn't"
+                + " have enough replicas to service this operation in "
+                + "Chill mode.", ResultCodes.CHILL_MODE_EXCEPTION);
+          }
         }
       }
+      getScm().checkAdminAccess(null);
+
+      final ContainerID id = ContainerID.valueof(containerID);
+      final ContainerInfo container = scm.getContainerManager().
+          getContainer(id);
+      final Pipeline pipeline;
+
+      if (container.isOpen()) {
+        // Ratis pipeline
+        pipeline = scm.getPipelineManager()
+            .getPipeline(container.getPipelineID());
+      } else {
+        pipeline = scm.getPipelineManager().createPipeline(
+            HddsProtos.ReplicationType.STAND_ALONE,
+            container.getReplicationFactor(),
+            scm.getContainerManager()
+                .getContainerReplicas(id).stream()
+                .map(ContainerReplica::getDatanodeDetails)
+                .collect(Collectors.toList()));
+      }
+
+      return new ContainerWithPipeline(container, pipeline);
+    } catch (IOException ex) {
+      auditSuccess = false;
+      AUDIT.logReadFailure(
+          buildAuditMessageForFailure(SCMAction.GET_CONTAINER_WITH_PIPELINE,
+              auditMap, ex)
+      );
+      throw ex;
+    } finally {
+      if(auditSuccess) {
+        AUDIT.logReadSuccess(
+            buildAuditMessageForSuccess(SCMAction.GET_CONTAINER_WITH_PIPELINE,
+                auditMap)
+        );
+      }
     }
-    getScm().checkAdminAccess(null);
-
-    final ContainerID id = ContainerID.valueof(containerID);
-    final ContainerInfo container = scm.getContainerManager().getContainer(id);
-    final Pipeline pipeline;
-
-    if (container.isOpen()) {
-      // Ratis pipeline
-      pipeline = scm.getPipelineManager()
-          .getPipeline(container.getPipelineID());
-    } else {
-      pipeline = scm.getPipelineManager().createPipeline(
-          HddsProtos.ReplicationType.STAND_ALONE,
-          container.getReplicationFactor(),
-          scm.getContainerManager()
-              .getContainerReplicas(id).stream()
-              .map(ContainerReplica::getDatanodeDetails)
-              .collect(Collectors.toList()));
-    }
-
-    return new ContainerWithPipeline(container, pipeline);
   }
 
   /**
@@ -238,16 +288,51 @@ private boolean hasRequiredReplicas(ContainerInfo contInfo) {
   @Override
   public List<ContainerInfo> listContainer(long startContainerID,
       int count) throws IOException {
-    return scm.getContainerManager().
-        listContainer(ContainerID.valueof(startContainerID), count);
+    boolean auditSuccess = true;
+    Map<String, String> auditMap = Maps.newHashMap();
+    auditMap.put("startContainerID", String.valueOf(startContainerID));
+    auditMap.put("count", String.valueOf(count));
+    try {
+      return scm.getContainerManager().
+          listContainer(ContainerID.valueof(startContainerID), count);
+    } catch (Exception ex) {
+      auditSuccess = false;
+      AUDIT.logReadFailure(
+          buildAuditMessageForFailure(SCMAction.LIST_CONTAINER, auditMap, ex));
+      throw ex;
+    } finally {
+      if(auditSuccess) {
+        AUDIT.logReadSuccess(
+            buildAuditMessageForSuccess(SCMAction.LIST_CONTAINER, auditMap));
+      }
+    }
+
   }
 
   @Override
   public void deleteContainer(long containerID) throws IOException {
     String remoteUser = getRpcRemoteUsername();
-    getScm().checkAdminAccess(remoteUser);
-    scm.getContainerManager().deleteContainer(ContainerID.valueof(containerID));
-
+    boolean auditSuccess = true;
+    Map<String, String> auditMap = Maps.newHashMap();
+    auditMap.put("containerID", String.valueOf(containerID));
+    auditMap.put("remoteUser", remoteUser);
+    try {
+      getScm().checkAdminAccess(remoteUser);
+      scm.getContainerManager().deleteContainer(
+          ContainerID.valueof(containerID));
+    } catch (Exception ex) {
+      auditSuccess = false;
+      AUDIT.logWriteFailure(
+          buildAuditMessageForFailure(SCMAction.DELETE_CONTAINER, auditMap, ex)
+      );
+      throw ex;
+    } finally {
+      if(auditSuccess) {
+        AUDIT.logWriteSuccess(
+            buildAuditMessageForSuccess(SCMAction.DELETE_CONTAINER, auditMap)
+        );
+      }
+    }
   }
 
   @Override
@@ -311,26 +396,48 @@ public Pipeline createReplicationPipeline(HddsProtos.ReplicationType type,
 
   @Override
   public List<Pipeline> listPipelines() {
+    AUDIT.logReadSuccess(
+        buildAuditMessageForSuccess(SCMAction.LIST_PIPELINE, null));
     return scm.getPipelineManager().getPipelines();
   }
 
   @Override
   public void closePipeline(HddsProtos.PipelineID pipelineID)
       throws IOException {
+    Map<String, String> auditMap = Maps.newHashMap();
+    auditMap.put("pipelineID", pipelineID.getId());
     PipelineManager pipelineManager = scm.getPipelineManager();
     Pipeline pipeline =
         pipelineManager.getPipeline(PipelineID.getFromProtobuf(pipelineID));
     RatisPipelineUtils
         .finalizeAndDestroyPipeline(pipelineManager, pipeline, conf, false);
+    AUDIT.logWriteSuccess(
+        buildAuditMessageForSuccess(SCMAction.CLOSE_PIPELINE, null)
+    );
   }
 
   @Override
   public ScmInfo getScmInfo() throws IOException {
-    ScmInfo.Builder builder =
-        new ScmInfo.Builder()
-        .setClusterId(scm.getScmStorage().getClusterID())
-        .setScmId(scm.getScmStorage().getScmId());
-    return builder.build();
+    boolean auditSuccess = true;
+    try{
+      ScmInfo.Builder builder =
+          new ScmInfo.Builder()
+              .setClusterId(scm.getScmStorage().getClusterID())
+              .setScmId(scm.getScmStorage().getScmId());
+      return builder.build();
+    } catch (Exception ex) {
+      auditSuccess = false;
+      AUDIT.logReadFailure(
+          buildAuditMessageForFailure(SCMAction.GET_SCM_INFO, null, ex)
+      );
+      throw ex;
+    } finally {
+      if(auditSuccess) {
+        AUDIT.logReadSuccess(
+            buildAuditMessageForSuccess(SCMAction.GET_SCM_INFO, null)
+        );
+      }
+    }
   }
 
   /**
@@ -341,6 +448,9 @@ public ScmInfo getScmInfo() throws IOException {
    */
   @Override
   public boolean inChillMode() throws IOException {
+    AUDIT.logReadSuccess(
+        buildAuditMessageForSuccess(SCMAction.IN_CHILL_MODE, null)
+    );
     return scm.isInChillMode();
   }
 
@@ -352,6 +462,9 @@ public boolean inChillMode() throws IOException {
    */
   @Override
   public boolean forceExitChillMode() throws IOException {
+    AUDIT.logWriteSuccess(
+        buildAuditMessageForSuccess(SCMAction.FORCE_EXIT_CHILL_MODE, null)
+    );
     return scm.exitChillMode();
   }
 
@@ -409,4 +522,34 @@ private Set queryNodeState(HddsProtos.NodeState nodeState) {
     }
     return returnSet;
   }
+
+  @Override
+  public AuditMessage buildAuditMessageForSuccess(
+      AuditAction op, Map<String, String> auditMap) {
+    return new AuditMessage.Builder()
+        .setUser((Server.getRemoteUser() == null) ? null :
+            Server.getRemoteUser().getUserName())
+        .atIp((Server.getRemoteIp() == null) ? null :
+            Server.getRemoteIp().getHostAddress())
+        .forOperation(op.getAction())
+        .withParams(auditMap)
+        .withResult(AuditEventStatus.SUCCESS.toString())
+        .withException(null)
+        .build();
+  }
+
+  @Override
+  public AuditMessage buildAuditMessageForFailure(AuditAction op, Map<String,
+      String> auditMap, Throwable throwable) {
+    return new AuditMessage.Builder()
+        .setUser((Server.getRemoteUser() == null) ? null :
+            Server.getRemoteUser().getUserName())
+        .atIp((Server.getRemoteIp() == null) ? null :
+            Server.getRemoteIp().getHostAddress())
+        .forOperation(op.getAction())
+        .withParams(auditMap)
+        .withResult(AuditEventStatus.FAILURE.toString())
+        .withException(throwable)
+        .build();
+  }
 }
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeProtocolServer.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeProtocolServer.java
index 77ef9c849f..6a3552ef63 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeProtocolServer.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeProtocolServer.java
@@ -23,6 +23,7 @@
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
+import com.google.common.collect.Maps;
 import com.google.protobuf.BlockingService;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
@@ -75,6 +76,14 @@
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.ipc.ProtobufRpcEngine;
 import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.ipc.Server;
+import org.apache.hadoop.ozone.audit.AuditAction;
+import org.apache.hadoop.ozone.audit.AuditEventStatus;
+import org.apache.hadoop.ozone.audit.AuditLogger;
+import org.apache.hadoop.ozone.audit.AuditLoggerType;
+import org.apache.hadoop.ozone.audit.AuditMessage;
+import org.apache.hadoop.ozone.audit.Auditor;
+import org.apache.hadoop.ozone.audit.SCMAction;
 import org.apache.hadoop.ozone.protocol.StorageContainerDatanodeProtocol;
 import org.apache.hadoop.ozone.protocol.commands.CloseContainerCommand;
 import org.apache.hadoop.ozone.protocol.commands.DeleteBlocksCommand;
@@ -91,6 +100,7 @@
 import java.net.InetSocketAddress;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Map;
 import java.util.stream.Collectors;
 
 import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_DATANODE_ADDRESS_KEY;
@@ -106,11 +116,14 @@
  * Protocol Handler for Datanode Protocol.
  */
 public class SCMDatanodeProtocolServer implements
-    StorageContainerDatanodeProtocol {
+    StorageContainerDatanodeProtocol, Auditor {
 
   private static final Logger LOG = LoggerFactory.getLogger(
       SCMDatanodeProtocolServer.class);
 
+  private static final AuditLogger AUDIT =
+      new AuditLogger(AuditLoggerType.SCMLOGGER);
+
   /**
    * The RPC server that listens to requests from DataNodes.
   */
@@ -179,8 +192,21 @@ public InetSocketAddress getDatanodeRpcAddress() {
   public SCMVersionResponseProto getVersion(SCMVersionRequestProto
       versionRequest)
       throws IOException {
-    return scm.getScmNodeManager().getVersion(versionRequest)
-        .getProtobufMessage();
+    boolean auditSuccess = true;
+    try {
+      return scm.getScmNodeManager().getVersion(versionRequest)
+          .getProtobufMessage();
+    } catch (Exception ex) {
+      auditSuccess = false;
+      AUDIT.logReadFailure(
+          buildAuditMessageForFailure(SCMAction.GET_VERSION, null, ex));
+      throw ex;
+    } finally {
+      if(auditSuccess) {
+        AUDIT.logReadSuccess(
+            buildAuditMessageForSuccess(SCMAction.GET_VERSION, null));
+      }
+    }
   }
 
   @Override
@@ -192,6 +218,10 @@ public SCMRegisteredResponseProto register(
       throws IOException {
     DatanodeDetails datanodeDetails = DatanodeDetails
         .getFromProtoBuf(datanodeDetailsProto);
+    boolean auditSuccess = true;
+    Map<String, String> auditMap = Maps.newHashMap();
+    auditMap.put("datanodeDetails", datanodeDetails.toString());
+
     // TODO : Return the list of Nodes that forms the SCM HA.
     RegisteredCommand registeredCommand = scm.getScmNodeManager()
         .register(datanodeDetails, nodeReport, pipelineReportsProto);
@@ -207,7 +237,19 @@ public SCMRegisteredResponseProto register(
           new PipelineReportFromDatanode(datanodeDetails,
               pipelineReportsProto));
     }
-    return getRegisteredResponse(registeredCommand);
+    try {
+      return getRegisteredResponse(registeredCommand);
+    } catch (Exception ex) {
+      auditSuccess = false;
+      AUDIT.logWriteFailure(
+          buildAuditMessageForFailure(SCMAction.REGISTER, auditMap, ex));
+      throw ex;
+    } finally {
+      if(auditSuccess) {
+        AUDIT.logWriteSuccess(
+            buildAuditMessageForSuccess(SCMAction.REGISTER, auditMap));
+      }
+    }
   }
 
   @VisibleForTesting
@@ -229,9 +271,27 @@ public SCMHeartbeatResponseProto sendHeartbeat(
     for (SCMCommand cmd : heartbeatDispatcher.dispatch(heartbeat)) {
       cmdResponses.add(getCommandResponse(cmd));
     }
-    return SCMHeartbeatResponseProto.newBuilder()
-        .setDatanodeUUID(heartbeat.getDatanodeDetails().getUuid())
-        .addAllCommands(cmdResponses).build();
+    boolean auditSuccess = true;
+    Map<String, String> auditMap = Maps.newHashMap();
+    auditMap.put("datanodeUUID", heartbeat.getDatanodeDetails().getUuid());
+    auditMap.put("command", flatten(cmdResponses.toString()));
+    try {
+      return SCMHeartbeatResponseProto.newBuilder()
+          .setDatanodeUUID(heartbeat.getDatanodeDetails().getUuid())
+          .addAllCommands(cmdResponses).build();
+    } catch (Exception ex) {
+      auditSuccess = false;
+      AUDIT.logWriteFailure(
+          buildAuditMessageForFailure(SCMAction.SEND_HEARTBEAT, auditMap, ex)
+      );
+      throw ex;
+    } finally {
+      if(auditSuccess) {
+        AUDIT.logWriteSuccess(
+            buildAuditMessageForSuccess(SCMAction.SEND_HEARTBEAT, auditMap)
+        );
+      }
+    }
   }
 
   /**
@@ -302,6 +362,43 @@ public void stop() {
     IOUtils.cleanupWithLogger(LOG, scm.getScmNodeManager());
   }
 
+  @Override
+  public AuditMessage buildAuditMessageForSuccess(
+      AuditAction op, Map<String, String> auditMap) {
+    return new AuditMessage.Builder()
+        .setUser((Server.getRemoteUser() == null) ? null :
+            Server.getRemoteUser().getUserName())
+        .atIp((Server.getRemoteIp() == null) ? null :
+            Server.getRemoteIp().getHostAddress())
+        .forOperation(op.getAction())
+        .withParams(auditMap)
+        .withResult(AuditEventStatus.SUCCESS.toString())
+        .withException(null)
+        .build();
+  }
+
+  @Override
+  public AuditMessage buildAuditMessageForFailure(AuditAction op, Map<String,
+      String> auditMap, Throwable throwable) {
+    return new AuditMessage.Builder()
+        .setUser((Server.getRemoteUser() == null) ? null :
+            Server.getRemoteUser().getUserName())
+        .atIp((Server.getRemoteIp() == null) ? null :
+            Server.getRemoteIp().getHostAddress())
+        .forOperation(op.getAction())
+        .withParams(auditMap)
+        .withResult(AuditEventStatus.FAILURE.toString())
+        .withException(throwable)
+        .build();
+  }
+
+  private static String flatten(String input) {
+    return input
+        .replaceAll(System.lineSeparator(), " ")
+        .trim()
+        .replaceAll(" +", " ");
+  }
+
   /**
    * Wrapper class for events with the datanode origin.
    */
diff --git a/hadoop-ozone/common/src/main/bin/ozone b/hadoop-ozone/common/src/main/bin/ozone
index e034082725..bea01f0532 100755
--- a/hadoop-ozone/common/src/main/bin/ozone
+++ b/hadoop-ozone/common/src/main/bin/ozone
@@ -115,6 +115,7 @@ function ozonecmd_case
       HADOOP_SUBCMD_SUPPORTDAEMONIZATION="true"
       HADOOP_CLASSNAME='org.apache.hadoop.hdds.scm.server.StorageContainerManager'
       hadoop_debug "Appending HDFS_STORAGECONTAINERMANAGER_OPTS onto HADOOP_OPTS"
+      HDFS_STORAGECONTAINERMANAGER_OPTS="${HDFS_STORAGECONTAINERMANAGER_OPTS} -Dlog4j.configurationFile=${HADOOP_CONF_DIR}/scm-audit-log4j2.properties"
       HADOOP_OPTS="${HADOOP_OPTS} ${HDFS_STORAGECONTAINERMANAGER_OPTS}"
       OZONE_RUN_ARTIFACT_NAME="hadoop-hdds-server-scm"
     ;;
diff --git a/hadoop-ozone/dist/dev-support/bin/dist-layout-stitching b/hadoop-ozone/dist/dev-support/bin/dist-layout-stitching
index 55927a78e7..66181fd4db 100755
--- a/hadoop-ozone/dist/dev-support/bin/dist-layout-stitching
+++ b/hadoop-ozone/dist/dev-support/bin/dist-layout-stitching
@@ -87,6 +87,7 @@ run mkdir -p ./libexec
 run cp -r "${ROOT}/hadoop-common-project/hadoop-common/src/main/conf" "etc/hadoop"
 run cp "${ROOT}/hadoop-ozone/dist/src/main/conf/om-audit-log4j2.properties" "etc/hadoop"
 run cp "${ROOT}/hadoop-ozone/dist/src/main/conf/dn-audit-log4j2.properties" "etc/hadoop"
+run cp "${ROOT}/hadoop-ozone/dist/src/main/conf/scm-audit-log4j2.properties" "etc/hadoop"
 run cp "${ROOT}/hadoop-ozone/dist/src/main/conf/ozone-site.xml" "etc/hadoop"
 run cp -f "${ROOT}/hadoop-ozone/dist/src/main/conf/log4j.properties" "etc/hadoop"
 run cp "${ROOT}/hadoop-common-project/hadoop-common/src/main/bin/hadoop" "bin/"
diff --git a/hadoop-ozone/dist/src/main/conf/scm-audit-log4j2.properties b/hadoop-ozone/dist/src/main/conf/scm-audit-log4j2.properties
new file mode 100644
index 0000000000..3f81561cc4
--- /dev/null
+++ b/hadoop-ozone/dist/src/main/conf/scm-audit-log4j2.properties
@@ -0,0 +1,90 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with this
+# work for additional information regarding copyright ownership. The ASF
+# licenses this file to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations under
+# the License.
+#
+name=PropertiesConfig
+
+# Checks for config change periodically and reloads
+monitorInterval=30
+
+filter=read,write
+# filter.read.onMatch=DENY avoids logging all READ events
+# filter.read.onMatch=ACCEPT permits logging all READ events
+# The above two settings ignore the log levels in configuration
+# filter.read.onMatch=NEUTRAL permits logging of only those READ events
+# which are attempted at log level equal or greater than log level specified
+# in the configuration
+filter.read.type=MarkerFilter
+filter.read.marker=READ
+filter.read.onMatch=DENY
+filter.read.onMismatch=NEUTRAL
+
+# filter.write.onMatch=DENY avoids logging all WRITE events
+# filter.write.onMatch=ACCEPT permits logging all WRITE events
+# The above two settings ignore the log levels in configuration
+# filter.write.onMatch=NEUTRAL permits logging of only those WRITE events
+# which are attempted at log level equal or greater than log level specified
+# in the configuration
+filter.write.type=MarkerFilter
+filter.write.marker=WRITE
+filter.write.onMatch=NEUTRAL
+filter.write.onMismatch=NEUTRAL
+
+# Log Levels are organized from most specific to least:
+# OFF (most specific, no logging)
+# FATAL (most specific, little data)
+# ERROR
+# WARN
+# INFO
+# DEBUG
+# TRACE (least specific, a lot of data)
+# ALL (least specific, all data)
+
+# Uncomment following section to enable logging to console appender also
+#appenders=console, rolling
+#appender.console.type=Console
+#appender.console.name=STDOUT
+#appender.console.layout.type=PatternLayout
+#appender.console.layout.pattern=%d{DEFAULT} | %-5level | %c{1} | %msg | %throwable{3} %n
+
+# Comment this line when using both console and rolling appenders
+appenders=rolling
+
+#Rolling File Appender with size & time thresholds.
+#Rolling is triggered when either threshold is breached.
+#The rolled over file is compressed by default
+#Time interval is specified in seconds 86400s=1 day
+appender.rolling.type=RollingFile
+appender.rolling.name=RollingFile
+appender.rolling.fileName =${sys:hadoop.log.dir}/scm-audit-${hostName}.log
+appender.rolling.filePattern=${sys:hadoop.log.dir}/scm-audit-${hostName}-%d{yyyy-MM-dd-HH-mm-ss}-%i.log.gz
+appender.rolling.layout.type=PatternLayout
+appender.rolling.layout.pattern=%d{DEFAULT} | %-5level | %c{1} | %msg | %throwable{3} %n
+appender.rolling.policies.type=Policies
+appender.rolling.policies.time.type=TimeBasedTriggeringPolicy
+appender.rolling.policies.time.interval=86400
+appender.rolling.policies.size.type=SizeBasedTriggeringPolicy
+appender.rolling.policies.size.size=64MB
+
+loggers=audit
+logger.audit.type=AsyncLogger
+logger.audit.name=SCMAudit
+logger.audit.level=INFO
+logger.audit.appenderRefs=rolling
+logger.audit.appenderRef.file.ref=RollingFile
+
+rootLogger.level=INFO
+#rootLogger.appenderRefs=stdout
+#rootLogger.appenderRef.stdout.ref=STDOUT
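Usage sketch (not part of the patch): each handler changed above wraps its real work in the same try/catch/finally audit pattern, using the AuditLogger, AuditLoggerType, AuditEventStatus, SCMAction and Auditor types this change imports. Below is a minimal, hedged illustration of that pattern; the method name getContainerAudited and the doLookup() call are hypothetical placeholders, while the audit calls mirror the ones added in SCMClientProtocolServer.

    // Sketch only: assumes the enclosing class implements
    // org.apache.hadoop.ozone.audit.Auditor, which supplies the
    // buildAuditMessageForSuccess/buildAuditMessageForFailure helpers
    // exactly as implemented in the servers patched above.
    private static final AuditLogger AUDIT =
        new AuditLogger(AuditLoggerType.SCMLOGGER);

    public ContainerInfo getContainerAudited(long containerID)
        throws IOException {
      Map<String, String> auditMap = Maps.newHashMap();
      auditMap.put("containerID", String.valueOf(containerID));
      boolean auditSuccess = true;
      try {
        return doLookup(containerID);        // hypothetical real work
      } catch (IOException ex) {
        auditSuccess = false;
        AUDIT.logReadFailure(
            buildAuditMessageForFailure(SCMAction.GET_CONTAINER, auditMap, ex));
        throw ex;
      } finally {
        if (auditSuccess) {
          AUDIT.logReadSuccess(
              buildAuditMessageForSuccess(SCMAction.GET_CONTAINER, auditMap));
        }
      }
    }

With the scm-audit-log4j2.properties shipped by this patch, the success and failure messages produced this way are routed to the SCMAudit async logger and written to scm-audit-${hostName}.log; READ events are dropped by the MarkerFilter unless filter.read.onMatch is changed from DENY.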