HADOOP-18875. ABFS: Add sendMs and recvMs information for each AbfsHttpOperation by default. (#6008)
Contributed By: Anmol Asrani
This commit is contained in:
parent
5edd21bc85
commit
ababe3d9b0
@ -79,8 +79,6 @@ public class AbfsHttpOperation implements AbfsPerfLoggable {
|
|||||||
private int expectedBytesToBeSent;
|
private int expectedBytesToBeSent;
|
||||||
private long bytesReceived;
|
private long bytesReceived;
|
||||||
|
|
||||||
// optional trace enabled metrics
|
|
||||||
private final boolean isTraceEnabled;
|
|
||||||
private long connectionTimeMs;
|
private long connectionTimeMs;
|
||||||
private long sendRequestTimeMs;
|
private long sendRequestTimeMs;
|
||||||
private long recvResponseTimeMs;
|
private long recvResponseTimeMs;
|
||||||
@ -104,7 +102,6 @@ public static AbfsHttpOperation getAbfsHttpOperationWithFixedResult(
|
|||||||
protected AbfsHttpOperation(final URL url,
|
protected AbfsHttpOperation(final URL url,
|
||||||
final String method,
|
final String method,
|
||||||
final int httpStatus) {
|
final int httpStatus) {
|
||||||
this.isTraceEnabled = LOG.isTraceEnabled();
|
|
||||||
this.url = url;
|
this.url = url;
|
||||||
this.method = method;
|
this.method = method;
|
||||||
this.statusCode = httpStatus;
|
this.statusCode = httpStatus;
|
||||||
@ -188,14 +185,12 @@ public String toString() {
|
|||||||
sb.append(getClientRequestId());
|
sb.append(getClientRequestId());
|
||||||
sb.append(",rid=");
|
sb.append(",rid=");
|
||||||
sb.append(requestId);
|
sb.append(requestId);
|
||||||
if (isTraceEnabled) {
|
|
||||||
sb.append(",connMs=");
|
sb.append(",connMs=");
|
||||||
sb.append(connectionTimeMs);
|
sb.append(connectionTimeMs);
|
||||||
sb.append(",sendMs=");
|
sb.append(",sendMs=");
|
||||||
sb.append(sendRequestTimeMs);
|
sb.append(sendRequestTimeMs);
|
||||||
sb.append(",recvMs=");
|
sb.append(",recvMs=");
|
||||||
sb.append(recvResponseTimeMs);
|
sb.append(recvResponseTimeMs);
|
||||||
}
|
|
||||||
sb.append(",sent=");
|
sb.append(",sent=");
|
||||||
sb.append(bytesSent);
|
sb.append(bytesSent);
|
||||||
sb.append(",recv=");
|
sb.append(",recv=");
|
||||||
@ -218,18 +213,16 @@ public String getLogString() {
|
|||||||
.append(" ci=")
|
.append(" ci=")
|
||||||
.append(getClientRequestId())
|
.append(getClientRequestId())
|
||||||
.append(" ri=")
|
.append(" ri=")
|
||||||
.append(requestId);
|
.append(requestId)
|
||||||
|
|
||||||
if (isTraceEnabled) {
|
.append(" ct=")
|
||||||
sb.append(" ct=")
|
|
||||||
.append(connectionTimeMs)
|
.append(connectionTimeMs)
|
||||||
.append(" st=")
|
.append(" st=")
|
||||||
.append(sendRequestTimeMs)
|
.append(sendRequestTimeMs)
|
||||||
.append(" rt=")
|
.append(" rt=")
|
||||||
.append(recvResponseTimeMs);
|
.append(recvResponseTimeMs)
|
||||||
}
|
|
||||||
|
|
||||||
sb.append(" bs=")
|
.append(" bs=")
|
||||||
.append(bytesSent)
|
.append(bytesSent)
|
||||||
.append(" br=")
|
.append(" br=")
|
||||||
.append(bytesReceived)
|
.append(bytesReceived)
|
||||||
@ -271,7 +264,6 @@ public String getMaskedEncodedUrl() {
|
|||||||
*/
|
*/
|
||||||
public AbfsHttpOperation(final URL url, final String method, final List<AbfsHttpHeader> requestHeaders)
|
public AbfsHttpOperation(final URL url, final String method, final List<AbfsHttpHeader> requestHeaders)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
this.isTraceEnabled = LOG.isTraceEnabled();
|
|
||||||
this.url = url;
|
this.url = url;
|
||||||
this.method = method;
|
this.method = method;
|
||||||
|
|
||||||
@ -319,9 +311,7 @@ public void sendRequest(byte[] buffer, int offset, int length) throws IOExceptio
|
|||||||
// send the request body
|
// send the request body
|
||||||
|
|
||||||
long startTime = 0;
|
long startTime = 0;
|
||||||
if (this.isTraceEnabled) {
|
|
||||||
startTime = System.nanoTime();
|
startTime = System.nanoTime();
|
||||||
}
|
|
||||||
OutputStream outputStream = null;
|
OutputStream outputStream = null;
|
||||||
// Updates the expected bytes to be sent based on length.
|
// Updates the expected bytes to be sent based on length.
|
||||||
this.expectedBytesToBeSent = length;
|
this.expectedBytesToBeSent = length;
|
||||||
@ -360,11 +350,9 @@ public void sendRequest(byte[] buffer, int offset, int length) throws IOExceptio
|
|||||||
if (outputStream != null) {
|
if (outputStream != null) {
|
||||||
outputStream.close();
|
outputStream.close();
|
||||||
}
|
}
|
||||||
if (this.isTraceEnabled) {
|
|
||||||
this.sendRequestTimeMs = elapsedTimeMs(startTime);
|
this.sendRequestTimeMs = elapsedTimeMs(startTime);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Gets and processes the HTTP response.
|
* Gets and processes the HTTP response.
|
||||||
@ -379,15 +367,10 @@ public void processResponse(final byte[] buffer, final int offset, final int len
|
|||||||
|
|
||||||
// get the response
|
// get the response
|
||||||
long startTime = 0;
|
long startTime = 0;
|
||||||
if (this.isTraceEnabled) {
|
|
||||||
startTime = System.nanoTime();
|
startTime = System.nanoTime();
|
||||||
}
|
|
||||||
|
|
||||||
this.statusCode = getConnResponseCode();
|
this.statusCode = getConnResponseCode();
|
||||||
|
|
||||||
if (this.isTraceEnabled) {
|
|
||||||
this.recvResponseTimeMs = elapsedTimeMs(startTime);
|
this.recvResponseTimeMs = elapsedTimeMs(startTime);
|
||||||
}
|
|
||||||
|
|
||||||
this.statusDescription = getConnResponseMessage();
|
this.statusDescription = getConnResponseMessage();
|
||||||
|
|
||||||
@ -404,15 +387,11 @@ public void processResponse(final byte[] buffer, final int offset, final int len
|
|||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (this.isTraceEnabled) {
|
|
||||||
startTime = System.nanoTime();
|
startTime = System.nanoTime();
|
||||||
}
|
|
||||||
|
|
||||||
if (statusCode >= HttpURLConnection.HTTP_BAD_REQUEST) {
|
if (statusCode >= HttpURLConnection.HTTP_BAD_REQUEST) {
|
||||||
processStorageErrorResponse();
|
processStorageErrorResponse();
|
||||||
if (this.isTraceEnabled) {
|
|
||||||
this.recvResponseTimeMs += elapsedTimeMs(startTime);
|
this.recvResponseTimeMs += elapsedTimeMs(startTime);
|
||||||
}
|
|
||||||
this.bytesReceived = this.connection.getHeaderFieldLong(HttpHeaderConfigurations.CONTENT_LENGTH, 0);
|
this.bytesReceived = this.connection.getHeaderFieldLong(HttpHeaderConfigurations.CONTENT_LENGTH, 0);
|
||||||
} else {
|
} else {
|
||||||
// consume the input stream to release resources
|
// consume the input stream to release resources
|
||||||
@ -454,9 +433,7 @@ public void processResponse(final byte[] buffer, final int offset, final int len
|
|||||||
LOG.debug("IO Error: ", ex);
|
LOG.debug("IO Error: ", ex);
|
||||||
throw ex;
|
throw ex;
|
||||||
} finally {
|
} finally {
|
||||||
if (this.isTraceEnabled) {
|
|
||||||
this.recvResponseTimeMs += elapsedTimeMs(startTime);
|
this.recvResponseTimeMs += elapsedTimeMs(startTime);
|
||||||
}
|
|
||||||
this.bytesReceived = totalBytesRead;
|
this.bytesReceived = totalBytesRead;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -472,9 +449,6 @@ public void setRequestProperty(String key, String value) {
|
|||||||
* @throws IOException if an error occurs.
|
* @throws IOException if an error occurs.
|
||||||
*/
|
*/
|
||||||
private HttpURLConnection openConnection() throws IOException {
|
private HttpURLConnection openConnection() throws IOException {
|
||||||
if (!isTraceEnabled) {
|
|
||||||
return (HttpURLConnection) url.openConnection();
|
|
||||||
}
|
|
||||||
long start = System.nanoTime();
|
long start = System.nanoTime();
|
||||||
try {
|
try {
|
||||||
return (HttpURLConnection) url.openConnection();
|
return (HttpURLConnection) url.openConnection();
|
||||||
|
@ -113,7 +113,7 @@ public void verifyTrackingForSingletonLatencyRecords() throws Exception {
|
|||||||
assertThat(latencyDetails).describedAs("AbfsPerfTracker should return non-null record").isNotNull();
|
assertThat(latencyDetails).describedAs("AbfsPerfTracker should return non-null record").isNotNull();
|
||||||
assertThat(latencyDetails).describedAs("Latency record should be in the correct format")
|
assertThat(latencyDetails).describedAs("Latency record should be in the correct format")
|
||||||
.containsPattern("h=[^ ]* t=[^ ]* a=bogusFilesystemName c=bogusAccountName cr=oneOperationCaller"
|
.containsPattern("h=[^ ]* t=[^ ]* a=bogusFilesystemName c=bogusAccountName cr=oneOperationCaller"
|
||||||
+ " ce=oneOperationCallee r=Succeeded l=[0-9]+ s=0 e= ci=[^ ]* ri=[^ ]* bs=0 br=0 m=GET"
|
+ " ce=oneOperationCallee r=Succeeded l=[0-9]+ s=0 e= ci=[^ ]* ri=[^ ]* ct=[^ ]* st=[^ ]* rt=[^ ]* bs=0 br=0 m=GET"
|
||||||
+ " u=http%3A%2F%2Fwww.microsoft.com%2FbogusFile");
|
+ " u=http%3A%2F%2Fwww.microsoft.com%2FbogusFile");
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -154,7 +154,7 @@ public void verifyTrackingForAggregateLatencyRecords() throws Exception {
|
|||||||
assertThat(latencyDetails).describedAs("Latency record should be in the correct format")
|
assertThat(latencyDetails).describedAs("Latency record should be in the correct format")
|
||||||
.containsPattern("h=[^ ]* t=[^ ]* a=bogusFilesystemName c=bogusAccountName cr=oneOperationCaller"
|
.containsPattern("h=[^ ]* t=[^ ]* a=bogusFilesystemName c=bogusAccountName cr=oneOperationCaller"
|
||||||
+ " ce=oneOperationCallee r=Succeeded l=[0-9]+ ls=[0-9]+ lc=" + TEST_AGGREGATE_COUNT
|
+ " ce=oneOperationCallee r=Succeeded l=[0-9]+ ls=[0-9]+ lc=" + TEST_AGGREGATE_COUNT
|
||||||
+ " s=0 e= ci=[^ ]* ri=[^ ]* bs=0 br=0 m=GET u=http%3A%2F%2Fwww.microsoft.com%2FbogusFile");
|
+ " s=0 e= ci=[^ ]* ri=[^ ]* ct=[^ ]* st=[^ ]* rt=[^ ]* bs=0 br=0 m=GET u=http%3A%2F%2Fwww.microsoft.com%2FbogusFile");
|
||||||
}
|
}
|
||||||
|
|
||||||
latencyDetails = abfsPerfTracker.getClientLatency();
|
latencyDetails = abfsPerfTracker.getClientLatency();
|
||||||
|
Loading…
Reference in New Issue
Block a user