HDDS-776. Make OM initialization resilient to DNS failures. Contributed by Doroszlai, Attila.
This commit is contained in:
parent
ba9efe06fa
commit
9f2da01591
@ -123,6 +123,10 @@ http://maven.apache.org/xsd/maven-4.0.0.xsd">
|
||||
<artifactId>commons-validator</artifactId>
|
||||
<version>1.6</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.junit.jupiter</groupId>
|
||||
<artifactId>junit-jupiter-api</artifactId>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
|
||||
<build>
|
||||
|
@ -0,0 +1,78 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.utils;
|
||||
|
||||
import org.apache.hadoop.io.retry.RetryPolicy;
|
||||
import org.apache.hadoop.util.ThreadUtil;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.concurrent.Callable;
|
||||
|
||||
/**
|
||||
* {@code Callable} implementation that retries a delegate task according to
|
||||
* the specified {@code RetryPolicy}. Sleeps between retries in the caller
|
||||
* thread.
|
||||
*
|
||||
* @param <V> the result type of method {@code call}
|
||||
*/
|
||||
public class RetriableTask<V> implements Callable<V> {
|
||||
|
||||
private static final Logger LOG =
|
||||
LoggerFactory.getLogger(RetriableTask.class);
|
||||
|
||||
private final String name;
|
||||
private final Callable<V> task;
|
||||
private final RetryPolicy retryPolicy;
|
||||
|
||||
public RetriableTask(RetryPolicy retryPolicy, String name, Callable<V> task) {
|
||||
this.retryPolicy = retryPolicy;
|
||||
this.name = name;
|
||||
this.task = task;
|
||||
}
|
||||
|
||||
@Override
|
||||
public V call() throws Exception {
|
||||
int attempts = 0;
|
||||
Exception cause;
|
||||
while (true) {
|
||||
try {
|
||||
return task.call();
|
||||
} catch (Exception e) {
|
||||
cause = e;
|
||||
RetryPolicy.RetryAction action = retryPolicy.shouldRetry(e, ++attempts,
|
||||
0, true);
|
||||
if (action.action == RetryPolicy.RetryAction.RetryDecision.RETRY) {
|
||||
LOG.info("Execution of task {} failed, will be retried in {} ms",
|
||||
name, action.delayMillis);
|
||||
ThreadUtil.sleepAtLeastIgnoreInterrupts(action.delayMillis);
|
||||
} else {
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
String msg = String.format(
|
||||
"Execution of task %s failed permanently after %d attempts",
|
||||
name, attempts);
|
||||
LOG.warn(msg, cause);
|
||||
throw new IOException(msg, cause);
|
||||
}
|
||||
|
||||
}
|
@ -0,0 +1,76 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.utils;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.jupiter.api.Assertions.assertThrows;
|
||||
|
||||
import org.apache.hadoop.io.retry.RetryPolicies;
|
||||
import org.apache.hadoop.io.retry.RetryPolicy;
|
||||
import org.junit.Test;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
import java.util.zip.ZipException;
|
||||
|
||||
/**
|
||||
* Tests for {@link RetriableTask}.
|
||||
*/
|
||||
public class TestRetriableTask {
|
||||
|
||||
@Test
|
||||
public void returnsSuccessfulResult() throws Exception {
|
||||
String result = "bilbo";
|
||||
RetriableTask<String> task = new RetriableTask<>(
|
||||
RetryPolicies.RETRY_FOREVER, "test", () -> result);
|
||||
assertEquals(result, task.call());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void returnsSuccessfulResultAfterFailures() throws Exception {
|
||||
String result = "gandalf";
|
||||
AtomicInteger attempts = new AtomicInteger();
|
||||
RetriableTask<String> task = new RetriableTask<>(
|
||||
RetryPolicies.RETRY_FOREVER, "test",
|
||||
() -> {
|
||||
if (attempts.incrementAndGet() <= 2) {
|
||||
throw new Exception("testing");
|
||||
}
|
||||
return result;
|
||||
});
|
||||
assertEquals(result, task.call());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void respectsRetryPolicy() {
|
||||
int expectedAttempts = 3;
|
||||
AtomicInteger attempts = new AtomicInteger();
|
||||
RetryPolicy retryPolicy = RetryPolicies.retryUpToMaximumCountWithFixedSleep(
|
||||
expectedAttempts, 1, TimeUnit.MILLISECONDS);
|
||||
RetriableTask<String> task = new RetriableTask<>(retryPolicy, "thr", () -> {
|
||||
attempts.incrementAndGet();
|
||||
throw new ZipException("testing");
|
||||
});
|
||||
|
||||
IOException e = assertThrows(IOException.class, task::call);
|
||||
assertEquals(ZipException.class, e.getCause().getClass());
|
||||
assertEquals(expectedAttempts, attempts.get());
|
||||
}
|
||||
|
||||
}
|
@ -55,6 +55,7 @@ http://maven.apache.org/xsd/maven-4.0.0.xsd">
|
||||
|
||||
<maven-surefire-plugin.version>3.0.0-M1</maven-surefire-plugin.version>
|
||||
|
||||
<junit.jupiter.version>5.3.1</junit.jupiter.version>
|
||||
</properties>
|
||||
<repositories>
|
||||
<repository>
|
||||
@ -182,7 +183,12 @@ http://maven.apache.org/xsd/maven-4.0.0.xsd">
|
||||
<version>${bouncycastle.version}</version>
|
||||
</dependency>
|
||||
|
||||
|
||||
<dependency>
|
||||
<groupId>org.junit.jupiter</groupId>
|
||||
<artifactId>junit-jupiter-api</artifactId>
|
||||
<version>${junit.jupiter.version}</version>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
</dependencyManagement>
|
||||
<dependencies>
|
||||
|
@ -44,6 +44,7 @@ import org.apache.hadoop.hdds.security.x509.certificate.client.CertificateClient
|
||||
import org.apache.hadoop.hdds.server.ServiceRuntimeInfoImpl;
|
||||
import org.apache.hadoop.hdfs.DFSUtil;
|
||||
import org.apache.hadoop.io.Text;
|
||||
import org.apache.hadoop.io.retry.RetryPolicy;
|
||||
import org.apache.hadoop.ipc.Client;
|
||||
import org.apache.hadoop.ipc.ProtobufRpcEngine;
|
||||
import org.apache.hadoop.ipc.RPC;
|
||||
@ -112,6 +113,7 @@ import org.apache.hadoop.util.JvmPauseMonitor;
|
||||
import org.apache.hadoop.util.ReflectionUtils;
|
||||
import org.apache.hadoop.util.ShutdownHookManager;
|
||||
import org.apache.hadoop.util.StringUtils;
|
||||
import org.apache.hadoop.utils.RetriableTask;
|
||||
import org.apache.ratis.util.LifeCycle;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
@ -140,6 +142,7 @@ import static org.apache.hadoop.hdds.HddsUtils.getScmAddressForClients;
|
||||
import static org.apache.hadoop.hdds.HddsUtils.isHddsEnabled;
|
||||
import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState.HEALTHY;
|
||||
import static org.apache.hadoop.hdds.server.ServerUtils.updateRPCListenAddress;
|
||||
import static org.apache.hadoop.io.retry.RetryPolicies.retryUpToMaximumCountWithFixedSleep;
|
||||
import static org.apache.hadoop.ozone.OmUtils.getOmAddress;
|
||||
import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_ACL_AUTHORIZER_CLASS;
|
||||
import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_ACL_ENABLED;
|
||||
@ -695,8 +698,7 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl
|
||||
StorageState state = omStorage.getState();
|
||||
if (state != StorageState.INITIALIZED) {
|
||||
try {
|
||||
ScmBlockLocationProtocol scmBlockClient = getScmBlockClient(conf);
|
||||
ScmInfo scmInfo = scmBlockClient.getScmInfo();
|
||||
ScmInfo scmInfo = getScmInfo(conf);
|
||||
String clusterId = scmInfo.getClusterId();
|
||||
String scmId = scmInfo.getScmId();
|
||||
if (clusterId == null || clusterId.isEmpty()) {
|
||||
@ -726,6 +728,22 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl
|
||||
}
|
||||
}
|
||||
|
||||
private static ScmInfo getScmInfo(OzoneConfiguration conf)
|
||||
throws IOException {
|
||||
try {
|
||||
RetryPolicy retryPolicy = retryUpToMaximumCountWithFixedSleep(
|
||||
10, 5, TimeUnit.SECONDS);
|
||||
RetriableTask<ScmInfo> retriable = new RetriableTask<>(
|
||||
retryPolicy, "OM#getScmInfo",
|
||||
() -> getScmBlockClient(conf).getScmInfo());
|
||||
return retriable.call();
|
||||
} catch (IOException e) {
|
||||
throw e;
|
||||
} catch (Exception e) {
|
||||
throw new IOException("Failed to get SCM info", e);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Parses the command line options for OM initialization.
|
||||
*
|
||||
|
Loading…
x
Reference in New Issue
Block a user