HADOOP-18183. s3a audit logs to publish range start/end of GET requests. (#5110)
The start and end of the range is set in a new audit param "rg", e.g "?rg=100-200" Contributed by Ankit Saurabh
This commit is contained in:
parent
6202348502
commit
654082773c
@ -90,6 +90,11 @@ private AuditConstants() {
|
|||||||
*/
|
*/
|
||||||
public static final String PARAM_PROCESS = "ps";
|
public static final String PARAM_PROCESS = "ps";
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Header: Range for GET request data: {@value}.
|
||||||
|
*/
|
||||||
|
public static final String PARAM_RANGE = "rg";
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Task Attempt ID query header: {@value}.
|
* Task Attempt ID query header: {@value}.
|
||||||
*/
|
*/
|
||||||
|
@ -25,6 +25,7 @@
|
|||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
|
||||||
import com.amazonaws.AmazonWebServiceRequest;
|
import com.amazonaws.AmazonWebServiceRequest;
|
||||||
|
import com.amazonaws.services.s3.model.GetObjectRequest;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
@ -35,6 +36,7 @@
|
|||||||
import org.apache.hadoop.fs.s3a.audit.AWSRequestAnalyzer;
|
import org.apache.hadoop.fs.s3a.audit.AWSRequestAnalyzer;
|
||||||
import org.apache.hadoop.fs.s3a.audit.AuditFailureException;
|
import org.apache.hadoop.fs.s3a.audit.AuditFailureException;
|
||||||
import org.apache.hadoop.fs.s3a.audit.AuditSpanS3A;
|
import org.apache.hadoop.fs.s3a.audit.AuditSpanS3A;
|
||||||
|
import org.apache.hadoop.fs.store.LogExactlyOnce;
|
||||||
import org.apache.hadoop.fs.store.audit.HttpReferrerAuditHeader;
|
import org.apache.hadoop.fs.store.audit.HttpReferrerAuditHeader;
|
||||||
import org.apache.hadoop.security.UserGroupInformation;
|
import org.apache.hadoop.security.UserGroupInformation;
|
||||||
|
|
||||||
@ -110,6 +112,14 @@ public class LoggingAuditor
|
|||||||
*/
|
*/
|
||||||
private Collection<String> filters;
|
private Collection<String> filters;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Log for warning of problems getting the range of GetObjectRequest
|
||||||
|
* will only log of a problem once per process instance.
|
||||||
|
* This is to avoid logs being flooded with errors.
|
||||||
|
*/
|
||||||
|
private static final LogExactlyOnce WARN_INCORRECT_RANGE =
|
||||||
|
new LogExactlyOnce(LOG);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Create the auditor.
|
* Create the auditor.
|
||||||
* The UGI current user is used to provide the principal;
|
* The UGI current user is used to provide the principal;
|
||||||
@ -230,6 +240,26 @@ private class LoggingAuditSpan extends AbstractAuditSpanImpl {
|
|||||||
|
|
||||||
private final HttpReferrerAuditHeader referrer;
|
private final HttpReferrerAuditHeader referrer;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Attach Range of data for GetObject Request.
|
||||||
|
* @param request given get object request
|
||||||
|
*/
|
||||||
|
private void attachRangeFromRequest(AmazonWebServiceRequest request) {
|
||||||
|
if (request instanceof GetObjectRequest) {
|
||||||
|
long[] rangeValue = ((GetObjectRequest) request).getRange();
|
||||||
|
if (rangeValue == null || rangeValue.length == 0) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
if (rangeValue.length != 2) {
|
||||||
|
WARN_INCORRECT_RANGE.warn("Expected range to contain 0 or 2 elements."
|
||||||
|
+ " Got {} elements. Ignoring.", rangeValue.length);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
String combinedRangeValue = String.format("%d-%d", rangeValue[0], rangeValue[1]);
|
||||||
|
referrer.set(AuditConstants.PARAM_RANGE, combinedRangeValue);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
private final String description;
|
private final String description;
|
||||||
|
|
||||||
private LoggingAuditSpan(
|
private LoggingAuditSpan(
|
||||||
@ -314,6 +344,8 @@ public void set(final String key, final String value) {
|
|||||||
@Override
|
@Override
|
||||||
public <T extends AmazonWebServiceRequest> T beforeExecution(
|
public <T extends AmazonWebServiceRequest> T beforeExecution(
|
||||||
final T request) {
|
final T request) {
|
||||||
|
// attach range for GetObject requests
|
||||||
|
attachRangeFromRequest(request);
|
||||||
// build the referrer header
|
// build the referrer header
|
||||||
final String header = referrer.buildHttpReferrer();
|
final String header = referrer.buildHttpReferrer();
|
||||||
// update the outer class's field.
|
// update the outer class's field.
|
||||||
|
@ -232,6 +232,7 @@ If any of the field values were `null`, the field is omitted.
|
|||||||
| `p2` | Path 2 of operation | `s3a://alice-london/path2` |
|
| `p2` | Path 2 of operation | `s3a://alice-london/path2` |
|
||||||
| `pr` | Principal | `alice` |
|
| `pr` | Principal | `alice` |
|
||||||
| `ps` | Unique process UUID | `235865a0-d399-4696-9978-64568db1b51c` |
|
| `ps` | Unique process UUID | `235865a0-d399-4696-9978-64568db1b51c` |
|
||||||
|
| `rg` | GET request range | `100-200` |
|
||||||
| `ta` | Task Attempt ID (S3A committer) | |
|
| `ta` | Task Attempt ID (S3A committer) | |
|
||||||
| `t0` | Thread 0: thread span was created in | `100` |
|
| `t0` | Thread 0: thread span was created in | `100` |
|
||||||
| `t1` | Thread 1: thread this operation was executed in | `200` |
|
| `t1` | Thread 1: thread this operation was executed in | `200` |
|
||||||
|
@ -20,8 +20,10 @@
|
|||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
import java.util.function.Consumer;
|
||||||
|
|
||||||
import com.amazonaws.services.s3.model.GetObjectMetadataRequest;
|
import com.amazonaws.services.s3.model.GetObjectMetadataRequest;
|
||||||
|
import com.amazonaws.services.s3.model.GetObjectRequest;
|
||||||
import org.junit.After;
|
import org.junit.After;
|
||||||
import org.junit.Before;
|
import org.junit.Before;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
@ -138,6 +140,17 @@ protected GetObjectMetadataRequest head() {
|
|||||||
requestFactory.newGetObjectMetadataRequest("/"));
|
requestFactory.newGetObjectMetadataRequest("/"));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Create a GetObject request and modify it before passing it through auditor.
|
||||||
|
* @param modifyRequest Consumer Interface for changing the request before passing to the auditor
|
||||||
|
* @return the request
|
||||||
|
*/
|
||||||
|
protected GetObjectRequest get(Consumer<GetObjectRequest> modifyRequest) {
|
||||||
|
GetObjectRequest req = requestFactory.newGetObjectRequest("/");
|
||||||
|
modifyRequest.accept(req);
|
||||||
|
return manager.beforeExecution(req);
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Assert a head request fails as there is no
|
* Assert a head request fails as there is no
|
||||||
* active span.
|
* active span.
|
||||||
@ -210,4 +223,15 @@ protected void assertMapContains(final Map<String, String> params,
|
|||||||
.isEqualTo(expected);
|
.isEqualTo(expected);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Assert the map does not contain the key, i.e, it is null.
|
||||||
|
* @param params map of params
|
||||||
|
* @param key key
|
||||||
|
*/
|
||||||
|
protected void assertMapNotContains(final Map<String, String> params, final String key) {
|
||||||
|
assertThat(params.get(key))
|
||||||
|
.describedAs(key)
|
||||||
|
.isNull();
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
@ -23,6 +23,7 @@
|
|||||||
import java.util.regex.Matcher;
|
import java.util.regex.Matcher;
|
||||||
|
|
||||||
import com.amazonaws.services.s3.model.GetObjectMetadataRequest;
|
import com.amazonaws.services.s3.model.GetObjectMetadataRequest;
|
||||||
|
import com.amazonaws.services.s3.model.GetObjectRequest;
|
||||||
import org.junit.Before;
|
import org.junit.Before;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
@ -46,6 +47,7 @@
|
|||||||
import static org.apache.hadoop.fs.audit.AuditConstants.PARAM_PATH;
|
import static org.apache.hadoop.fs.audit.AuditConstants.PARAM_PATH;
|
||||||
import static org.apache.hadoop.fs.audit.AuditConstants.PARAM_PATH2;
|
import static org.apache.hadoop.fs.audit.AuditConstants.PARAM_PATH2;
|
||||||
import static org.apache.hadoop.fs.audit.AuditConstants.PARAM_PRINCIPAL;
|
import static org.apache.hadoop.fs.audit.AuditConstants.PARAM_PRINCIPAL;
|
||||||
|
import static org.apache.hadoop.fs.audit.AuditConstants.PARAM_RANGE;
|
||||||
import static org.apache.hadoop.fs.audit.AuditConstants.PARAM_THREAD0;
|
import static org.apache.hadoop.fs.audit.AuditConstants.PARAM_THREAD0;
|
||||||
import static org.apache.hadoop.fs.audit.AuditConstants.PARAM_THREAD1;
|
import static org.apache.hadoop.fs.audit.AuditConstants.PARAM_THREAD1;
|
||||||
import static org.apache.hadoop.fs.audit.AuditConstants.PARAM_TIMESTAMP;
|
import static org.apache.hadoop.fs.audit.AuditConstants.PARAM_TIMESTAMP;
|
||||||
@ -115,6 +117,7 @@ public void testHttpReferrerPatchesTheRequest() throws Throwable {
|
|||||||
assertThat(span.getTimestamp())
|
assertThat(span.getTimestamp())
|
||||||
.describedAs("Timestamp of " + span)
|
.describedAs("Timestamp of " + span)
|
||||||
.isEqualTo(ts);
|
.isEqualTo(ts);
|
||||||
|
assertMapNotContains(params, PARAM_RANGE);
|
||||||
|
|
||||||
assertMapContains(params, PARAM_TIMESTAMP,
|
assertMapContains(params, PARAM_TIMESTAMP,
|
||||||
Long.toString(ts));
|
Long.toString(ts));
|
||||||
@ -309,6 +312,44 @@ public void testStripWrappedQuotes() throws Throwable {
|
|||||||
expectStrippedField("\"\"\"b\"", "b");
|
expectStrippedField("\"\"\"b\"", "b");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Verify that correct range is getting published in header.
|
||||||
|
*/
|
||||||
|
@Test
|
||||||
|
public void testGetObjectRange() throws Throwable {
|
||||||
|
AuditSpan span = span();
|
||||||
|
GetObjectRequest request = get(getObjectRequest -> getObjectRequest.setRange(100, 200));
|
||||||
|
Map<String, String> headers
|
||||||
|
= request.getCustomRequestHeaders();
|
||||||
|
assertThat(headers)
|
||||||
|
.describedAs("Custom headers")
|
||||||
|
.containsKey(HEADER_REFERRER);
|
||||||
|
String header = headers.get(HEADER_REFERRER);
|
||||||
|
LOG.info("Header is {}", header);
|
||||||
|
Map<String, String> params
|
||||||
|
= HttpReferrerAuditHeader.extractQueryParameters(header);
|
||||||
|
assertMapContains(params, PARAM_RANGE, "100-200");
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Verify that no range is getting added to the header in request without range.
|
||||||
|
*/
|
||||||
|
@Test
|
||||||
|
public void testGetObjectWithoutRange() throws Throwable {
|
||||||
|
AuditSpan span = span();
|
||||||
|
GetObjectRequest request = get(getObjectRequest -> {});
|
||||||
|
Map<String, String> headers
|
||||||
|
= request.getCustomRequestHeaders();
|
||||||
|
assertThat(headers)
|
||||||
|
.describedAs("Custom headers")
|
||||||
|
.containsKey(HEADER_REFERRER);
|
||||||
|
String header = headers.get(HEADER_REFERRER);
|
||||||
|
LOG.info("Header is {}", header);
|
||||||
|
Map<String, String> params
|
||||||
|
= HttpReferrerAuditHeader.extractQueryParameters(header);
|
||||||
|
assertMapNotContains(params, PARAM_RANGE);
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Expect a field with quote stripping to match the expected value.
|
* Expect a field with quote stripping to match the expected value.
|
||||||
* @param str string to strip
|
* @param str string to strip
|
||||||
|
Loading…
Reference in New Issue
Block a user