HADOOP-19280. [ABFS] Initialize client timer only if metric collection is enabled (#7061)
Contributed by Manish Bhatt
This commit is contained in:
parent
a9b7913d56
commit
9aca73481e
@ -131,6 +131,7 @@
|
|||||||
public abstract class AbfsClient implements Closeable {
|
public abstract class AbfsClient implements Closeable {
|
||||||
public static final Logger LOG = LoggerFactory.getLogger(AbfsClient.class);
|
public static final Logger LOG = LoggerFactory.getLogger(AbfsClient.class);
|
||||||
public static final String HUNDRED_CONTINUE_USER_AGENT = SINGLE_WHITE_SPACE + HUNDRED_CONTINUE + SEMICOLON;
|
public static final String HUNDRED_CONTINUE_USER_AGENT = SINGLE_WHITE_SPACE + HUNDRED_CONTINUE + SEMICOLON;
|
||||||
|
public static final String ABFS_CLIENT_TIMER_THREAD_NAME = "abfs-timer-client";
|
||||||
|
|
||||||
private final URL baseUrl;
|
private final URL baseUrl;
|
||||||
private final SharedKeyCredentials sharedKeyCredentials;
|
private final SharedKeyCredentials sharedKeyCredentials;
|
||||||
@ -149,7 +150,7 @@ public abstract class AbfsClient implements Closeable {
|
|||||||
private AccessTokenProvider tokenProvider;
|
private AccessTokenProvider tokenProvider;
|
||||||
private SASTokenProvider sasTokenProvider;
|
private SASTokenProvider sasTokenProvider;
|
||||||
private final AbfsCounters abfsCounters;
|
private final AbfsCounters abfsCounters;
|
||||||
private final Timer timer;
|
private Timer timer;
|
||||||
private final String abfsMetricUrl;
|
private final String abfsMetricUrl;
|
||||||
private boolean isMetricCollectionEnabled = false;
|
private boolean isMetricCollectionEnabled = false;
|
||||||
private final MetricFormat metricFormat;
|
private final MetricFormat metricFormat;
|
||||||
@ -258,9 +259,9 @@ private AbfsClient(final URL baseUrl,
|
|||||||
throw new IOException("Exception while initializing metric credentials " + e);
|
throw new IOException("Exception while initializing metric credentials " + e);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
this.timer = new Timer(
|
|
||||||
"abfs-timer-client", true);
|
|
||||||
if (isMetricCollectionEnabled) {
|
if (isMetricCollectionEnabled) {
|
||||||
|
this.timer = new Timer(
|
||||||
|
ABFS_CLIENT_TIMER_THREAD_NAME, true);
|
||||||
timer.schedule(new TimerTaskImpl(),
|
timer.schedule(new TimerTaskImpl(),
|
||||||
metricIdlePeriod,
|
metricIdlePeriod,
|
||||||
metricIdlePeriod);
|
metricIdlePeriod);
|
||||||
@ -292,9 +293,9 @@ public AbfsClient(final URL baseUrl, final SharedKeyCredentials sharedKeyCredent
|
|||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void close() throws IOException {
|
public void close() throws IOException {
|
||||||
if (runningTimerTask != null) {
|
if (isMetricCollectionEnabled && runningTimerTask != null) {
|
||||||
runningTimerTask.cancel();
|
runningTimerTask.cancel();
|
||||||
timer.purge();
|
timer.cancel();
|
||||||
}
|
}
|
||||||
if (keepAliveCache != null) {
|
if (keepAliveCache != null) {
|
||||||
keepAliveCache.close();
|
keepAliveCache.close();
|
||||||
@ -1418,7 +1419,7 @@ private TracingContext getMetricTracingContext() {
|
|||||||
boolean timerOrchestrator(TimerFunctionality timerFunctionality, TimerTask timerTask) {
|
boolean timerOrchestrator(TimerFunctionality timerFunctionality, TimerTask timerTask) {
|
||||||
switch (timerFunctionality) {
|
switch (timerFunctionality) {
|
||||||
case RESUME:
|
case RESUME:
|
||||||
if (isMetricCollectionStopped.get()) {
|
if (isMetricCollectionEnabled && isMetricCollectionStopped.get()) {
|
||||||
synchronized (this) {
|
synchronized (this) {
|
||||||
if (isMetricCollectionStopped.get()) {
|
if (isMetricCollectionStopped.get()) {
|
||||||
resumeTimer();
|
resumeTimer();
|
||||||
@ -1597,6 +1598,11 @@ KeepAliveCache getKeepAliveCache() {
|
|||||||
return keepAliveCache;
|
return keepAliveCache;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@VisibleForTesting
|
||||||
|
protected Timer getTimer() {
|
||||||
|
return timer;
|
||||||
|
}
|
||||||
|
|
||||||
protected String getUserAgent() {
|
protected String getUserAgent() {
|
||||||
return userAgent;
|
return userAgent;
|
||||||
}
|
}
|
||||||
|
@ -0,0 +1,141 @@
|
|||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.fs.azurebfs.services;
|
||||||
|
|
||||||
|
import java.net.URI;
|
||||||
|
import java.net.URL;
|
||||||
|
import java.util.Map;
|
||||||
|
|
||||||
|
import org.assertj.core.api.Assertions;
|
||||||
|
import org.junit.Test;
|
||||||
|
import org.mockito.Mockito;
|
||||||
|
|
||||||
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.fs.azurebfs.AbfsConfiguration;
|
||||||
|
import org.apache.hadoop.fs.azurebfs.AbfsCountersImpl;
|
||||||
|
import org.apache.hadoop.fs.azurebfs.oauth2.AccessTokenProvider;
|
||||||
|
import org.apache.hadoop.fs.azurebfs.utils.Base64;
|
||||||
|
import org.apache.hadoop.fs.azurebfs.utils.MetricFormat;
|
||||||
|
|
||||||
|
import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_METRIC_ACCOUNT_KEY;
|
||||||
|
import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_METRIC_ACCOUNT_NAME;
|
||||||
|
import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_METRIC_FORMAT;
|
||||||
|
import static org.apache.hadoop.fs.azurebfs.services.AbfsClient.ABFS_CLIENT_TIMER_THREAD_NAME;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Unit test cases for the AbfsClient class.
|
||||||
|
*/
|
||||||
|
public class TestAbfsClient {
|
||||||
|
private static final String ACCOUNT_NAME = "bogusAccountName.dfs.core.windows.net";
|
||||||
|
private static final String ACCOUNT_KEY = "testKey";
|
||||||
|
private static final long SLEEP_DURATION_MS = 500;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Test the initialization of the AbfsClient timer when metric collection is disabled.
|
||||||
|
* In case of metric collection being disabled, the timer should not be initialized.
|
||||||
|
* Asserting that the timer is null and the abfs-timer-client thread is not running.
|
||||||
|
*/
|
||||||
|
@Test
|
||||||
|
public void testTimerInitializationWithoutMetricCollection() throws Exception {
|
||||||
|
final Configuration configuration = new Configuration();
|
||||||
|
AbfsConfiguration abfsConfiguration = new AbfsConfiguration(configuration, ACCOUNT_NAME);
|
||||||
|
|
||||||
|
AbfsCounters abfsCounters = Mockito.spy(new AbfsCountersImpl(new URI("abcd")));
|
||||||
|
AbfsClientContext abfsClientContext = new AbfsClientContextBuilder().withAbfsCounters(abfsCounters).build();
|
||||||
|
|
||||||
|
// Get an instance of AbfsClient.
|
||||||
|
AbfsClient client = new AbfsDfsClient(new URL("https://azure.com"),
|
||||||
|
null,
|
||||||
|
abfsConfiguration,
|
||||||
|
(AccessTokenProvider) null,
|
||||||
|
null,
|
||||||
|
abfsClientContext);
|
||||||
|
|
||||||
|
Assertions.assertThat(client.getTimer())
|
||||||
|
.describedAs("Timer should not be initialized")
|
||||||
|
.isNull();
|
||||||
|
|
||||||
|
// Check if a thread with the name "abfs-timer-client" exists
|
||||||
|
Assertions.assertThat(isThreadRunning(ABFS_CLIENT_TIMER_THREAD_NAME))
|
||||||
|
.describedAs("Expected thread 'abfs-timer-client' not found")
|
||||||
|
.isEqualTo(false);
|
||||||
|
client.close();
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Test the initialization of the AbfsClient timer when metric collection is enabled.
|
||||||
|
* In case of metric collection being enabled, the timer should be initialized.
|
||||||
|
* Asserting that the timer is not null and the abfs-timer-client thread is running.
|
||||||
|
* Also, asserting that the thread is removed after closing the client.
|
||||||
|
*/
|
||||||
|
@Test
|
||||||
|
public void testTimerInitializationWithMetricCollection() throws Exception {
|
||||||
|
final Configuration configuration = new Configuration();
|
||||||
|
configuration.set(FS_AZURE_METRIC_FORMAT, String.valueOf(MetricFormat.INTERNAL_BACKOFF_METRIC_FORMAT));
|
||||||
|
configuration.set(FS_AZURE_METRIC_ACCOUNT_NAME, ACCOUNT_NAME);
|
||||||
|
configuration.set(FS_AZURE_METRIC_ACCOUNT_KEY, Base64.encode(ACCOUNT_KEY.getBytes()));
|
||||||
|
AbfsConfiguration abfsConfiguration = new AbfsConfiguration(configuration, ACCOUNT_NAME);
|
||||||
|
|
||||||
|
AbfsCounters abfsCounters = Mockito.spy(new AbfsCountersImpl(new URI("abcd")));
|
||||||
|
AbfsClientContext abfsClientContext = new AbfsClientContextBuilder().withAbfsCounters(abfsCounters).build();
|
||||||
|
|
||||||
|
// Get an instance of AbfsClient.
|
||||||
|
AbfsClient client = new AbfsDfsClient(new URL("https://azure.com"),
|
||||||
|
null,
|
||||||
|
abfsConfiguration,
|
||||||
|
(AccessTokenProvider) null,
|
||||||
|
null,
|
||||||
|
abfsClientContext);
|
||||||
|
|
||||||
|
Assertions.assertThat(client.getTimer())
|
||||||
|
.describedAs("Timer should be initialized")
|
||||||
|
.isNotNull();
|
||||||
|
|
||||||
|
// Check if a thread with the name "abfs-timer-client" exists
|
||||||
|
Assertions.assertThat(isThreadRunning(ABFS_CLIENT_TIMER_THREAD_NAME))
|
||||||
|
.describedAs("Expected thread 'abfs-timer-client' not found")
|
||||||
|
.isEqualTo(true);
|
||||||
|
client.close();
|
||||||
|
|
||||||
|
// Check if the thread is removed after closing the client
|
||||||
|
Thread.sleep(SLEEP_DURATION_MS);
|
||||||
|
Assertions.assertThat(isThreadRunning(ABFS_CLIENT_TIMER_THREAD_NAME))
|
||||||
|
.describedAs("Unexpected thread 'abfs-timer-client' found")
|
||||||
|
.isEqualTo(false);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Check if a thread with the specified name is running.
|
||||||
|
*
|
||||||
|
* @param threadName Name of the thread to check
|
||||||
|
* @return true if the thread is running, false otherwise
|
||||||
|
*/
|
||||||
|
private boolean isThreadRunning(String threadName) {
|
||||||
|
// Get all threads and their stack traces
|
||||||
|
Map<Thread, StackTraceElement[]> allThreads = Thread.getAllStackTraces();
|
||||||
|
|
||||||
|
// Check if any thread has the specified name
|
||||||
|
for (Thread thread : allThreads.keySet()) {
|
||||||
|
if (thread.getName().equals(threadName)) {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in New Issue
Block a user