HDDS-99. Adding SCM Audit log. Contributed by Dinesh Chitlangia.

Xiaoyu Yao 2018-12-17 15:40:22 -08:00
parent 5426653819
commit 94b368f29f
8 changed files with 533 additions and 56 deletions

New file: org/apache/hadoop/ozone/audit/SCMAction.java

@@ -0,0 +1,45 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with this
 * work for additional information regarding copyright ownership. The ASF
 * licenses this file to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 * <p>
 * http://www.apache.org/licenses/LICENSE-2.0
 * <p>
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations under
 * the License.
 */
package org.apache.hadoop.ozone.audit;

/**
 * Enum to define Audit Action types for SCM.
 */
public enum SCMAction implements AuditAction {

  GET_VERSION,
  REGISTER,
  SEND_HEARTBEAT,
  GET_SCM_INFO,
  ALLOCATE_BLOCK,
  DELETE_KEY_BLOCK,
  ALLOCATE_CONTAINER,
  GET_CONTAINER,
  GET_CONTAINER_WITH_PIPELINE,
  LIST_CONTAINER,
  LIST_PIPELINE,
  CLOSE_PIPELINE,
  DELETE_CONTAINER,
  IN_CHILL_MODE,
  FORCE_EXIT_CHILL_MODE;

  @Override
  public String getAction() {
    return this.toString();
  }

}
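Since getAction() simply delegates to toString(), the operation string recorded in an audit entry is the bare enum constant name. A minimal sketch of that round trip (illustrative, not part of the patch):

  // Sketch: the enum constant's name is exactly what lands in the audited operation field.
  AuditAction op = SCMAction.ALLOCATE_BLOCK;
  String opName = op.getAction();   // "ALLOCATE_BLOCK" -- Enum.toString() defaults to name()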

BlockGroup.java (org.apache.hadoop.ozone.common)

@@ -72,6 +72,14 @@ public static Builder newBuilder() {
     return new Builder();
   }

+  @Override
+  public String toString() {
+    return "BlockGroup[" +
+        "groupID='" + groupID + '\'' +
+        ", blockIDs=" + blockIDs +
+        ']';
+  }
+
   /**
    * BlockGroup instance builder.
    */
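This toString() is what deleteKeyBlocks in SCMBlockProtocolServer (next file) puts into its audit map, so a block group shows up in the audit log roughly as (groupID value and BlockID rendering illustrative):

  BlockGroup[groupID='/vol1/bucket1/key1', blockIDs=[conID: 1 locID: 101]]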

SCMBlockProtocolServer.java (org.apache.hadoop.hdds.scm.server)

@@ -21,6 +21,7 @@
  */
 package org.apache.hadoop.hdds.scm.server;

+import com.google.common.collect.Maps;
 import com.google.protobuf.BlockingService;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
@@ -35,6 +36,14 @@
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.ipc.ProtobufRpcEngine;
 import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.ipc.Server;
+import org.apache.hadoop.ozone.audit.AuditAction;
+import org.apache.hadoop.ozone.audit.AuditEventStatus;
+import org.apache.hadoop.ozone.audit.AuditLogger;
+import org.apache.hadoop.ozone.audit.AuditLoggerType;
+import org.apache.hadoop.ozone.audit.AuditMessage;
+import org.apache.hadoop.ozone.audit.Auditor;
+import org.apache.hadoop.ozone.audit.SCMAction;
 import org.apache.hadoop.ozone.common.BlockGroup;
 import org.apache.hadoop.hdds.client.BlockID;
 import org.apache.hadoop.ozone.common.DeleteBlockGroupResult;
@@ -47,6 +56,7 @@
 import java.net.InetSocketAddress;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Map;

 import static org.apache.hadoop.hdds.scm.ScmConfigKeys
     .OZONE_SCM_BLOCK_CLIENT_ADDRESS_KEY;
@@ -62,10 +72,14 @@
  * SCM block protocol is the protocol used by Namenode and OzoneManager to get
  * blocks from the SCM.
  */
-public class SCMBlockProtocolServer implements ScmBlockLocationProtocol {
+public class SCMBlockProtocolServer implements
+    ScmBlockLocationProtocol, Auditor {
   private static final Logger LOG =
       LoggerFactory.getLogger(SCMBlockProtocolServer.class);
+  private static final AuditLogger AUDIT =
+      new AuditLogger(AuditLoggerType.SCMLOGGER);
+
   private final StorageContainerManager scm;
   private final OzoneConfiguration conf;
   private final RPC.Server blockRpcServer;
@@ -140,7 +154,27 @@ public void join() throws InterruptedException {
   public AllocatedBlock allocateBlock(long size, HddsProtos.ReplicationType
       type, HddsProtos.ReplicationFactor factor, String owner) throws
       IOException {
-    return scm.getScmBlockManager().allocateBlock(size, type, factor, owner);
+    Map<String, String> auditMap = Maps.newHashMap();
+    auditMap.put("size", String.valueOf(size));
+    auditMap.put("type", type.name());
+    auditMap.put("factor", factor.name());
+    auditMap.put("owner", owner);
+    boolean auditSuccess = true;
+    try {
+      return scm.getScmBlockManager().allocateBlock(size, type, factor, owner);
+    } catch (Exception ex) {
+      auditSuccess = false;
+      AUDIT.logWriteFailure(
+          buildAuditMessageForFailure(SCMAction.ALLOCATE_BLOCK, auditMap, ex)
+      );
+      throw ex;
+    } finally {
+      if (auditSuccess) {
+        AUDIT.logWriteSuccess(
+            buildAuditMessageForSuccess(SCMAction.ALLOCATE_BLOCK, auditMap)
+        );
+      }
+    }
   }

   /**
@@ -155,17 +189,26 @@ public List<DeleteBlockGroupResult> deleteKeyBlocks(
     LOG.info("SCM is informed by OM to delete {} blocks", keyBlocksInfoList
         .size());
     List<DeleteBlockGroupResult> results = new ArrayList<>();
+    Map<String, String> auditMap = Maps.newHashMap();
     for (BlockGroup keyBlocks : keyBlocksInfoList) {
       ScmBlockLocationProtocolProtos.DeleteScmBlockResult.Result resultCode;
       try {
         // We delete blocks in an atomic operation to prevent getting
         // into state like only a partial of blocks are deleted,
         // which will leave key in an inconsistent state.
+        auditMap.put("keyBlockToDelete", keyBlocks.toString());
         scm.getScmBlockManager().deleteBlocks(keyBlocks.getBlockIDList());
         resultCode = ScmBlockLocationProtocolProtos.DeleteScmBlockResult
             .Result.success;
+        AUDIT.logWriteSuccess(
+            buildAuditMessageForSuccess(SCMAction.DELETE_KEY_BLOCK, auditMap)
+        );
       } catch (SCMException scmEx) {
         LOG.warn("Fail to delete block: {}", keyBlocks.getGroupID(), scmEx);
+        AUDIT.logWriteFailure(
+            buildAuditMessageForFailure(SCMAction.DELETE_KEY_BLOCK, auditMap,
+                scmEx)
+        );
         switch (scmEx.getResult()) {
         case CHILL_MODE_EXCEPTION:
           resultCode = ScmBlockLocationProtocolProtos.DeleteScmBlockResult
@@ -182,6 +225,10 @@ public List<DeleteBlockGroupResult> deleteKeyBlocks(
       } catch (IOException ex) {
         LOG.warn("Fail to delete blocks for object key: {}", keyBlocks
             .getGroupID(), ex);
+        AUDIT.logWriteFailure(
+            buildAuditMessageForFailure(SCMAction.DELETE_KEY_BLOCK, auditMap,
+                ex)
+        );
         resultCode = ScmBlockLocationProtocolProtos.DeleteScmBlockResult
             .Result.unknownFailure;
       }
@@ -197,10 +244,55 @@ public List<DeleteBlockGroupResult> deleteKeyBlocks(
   @Override
   public ScmInfo getScmInfo() throws IOException {
-    ScmInfo.Builder builder =
-        new ScmInfo.Builder()
-        .setClusterId(scm.getScmStorage().getClusterID())
-        .setScmId(scm.getScmStorage().getScmId());
-    return builder.build();
+    boolean auditSuccess = true;
+    try {
+      ScmInfo.Builder builder =
+          new ScmInfo.Builder()
+          .setClusterId(scm.getScmStorage().getClusterID())
+          .setScmId(scm.getScmStorage().getScmId());
+      return builder.build();
+    } catch (Exception ex) {
+      auditSuccess = false;
+      AUDIT.logReadFailure(
+          buildAuditMessageForFailure(SCMAction.GET_SCM_INFO, null, ex)
+      );
+      throw ex;
+    } finally {
+      if (auditSuccess) {
+        AUDIT.logReadSuccess(
+            buildAuditMessageForSuccess(SCMAction.GET_SCM_INFO, null)
+        );
+      }
+    }
   }
+
+  @Override
+  public AuditMessage buildAuditMessageForSuccess(
+      AuditAction op, Map<String, String> auditMap) {
+    return new AuditMessage.Builder()
+        .setUser((Server.getRemoteUser() == null) ? null :
+            Server.getRemoteUser().getUserName())
+        .atIp((Server.getRemoteIp() == null) ? null :
+            Server.getRemoteIp().getHostAddress())
+        .forOperation(op.getAction())
+        .withParams(auditMap)
+        .withResult(AuditEventStatus.SUCCESS.toString())
+        .withException(null)
+        .build();
+  }
+
+  @Override
+  public AuditMessage buildAuditMessageForFailure(AuditAction op, Map<String,
+      String> auditMap, Throwable throwable) {
+    return new AuditMessage.Builder()
+        .setUser((Server.getRemoteUser() == null) ? null :
+            Server.getRemoteUser().getUserName())
+        .atIp((Server.getRemoteIp() == null) ? null :
+            Server.getRemoteIp().getHostAddress())
+        .forOperation(op.getAction())
+        .withParams(auditMap)
+        .withResult(AuditEventStatus.FAILURE.toString())
+        .withException(throwable)
+        .build();
+  }
 }
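Every audited method in this patch follows the same shape: collect the parameters up front, log failure from the catch block before rethrowing, and log success from finally only if no exception escaped. A hedged sketch of that pattern as a generic helper (hypothetical; the patch deliberately inlines it per method):

  // Hypothetical wrapper, equivalent to the inlined try/catch/finally pattern above.
  private <T> T auditedWrite(SCMAction action, Map<String, String> auditMap,
      java.util.concurrent.Callable<T> call) throws Exception {
    boolean auditSuccess = true;
    try {
      return call.call();
    } catch (Exception ex) {
      auditSuccess = false;
      AUDIT.logWriteFailure(
          buildAuditMessageForFailure(action, auditMap, ex));
      throw ex;
    } finally {
      if (auditSuccess) {
        AUDIT.logWriteSuccess(
            buildAuditMessageForSuccess(action, auditMap));
      }
    }
  }

One reason to inline instead: allocateBlock declares IOException, and a Callable-based helper would widen the checked signature to Exception.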

SCMClientProtocolServer.java (org.apache.hadoop.hdds.scm.server)

@@ -23,6 +23,7 @@
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
+import com.google.common.collect.Maps;
 import com.google.protobuf.BlockingService;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
@@ -52,6 +53,14 @@
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.ipc.ProtobufRpcEngine;
 import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.ipc.Server;
+import org.apache.hadoop.ozone.audit.AuditAction;
+import org.apache.hadoop.ozone.audit.AuditEventStatus;
+import org.apache.hadoop.ozone.audit.AuditLogger;
+import org.apache.hadoop.ozone.audit.AuditLoggerType;
+import org.apache.hadoop.ozone.audit.AuditMessage;
+import org.apache.hadoop.ozone.audit.Auditor;
+import org.apache.hadoop.ozone.audit.SCMAction;
 import org.apache.hadoop.ozone.protocolPB
     .StorageContainerLocationProtocolServerSideTranslatorPB;
 import org.apache.hadoop.security.UserGroupInformation;
@@ -62,6 +71,7 @@
 import java.net.InetSocketAddress;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 import java.util.TreeSet;
 import java.util.stream.Collectors;
@@ -84,9 +94,11 @@
  * The RPC server that listens to requests from clients.
  */
 public class SCMClientProtocolServer implements
-    StorageContainerLocationProtocol, EventHandler<Boolean> {
+    StorageContainerLocationProtocol, EventHandler<Boolean>, Auditor {
   private static final Logger LOG =
       LoggerFactory.getLogger(SCMClientProtocolServer.class);
+  private static final AuditLogger AUDIT =
+      new AuditLogger(AuditLoggerType.SCMLOGGER);
   private final RPC.Server clientRpcServer;
   private final InetSocketAddress clientRpcAddress;
   private final StorageContainerManager scm;
@@ -177,46 +189,84 @@ public ContainerWithPipeline allocateContainer(HddsProtos.ReplicationType
   @Override
   public ContainerInfo getContainer(long containerID) throws IOException {
     String remoteUser = getRpcRemoteUsername();
+    boolean auditSuccess = true;
+    Map<String, String> auditMap = Maps.newHashMap();
+    auditMap.put("containerID", String.valueOf(containerID));
     getScm().checkAdminAccess(remoteUser);
-    return scm.getContainerManager()
-        .getContainer(ContainerID.valueof(containerID));
+    try {
+      return scm.getContainerManager()
+          .getContainer(ContainerID.valueof(containerID));
+    } catch (IOException ex) {
+      auditSuccess = false;
+      AUDIT.logReadFailure(
+          buildAuditMessageForFailure(SCMAction.GET_CONTAINER, auditMap, ex)
+      );
+      throw ex;
+    } finally {
+      if (auditSuccess) {
+        AUDIT.logReadSuccess(
+            buildAuditMessageForSuccess(SCMAction.GET_CONTAINER, auditMap)
+        );
+      }
+    }
   }

   @Override
   public ContainerWithPipeline getContainerWithPipeline(long containerID)
       throws IOException {
-    if (chillModePrecheck.isInChillMode()) {
-      ContainerInfo contInfo = scm.getContainerManager()
-          .getContainer(ContainerID.valueof(containerID));
-      if (contInfo.isOpen()) {
-        if (!hasRequiredReplicas(contInfo)) {
-          throw new SCMException("Open container " + containerID + " doesn't"
-              + " have enough replicas to service this operation in "
-              + "Chill mode.", ResultCodes.CHILL_MODE_EXCEPTION);
-        }
-      }
-    }
-    getScm().checkAdminAccess(null);
-    final ContainerID id = ContainerID.valueof(containerID);
-    final ContainerInfo container = scm.getContainerManager().getContainer(id);
-    final Pipeline pipeline;
-    if (container.isOpen()) {
-      // Ratis pipeline
-      pipeline = scm.getPipelineManager()
-          .getPipeline(container.getPipelineID());
-    } else {
-      pipeline = scm.getPipelineManager().createPipeline(
-          HddsProtos.ReplicationType.STAND_ALONE,
-          container.getReplicationFactor(),
-          scm.getContainerManager()
-              .getContainerReplicas(id).stream()
-              .map(ContainerReplica::getDatanodeDetails)
-              .collect(Collectors.toList()));
-    }
-    return new ContainerWithPipeline(container, pipeline);
+    Map<String, String> auditMap = Maps.newHashMap();
+    auditMap.put("containerID", String.valueOf(containerID));
+    boolean auditSuccess = true;
+    try {
+      if (chillModePrecheck.isInChillMode()) {
+        ContainerInfo contInfo = scm.getContainerManager()
+            .getContainer(ContainerID.valueof(containerID));
+        if (contInfo.isOpen()) {
+          if (!hasRequiredReplicas(contInfo)) {
+            throw new SCMException("Open container " + containerID + " doesn't"
+                + " have enough replicas to service this operation in "
+                + "Chill mode.", ResultCodes.CHILL_MODE_EXCEPTION);
+          }
+        }
+      }
+      getScm().checkAdminAccess(null);
+      final ContainerID id = ContainerID.valueof(containerID);
+      final ContainerInfo container = scm.getContainerManager().
+          getContainer(id);
+      final Pipeline pipeline;
+      if (container.isOpen()) {
+        // Ratis pipeline
+        pipeline = scm.getPipelineManager()
+            .getPipeline(container.getPipelineID());
+      } else {
+        pipeline = scm.getPipelineManager().createPipeline(
+            HddsProtos.ReplicationType.STAND_ALONE,
+            container.getReplicationFactor(),
+            scm.getContainerManager()
+                .getContainerReplicas(id).stream()
+                .map(ContainerReplica::getDatanodeDetails)
+                .collect(Collectors.toList()));
+      }
+      return new ContainerWithPipeline(container, pipeline);
+    } catch (IOException ex) {
+      auditSuccess = false;
+      AUDIT.logReadFailure(
+          buildAuditMessageForFailure(SCMAction.GET_CONTAINER_WITH_PIPELINE,
+              auditMap, ex)
+      );
+      throw ex;
+    } finally {
+      if (auditSuccess) {
+        AUDIT.logReadSuccess(
+            buildAuditMessageForSuccess(SCMAction.GET_CONTAINER_WITH_PIPELINE,
+                auditMap)
+        );
+      }
+    }
   }

   /**
@@ -238,16 +288,51 @@ private boolean hasRequiredReplicas(ContainerInfo contInfo) {
   @Override
   public List<ContainerInfo> listContainer(long startContainerID,
       int count) throws IOException {
-    return scm.getContainerManager().
-        listContainer(ContainerID.valueof(startContainerID), count);
+    boolean auditSuccess = true;
+    Map<String, String> auditMap = Maps.newHashMap();
+    auditMap.put("startContainerID", String.valueOf(startContainerID));
+    auditMap.put("count", String.valueOf(count));
+    try {
+      return scm.getContainerManager().
+          listContainer(ContainerID.valueof(startContainerID), count);
+    } catch (Exception ex) {
+      auditSuccess = false;
+      AUDIT.logReadFailure(
+          buildAuditMessageForFailure(SCMAction.LIST_CONTAINER, auditMap, ex));
+      throw ex;
+    } finally {
+      if (auditSuccess) {
+        AUDIT.logReadSuccess(
+            buildAuditMessageForSuccess(SCMAction.LIST_CONTAINER, auditMap));
+      }
+    }
   }

   @Override
   public void deleteContainer(long containerID) throws IOException {
     String remoteUser = getRpcRemoteUsername();
-    getScm().checkAdminAccess(remoteUser);
-    scm.getContainerManager().deleteContainer(ContainerID.valueof(containerID));
+    boolean auditSuccess = true;
+    Map<String, String> auditMap = Maps.newHashMap();
+    auditMap.put("containerID", String.valueOf(containerID));
+    auditMap.put("remoteUser", remoteUser);
+    try {
+      getScm().checkAdminAccess(remoteUser);
+      scm.getContainerManager().deleteContainer(
+          ContainerID.valueof(containerID));
+    } catch (Exception ex) {
+      auditSuccess = false;
+      AUDIT.logWriteFailure(
+          buildAuditMessageForFailure(SCMAction.DELETE_CONTAINER, auditMap, ex)
+      );
+      throw ex;
+    } finally {
+      if (auditSuccess) {
+        AUDIT.logWriteSuccess(
+            buildAuditMessageForSuccess(SCMAction.DELETE_CONTAINER, auditMap)
+        );
+      }
+    }
   }

   @Override
@@ -311,26 +396,48 @@ public Pipeline createReplicationPipeline(HddsProtos.ReplicationType type,
   @Override
   public List<Pipeline> listPipelines() {
+    AUDIT.logReadSuccess(
+        buildAuditMessageForSuccess(SCMAction.LIST_PIPELINE, null));
     return scm.getPipelineManager().getPipelines();
   }

   @Override
   public void closePipeline(HddsProtos.PipelineID pipelineID)
       throws IOException {
+    Map<String, String> auditMap = Maps.newHashMap();
+    auditMap.put("pipelineID", pipelineID.getId());
     PipelineManager pipelineManager = scm.getPipelineManager();
     Pipeline pipeline =
         pipelineManager.getPipeline(PipelineID.getFromProtobuf(pipelineID));
     RatisPipelineUtils
         .finalizeAndDestroyPipeline(pipelineManager, pipeline, conf, false);
+    AUDIT.logWriteSuccess(
+        buildAuditMessageForSuccess(SCMAction.CLOSE_PIPELINE, auditMap)
+    );
   }

   @Override
   public ScmInfo getScmInfo() throws IOException {
-    ScmInfo.Builder builder =
-        new ScmInfo.Builder()
-        .setClusterId(scm.getScmStorage().getClusterID())
-        .setScmId(scm.getScmStorage().getScmId());
-    return builder.build();
+    boolean auditSuccess = true;
+    try {
+      ScmInfo.Builder builder =
+          new ScmInfo.Builder()
+          .setClusterId(scm.getScmStorage().getClusterID())
+          .setScmId(scm.getScmStorage().getScmId());
+      return builder.build();
+    } catch (Exception ex) {
+      auditSuccess = false;
+      AUDIT.logReadFailure(
+          buildAuditMessageForFailure(SCMAction.GET_SCM_INFO, null, ex)
+      );
+      throw ex;
+    } finally {
+      if (auditSuccess) {
+        AUDIT.logReadSuccess(
+            buildAuditMessageForSuccess(SCMAction.GET_SCM_INFO, null)
+        );
+      }
+    }
   }

   /**
@@ -341,6 +448,9 @@ public ScmInfo getScmInfo() throws IOException {
    */
   @Override
   public boolean inChillMode() throws IOException {
+    AUDIT.logReadSuccess(
+        buildAuditMessageForSuccess(SCMAction.IN_CHILL_MODE, null)
+    );
     return scm.isInChillMode();
   }
@@ -352,6 +462,9 @@ public boolean inChillMode() throws IOException {
    */
   @Override
   public boolean forceExitChillMode() throws IOException {
+    AUDIT.logWriteSuccess(
+        buildAuditMessageForSuccess(SCMAction.FORCE_EXIT_CHILL_MODE, null)
+    );
     return scm.exitChillMode();
   }
@@ -409,4 +522,34 @@ private Set<DatanodeDetails> queryNodeState(HddsProtos.NodeState nodeState) {
     }
     return returnSet;
   }
+
+  @Override
+  public AuditMessage buildAuditMessageForSuccess(
+      AuditAction op, Map<String, String> auditMap) {
+    return new AuditMessage.Builder()
+        .setUser((Server.getRemoteUser() == null) ? null :
+            Server.getRemoteUser().getUserName())
+        .atIp((Server.getRemoteIp() == null) ? null :
+            Server.getRemoteIp().getHostAddress())
+        .forOperation(op.getAction())
+        .withParams(auditMap)
+        .withResult(AuditEventStatus.SUCCESS.toString())
+        .withException(null)
+        .build();
+  }
+
+  @Override
+  public AuditMessage buildAuditMessageForFailure(AuditAction op, Map<String,
+      String> auditMap, Throwable throwable) {
+    return new AuditMessage.Builder()
+        .setUser((Server.getRemoteUser() == null) ? null :
+            Server.getRemoteUser().getUserName())
+        .atIp((Server.getRemoteIp() == null) ? null :
+            Server.getRemoteIp().getHostAddress())
+        .forOperation(op.getAction())
+        .withParams(auditMap)
+        .withResult(AuditEventStatus.FAILURE.toString())
+        .withException(throwable)
+        .build();
+  }
 }
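Note the read/write split in the calls above: the logRead* variants tag entries with the READ marker and the logWrite* variants with WRITE, and the scm-audit-log4j2.properties file shipped at the end of this commit denies READ events by default. A sketch of the one-line change there that would also capture reads such as GET_CONTAINER or IN_CHILL_MODE:

  filter.read.onMatch=NEUTRAL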

SCMDatanodeProtocolServer.java (org.apache.hadoop.hdds.scm.server)

@@ -23,6 +23,7 @@
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
+import com.google.common.collect.Maps;
 import com.google.protobuf.BlockingService;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
@@ -75,6 +76,14 @@
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.ipc.ProtobufRpcEngine;
 import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.ipc.Server;
+import org.apache.hadoop.ozone.audit.AuditAction;
+import org.apache.hadoop.ozone.audit.AuditEventStatus;
+import org.apache.hadoop.ozone.audit.AuditLogger;
+import org.apache.hadoop.ozone.audit.AuditLoggerType;
+import org.apache.hadoop.ozone.audit.AuditMessage;
+import org.apache.hadoop.ozone.audit.Auditor;
+import org.apache.hadoop.ozone.audit.SCMAction;
 import org.apache.hadoop.ozone.protocol.StorageContainerDatanodeProtocol;
 import org.apache.hadoop.ozone.protocol.commands.CloseContainerCommand;
 import org.apache.hadoop.ozone.protocol.commands.DeleteBlocksCommand;
@@ -91,6 +100,7 @@
 import java.net.InetSocketAddress;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Map;
 import java.util.stream.Collectors;

 import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_DATANODE_ADDRESS_KEY;
@@ -106,11 +116,14 @@
  * Protocol Handler for Datanode Protocol.
  */
 public class SCMDatanodeProtocolServer implements
-    StorageContainerDatanodeProtocol {
+    StorageContainerDatanodeProtocol, Auditor {

   private static final Logger LOG = LoggerFactory.getLogger(
       SCMDatanodeProtocolServer.class);

+  private static final AuditLogger AUDIT =
+      new AuditLogger(AuditLoggerType.SCMLOGGER);
+
   /**
    * The RPC server that listens to requests from DataNodes.
    */
@@ -179,8 +192,21 @@ public InetSocketAddress getDatanodeRpcAddress() {
   public SCMVersionResponseProto getVersion(SCMVersionRequestProto
       versionRequest)
       throws IOException {
-    return scm.getScmNodeManager().getVersion(versionRequest)
-        .getProtobufMessage();
+    boolean auditSuccess = true;
+    try {
+      return scm.getScmNodeManager().getVersion(versionRequest)
+          .getProtobufMessage();
+    } catch (Exception ex) {
+      auditSuccess = false;
+      AUDIT.logReadFailure(
+          buildAuditMessageForFailure(SCMAction.GET_VERSION, null, ex));
+      throw ex;
+    } finally {
+      if (auditSuccess) {
+        AUDIT.logReadSuccess(
+            buildAuditMessageForSuccess(SCMAction.GET_VERSION, null));
+      }
+    }
   }

   @Override
@@ -192,6 +218,10 @@ public SCMRegisteredResponseProto register(
       throws IOException {
     DatanodeDetails datanodeDetails = DatanodeDetails
         .getFromProtoBuf(datanodeDetailsProto);
+    boolean auditSuccess = true;
+    Map<String, String> auditMap = Maps.newHashMap();
+    auditMap.put("datanodeDetails", datanodeDetails.toString());
+
     // TODO : Return the list of Nodes that forms the SCM HA.
     RegisteredCommand registeredCommand = scm.getScmNodeManager()
         .register(datanodeDetails, nodeReport, pipelineReportsProto);
@@ -207,7 +237,19 @@ public SCMRegisteredResponseProto register(
           new PipelineReportFromDatanode(datanodeDetails,
               pipelineReportsProto));
     }
-    return getRegisteredResponse(registeredCommand);
+    try {
+      return getRegisteredResponse(registeredCommand);
+    } catch (Exception ex) {
+      auditSuccess = false;
+      AUDIT.logWriteFailure(
+          buildAuditMessageForFailure(SCMAction.REGISTER, auditMap, ex));
+      throw ex;
+    } finally {
+      if (auditSuccess) {
+        AUDIT.logWriteSuccess(
+            buildAuditMessageForSuccess(SCMAction.REGISTER, auditMap));
+      }
+    }
   }

   @VisibleForTesting
@@ -229,9 +271,27 @@ public SCMHeartbeatResponseProto sendHeartbeat(
     for (SCMCommand cmd : heartbeatDispatcher.dispatch(heartbeat)) {
       cmdResponses.add(getCommandResponse(cmd));
     }
-    return SCMHeartbeatResponseProto.newBuilder()
-        .setDatanodeUUID(heartbeat.getDatanodeDetails().getUuid())
-        .addAllCommands(cmdResponses).build();
+    boolean auditSuccess = true;
+    Map<String, String> auditMap = Maps.newHashMap();
+    auditMap.put("datanodeUUID", heartbeat.getDatanodeDetails().getUuid());
+    auditMap.put("command", flatten(cmdResponses.toString()));
+    try {
+      return SCMHeartbeatResponseProto.newBuilder()
+          .setDatanodeUUID(heartbeat.getDatanodeDetails().getUuid())
+          .addAllCommands(cmdResponses).build();
+    } catch (Exception ex) {
+      auditSuccess = false;
+      AUDIT.logWriteFailure(
+          buildAuditMessageForFailure(SCMAction.SEND_HEARTBEAT, auditMap, ex)
+      );
+      throw ex;
+    } finally {
+      if (auditSuccess) {
+        AUDIT.logWriteSuccess(
+            buildAuditMessageForSuccess(SCMAction.SEND_HEARTBEAT, auditMap)
+        );
+      }
+    }
   }

   /**
@@ -302,6 +362,43 @@ public void stop() {
     IOUtils.cleanupWithLogger(LOG, scm.getScmNodeManager());
   }

+  @Override
+  public AuditMessage buildAuditMessageForSuccess(
+      AuditAction op, Map<String, String> auditMap) {
+    return new AuditMessage.Builder()
+        .setUser((Server.getRemoteUser() == null) ? null :
+            Server.getRemoteUser().getUserName())
+        .atIp((Server.getRemoteIp() == null) ? null :
+            Server.getRemoteIp().getHostAddress())
+        .forOperation(op.getAction())
+        .withParams(auditMap)
+        .withResult(AuditEventStatus.SUCCESS.toString())
+        .withException(null)
+        .build();
+  }
+
+  @Override
+  public AuditMessage buildAuditMessageForFailure(AuditAction op, Map<String,
+      String> auditMap, Throwable throwable) {
+    return new AuditMessage.Builder()
+        .setUser((Server.getRemoteUser() == null) ? null :
+            Server.getRemoteUser().getUserName())
+        .atIp((Server.getRemoteIp() == null) ? null :
+            Server.getRemoteIp().getHostAddress())
+        .forOperation(op.getAction())
+        .withParams(auditMap)
+        .withResult(AuditEventStatus.FAILURE.toString())
+        .withException(throwable)
+        .build();
+  }
+
+  private static String flatten(String input) {
+    return input
+        .replaceAll(System.lineSeparator(), " ")
+        .trim()
+        .replaceAll(" +", " ");
+  }
+
   /**
    * Wrapper class for events with the datanode origin.
    */
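The flatten() helper above exists because protobuf toString() output is multi-line; it folds the heartbeat command list onto a single audit line by turning line separators into spaces and collapsing runs of spaces. Roughly (assuming the platform separator is \n):

  // flatten("commandType: closeContainerCommand\ncontainerID:  7")
  //   -> "commandType: closeContainerCommand containerID: 7"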

bin/ozone (ozonecmd_case)

@@ -115,6 +115,7 @@ function ozonecmd_case
       HADOOP_SUBCMD_SUPPORTDAEMONIZATION="true"
       HADOOP_CLASSNAME='org.apache.hadoop.hdds.scm.server.StorageContainerManager'
       hadoop_debug "Appending HDFS_STORAGECONTAINERMANAGER_OPTS onto HADOOP_OPTS"
+      HDFS_STORAGECONTAINERMANAGER_OPTS="${HDFS_STORAGECONTAINERMANAGER_OPTS} -Dlog4j.configurationFile=${HADOOP_CONF_DIR}/scm-audit-log4j2.properties"
       HADOOP_OPTS="${HADOOP_OPTS} ${HDFS_STORAGECONTAINERMANAGER_OPTS}"
       OZONE_RUN_ARTIFACT_NAME="hadoop-hdds-server-scm"
     ;;
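With this line, starting the SCM daemon pins the audit configuration through a Log4j2 system property; the effective JVM argument looks like (illustrative, assuming HADOOP_CONF_DIR=/etc/hadoop/conf):

  -Dlog4j.configurationFile=/etc/hadoop/conf/scm-audit-log4j2.properties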

dist-layout-stitching (Ozone dist assembly script)

@@ -87,6 +87,7 @@ run mkdir -p ./libexec
 run cp -r "${ROOT}/hadoop-common-project/hadoop-common/src/main/conf" "etc/hadoop"
 run cp "${ROOT}/hadoop-ozone/dist/src/main/conf/om-audit-log4j2.properties" "etc/hadoop"
 run cp "${ROOT}/hadoop-ozone/dist/src/main/conf/dn-audit-log4j2.properties" "etc/hadoop"
+run cp "${ROOT}/hadoop-ozone/dist/src/main/conf/scm-audit-log4j2.properties" "etc/hadoop"
 run cp "${ROOT}/hadoop-ozone/dist/src/main/conf/ozone-site.xml" "etc/hadoop"
 run cp -f "${ROOT}/hadoop-ozone/dist/src/main/conf/log4j.properties" "etc/hadoop"
 run cp "${ROOT}/hadoop-common-project/hadoop-common/src/main/bin/hadoop" "bin/"

New file: scm-audit-log4j2.properties (hadoop-ozone/dist/src/main/conf)

@@ -0,0 +1,90 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with this
# work for additional information regarding copyright ownership. The ASF
# licenses this file to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# <p>
# http://www.apache.org/licenses/LICENSE-2.0
# <p>
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations under
# the License.
#
name=PropertiesConfig
# Checks for config change periodically and reloads
monitorInterval=30
filter=read,write
# filter.read.onMatch=DENY avoids logging all READ events
# filter.read.onMatch=ACCEPT permits logging all READ events
# The above two settings ignore the log levels in configuration
# filter.read.onMatch=NEUTRAL permits logging of only those READ events
# which are attempted at log level equal or greater than log level specified
# in the configuration
filter.read.type=MarkerFilter
filter.read.marker=READ
filter.read.onMatch=DENY
filter.read.onMismatch=NEUTRAL
# filter.write.onMatch=DENY avoids logging all WRITE events
# filter.write.onMatch=ACCEPT permits logging all WRITE events
# The above two settings ignore the log levels in configuration
# filter.write.onMatch=NEUTRAL permits logging of only those WRITE events
# which are attempted at log level equal or greater than log level specified
# in the configuration
filter.write.type=MarkerFilter
filter.write.marker=WRITE
filter.write.onMatch=NEUTRAL
filter.write.onMismatch=NEUTRAL
# Log Levels are organized from most specific to least:
# OFF (most specific, no logging)
# FATAL (most specific, little data)
# ERROR
# WARN
# INFO
# DEBUG
# TRACE (least specific, a lot of data)
# ALL (least specific, all data)
# Uncomment following section to enable logging to console appender also
#appenders=console, rolling
#appender.console.type=Console
#appender.console.name=STDOUT
#appender.console.layout.type=PatternLayout
#appender.console.layout.pattern=%d{DEFAULT} | %-5level | %c{1} | %msg | %throwable{3} %n
# Comment this line when using both console and rolling appenders
appenders=rolling
# Rolling file appender with size & time thresholds.
# Rolling is triggered when either threshold is breached.
# The rolled-over file is compressed by default.
# Time interval is specified in seconds: 86400s = 1 day.
appender.rolling.type=RollingFile
appender.rolling.name=RollingFile
appender.rolling.fileName=${sys:hadoop.log.dir}/scm-audit-${hostName}.log
appender.rolling.filePattern=${sys:hadoop.log.dir}/scm-audit-${hostName}-%d{yyyy-MM-dd-HH-mm-ss}-%i.log.gz
appender.rolling.layout.type=PatternLayout
appender.rolling.layout.pattern=%d{DEFAULT} | %-5level | %c{1} | %msg | %throwable{3} %n
appender.rolling.policies.type=Policies
appender.rolling.policies.time.type=TimeBasedTriggeringPolicy
appender.rolling.policies.time.interval=86400
appender.rolling.policies.size.type=SizeBasedTriggeringPolicy
appender.rolling.policies.size.size=64MB
loggers=audit
logger.audit.type=AsyncLogger
logger.audit.name=SCMAudit
logger.audit.level=INFO
logger.audit.appenderRefs=rolling
logger.audit.appenderRef.file.ref=RollingFile
rootLogger.level=INFO
#rootLogger.appenderRefs=stdout
#rootLogger.appenderRef.stdout.ref=STDOUT
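
With the PatternLayout above (%d{DEFAULT} | %-5level | %c{1} | %msg | %throwable{3}), a successful write audit lands in scm-audit-<hostname>.log along these lines (field values and the exact message rendering produced by AuditMessage are illustrative):

  2018-12-17 15:40:22,481 | INFO  | SCMAudit | user=scm ip=10.0.0.12 op=ALLOCATE_BLOCK {size=268435456, type=RATIS, factor=THREE, owner=om} ret=SUCCESS |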