HDFS-9801. ReconfigurableBase should update the cached configuration. (Arpit Agarwal)
This commit is contained in:
parent
9fdfb546fb
commit
1de1641f17
@ -36,7 +36,7 @@ public interface Reconfigurable extends Configurable {
|
|||||||
* If the property cannot be changed, throw a
|
* If the property cannot be changed, throw a
|
||||||
* {@link ReconfigurationException}.
|
* {@link ReconfigurationException}.
|
||||||
*/
|
*/
|
||||||
public String reconfigureProperty(String property, String newVal)
|
void reconfigureProperty(String property, String newVal)
|
||||||
throws ReconfigurationException;
|
throws ReconfigurationException;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -46,12 +46,10 @@ public String reconfigureProperty(String property, String newVal)
|
|||||||
* then changeConf should not throw an exception when changing
|
* then changeConf should not throw an exception when changing
|
||||||
* this property.
|
* this property.
|
||||||
*/
|
*/
|
||||||
public boolean isPropertyReconfigurable(String property);
|
boolean isPropertyReconfigurable(String property);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Return all the properties that can be changed at run time.
|
* Return all the properties that can be changed at run time.
|
||||||
*/
|
*/
|
||||||
public Collection<String> getReconfigurableProperties();
|
Collection<String> getReconfigurableProperties();
|
||||||
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
@ -112,14 +112,14 @@ private static class ReconfigurationThread extends Thread {
|
|||||||
// See {@link ReconfigurationServlet#applyChanges}
|
// See {@link ReconfigurationServlet#applyChanges}
|
||||||
public void run() {
|
public void run() {
|
||||||
LOG.info("Starting reconfiguration task.");
|
LOG.info("Starting reconfiguration task.");
|
||||||
Configuration oldConf = this.parent.getConf();
|
final Configuration oldConf = parent.getConf();
|
||||||
Configuration newConf = this.parent.getNewConf();
|
final Configuration newConf = parent.getNewConf();
|
||||||
Collection<PropertyChange> changes =
|
final Collection<PropertyChange> changes =
|
||||||
this.parent.getChangedProperties(newConf, oldConf);
|
parent.getChangedProperties(newConf, oldConf);
|
||||||
Map<PropertyChange, Optional<String>> results = Maps.newHashMap();
|
Map<PropertyChange, Optional<String>> results = Maps.newHashMap();
|
||||||
for (PropertyChange change : changes) {
|
for (PropertyChange change : changes) {
|
||||||
String errorMessage = null;
|
String errorMessage = null;
|
||||||
if (!this.parent.isPropertyReconfigurable(change.prop)) {
|
if (!parent.isPropertyReconfigurable(change.prop)) {
|
||||||
LOG.info(String.format(
|
LOG.info(String.format(
|
||||||
"Property %s is not configurable: old value: %s, new value: %s",
|
"Property %s is not configurable: old value: %s, new value: %s",
|
||||||
change.prop, change.oldVal, change.newVal));
|
change.prop, change.oldVal, change.newVal));
|
||||||
@ -130,17 +130,23 @@ public void run() {
|
|||||||
+ "\" to \"" + ((change.newVal == null) ? "<default>" : change.newVal)
|
+ "\" to \"" + ((change.newVal == null) ? "<default>" : change.newVal)
|
||||||
+ "\".");
|
+ "\".");
|
||||||
try {
|
try {
|
||||||
this.parent.reconfigurePropertyImpl(change.prop, change.newVal);
|
String effectiveValue =
|
||||||
|
parent.reconfigurePropertyImpl(change.prop, change.newVal);
|
||||||
|
if (change.newVal != null) {
|
||||||
|
oldConf.set(change.prop, effectiveValue);
|
||||||
|
} else {
|
||||||
|
oldConf.unset(change.prop);
|
||||||
|
}
|
||||||
} catch (ReconfigurationException e) {
|
} catch (ReconfigurationException e) {
|
||||||
errorMessage = e.getCause().getMessage();
|
errorMessage = e.getCause().getMessage();
|
||||||
}
|
}
|
||||||
results.put(change, Optional.fromNullable(errorMessage));
|
results.put(change, Optional.fromNullable(errorMessage));
|
||||||
}
|
}
|
||||||
|
|
||||||
synchronized (this.parent.reconfigLock) {
|
synchronized (parent.reconfigLock) {
|
||||||
this.parent.endTime = Time.now();
|
parent.endTime = Time.now();
|
||||||
this.parent.status = Collections.unmodifiableMap(results);
|
parent.status = Collections.unmodifiableMap(results);
|
||||||
this.parent.reconfigThread = null;
|
parent.reconfigThread = null;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -203,21 +209,19 @@ public void shutdownReconfigurationTask() {
|
|||||||
* reconfigureProperty.
|
* reconfigureProperty.
|
||||||
*/
|
*/
|
||||||
@Override
|
@Override
|
||||||
public final String reconfigureProperty(String property, String newVal)
|
public final void reconfigureProperty(String property, String newVal)
|
||||||
throws ReconfigurationException {
|
throws ReconfigurationException {
|
||||||
if (isPropertyReconfigurable(property)) {
|
if (isPropertyReconfigurable(property)) {
|
||||||
LOG.info("changing property " + property + " to " + newVal);
|
LOG.info("changing property " + property + " to " + newVal);
|
||||||
String oldVal;
|
|
||||||
synchronized(getConf()) {
|
synchronized(getConf()) {
|
||||||
oldVal = getConf().get(property);
|
getConf().get(property);
|
||||||
reconfigurePropertyImpl(property, newVal);
|
String effectiveValue = reconfigurePropertyImpl(property, newVal);
|
||||||
if (newVal != null) {
|
if (newVal != null) {
|
||||||
getConf().set(property, newVal);
|
getConf().set(property, effectiveValue);
|
||||||
} else {
|
} else {
|
||||||
getConf().unset(property);
|
getConf().unset(property);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return oldVal;
|
|
||||||
} else {
|
} else {
|
||||||
throw new ReconfigurationException(property, newVal,
|
throw new ReconfigurationException(property, newVal,
|
||||||
getConf().get(property));
|
getConf().get(property));
|
||||||
@ -251,8 +255,15 @@ public boolean isPropertyReconfigurable(String property) {
|
|||||||
* that is being changed. If this object owns other Reconfigurable objects
|
* that is being changed. If this object owns other Reconfigurable objects
|
||||||
* reconfigureProperty should be called recursively to make sure that
|
* reconfigureProperty should be called recursively to make sure that
|
||||||
* to make sure that the configuration of these objects is updated.
|
* to make sure that the configuration of these objects is updated.
|
||||||
|
*
|
||||||
|
* @param property Name of the property that is being reconfigured.
|
||||||
|
* @param newVal Proposed new value of the property.
|
||||||
|
* @return Effective new value of the property. This may be different from
|
||||||
|
* newVal.
|
||||||
|
*
|
||||||
|
* @throws ReconfigurationException if there was an error applying newVal.
|
||||||
*/
|
*/
|
||||||
protected abstract void reconfigurePropertyImpl(String property, String newVal)
|
protected abstract String reconfigurePropertyImpl(
|
||||||
throws ReconfigurationException;
|
String property, String newVal) throws ReconfigurationException;
|
||||||
|
|
||||||
}
|
}
|
||||||
|
@ -19,6 +19,7 @@
|
|||||||
package org.apache.hadoop.conf;
|
package org.apache.hadoop.conf;
|
||||||
|
|
||||||
import com.google.common.base.Optional;
|
import com.google.common.base.Optional;
|
||||||
|
import com.google.common.base.Supplier;
|
||||||
import com.google.common.collect.Lists;
|
import com.google.common.collect.Lists;
|
||||||
import org.apache.hadoop.test.GenericTestUtils;
|
import org.apache.hadoop.test.GenericTestUtils;
|
||||||
import org.apache.hadoop.util.Time;
|
import org.apache.hadoop.util.Time;
|
||||||
@ -27,13 +28,13 @@
|
|||||||
import org.junit.Before;
|
import org.junit.Before;
|
||||||
|
|
||||||
import static org.hamcrest.CoreMatchers.containsString;
|
import static org.hamcrest.CoreMatchers.containsString;
|
||||||
|
import static org.hamcrest.CoreMatchers.is;
|
||||||
import static org.hamcrest.MatcherAssert.assertThat;
|
import static org.hamcrest.MatcherAssert.assertThat;
|
||||||
import static org.junit.Assert.*;
|
import static org.junit.Assert.*;
|
||||||
import static org.junit.Assert.assertEquals;
|
import static org.junit.Assert.assertEquals;
|
||||||
import static org.mockito.Matchers.any;
|
import static org.mockito.Matchers.any;
|
||||||
import static org.mockito.Matchers.anyString;
|
import static org.mockito.Matchers.anyString;
|
||||||
import static org.mockito.Matchers.eq;
|
import static org.mockito.Matchers.eq;
|
||||||
import static org.mockito.Mockito.doNothing;
|
|
||||||
import static org.mockito.Mockito.doReturn;
|
import static org.mockito.Mockito.doReturn;
|
||||||
import static org.mockito.Mockito.doThrow;
|
import static org.mockito.Mockito.doThrow;
|
||||||
import static org.mockito.Mockito.spy;
|
import static org.mockito.Mockito.spy;
|
||||||
@ -44,6 +45,7 @@
|
|||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.concurrent.CountDownLatch;
|
import java.util.concurrent.CountDownLatch;
|
||||||
|
import java.util.concurrent.TimeoutException;
|
||||||
|
|
||||||
public class TestReconfiguration {
|
public class TestReconfiguration {
|
||||||
private Configuration conf1;
|
private Configuration conf1;
|
||||||
@ -129,9 +131,10 @@ public Collection<String> getReconfigurableProperties() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public synchronized void reconfigurePropertyImpl(
|
public synchronized String reconfigurePropertyImpl(
|
||||||
String property, String newVal) throws ReconfigurationException {
|
String property, String newVal) throws ReconfigurationException {
|
||||||
// do nothing
|
// do nothing
|
||||||
|
return newVal;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -354,13 +357,14 @@ public Collection<String> getReconfigurableProperties() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public synchronized void reconfigurePropertyImpl(String property,
|
public synchronized String reconfigurePropertyImpl(String property,
|
||||||
String newVal) throws ReconfigurationException {
|
String newVal) throws ReconfigurationException {
|
||||||
try {
|
try {
|
||||||
latch.await();
|
latch.await();
|
||||||
} catch (InterruptedException e) {
|
} catch (InterruptedException e) {
|
||||||
// Ignore
|
// Ignore
|
||||||
}
|
}
|
||||||
|
return newVal;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -395,9 +399,9 @@ public void testAsyncReconfigure()
|
|||||||
doReturn(false).when(dummy).isPropertyReconfigurable(eq("name2"));
|
doReturn(false).when(dummy).isPropertyReconfigurable(eq("name2"));
|
||||||
doReturn(true).when(dummy).isPropertyReconfigurable(eq("name3"));
|
doReturn(true).when(dummy).isPropertyReconfigurable(eq("name3"));
|
||||||
|
|
||||||
doNothing().when(dummy)
|
doReturn("dummy").when(dummy)
|
||||||
.reconfigurePropertyImpl(eq("name1"), anyString());
|
.reconfigurePropertyImpl(eq("name1"), anyString());
|
||||||
doNothing().when(dummy)
|
doReturn("dummy").when(dummy)
|
||||||
.reconfigurePropertyImpl(eq("name2"), anyString());
|
.reconfigurePropertyImpl(eq("name2"), anyString());
|
||||||
doThrow(new ReconfigurationException("NAME3", "NEW3", "OLD3",
|
doThrow(new ReconfigurationException("NAME3", "NEW3", "OLD3",
|
||||||
new IOException("io exception")))
|
new IOException("io exception")))
|
||||||
@ -474,4 +478,131 @@ public void testStartReconfigurationFailureDueToExistingRunningTask()
|
|||||||
GenericTestUtils.assertExceptionContains("The server is stopped", e);
|
GenericTestUtils.assertExceptionContains("The server is stopped", e);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
/**
|
||||||
|
* Ensure that {@link ReconfigurableBase#reconfigureProperty} updates the
|
||||||
|
* parent's cached configuration on success.
|
||||||
|
* @throws IOException
|
||||||
|
*/
|
||||||
|
@Test (timeout=300000)
|
||||||
|
public void testConfIsUpdatedOnSuccess() throws ReconfigurationException {
|
||||||
|
final String property = "FOO";
|
||||||
|
final String value1 = "value1";
|
||||||
|
final String value2 = "value2";
|
||||||
|
|
||||||
|
final Configuration conf = new Configuration();
|
||||||
|
conf.set(property, value1);
|
||||||
|
final Configuration newConf = new Configuration();
|
||||||
|
newConf.set(property, value2);
|
||||||
|
|
||||||
|
final ReconfigurableBase reconfigurable = makeReconfigurable(
|
||||||
|
conf, newConf, Arrays.asList(property));
|
||||||
|
|
||||||
|
reconfigurable.reconfigureProperty(property, value2);
|
||||||
|
assertThat(reconfigurable.getConf().get(property), is(value2));
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Ensure that {@link ReconfigurableBase#startReconfigurationTask} updates
|
||||||
|
* its parent's cached configuration on success.
|
||||||
|
* @throws IOException
|
||||||
|
*/
|
||||||
|
@Test (timeout=300000)
|
||||||
|
public void testConfIsUpdatedOnSuccessAsync() throws ReconfigurationException,
|
||||||
|
TimeoutException, InterruptedException, IOException {
|
||||||
|
final String property = "FOO";
|
||||||
|
final String value1 = "value1";
|
||||||
|
final String value2 = "value2";
|
||||||
|
|
||||||
|
final Configuration conf = new Configuration();
|
||||||
|
conf.set(property, value1);
|
||||||
|
final Configuration newConf = new Configuration();
|
||||||
|
newConf.set(property, value2);
|
||||||
|
|
||||||
|
final ReconfigurableBase reconfigurable = makeReconfigurable(
|
||||||
|
conf, newConf, Arrays.asList(property));
|
||||||
|
|
||||||
|
// Kick off a reconfiguration task and wait until it completes.
|
||||||
|
reconfigurable.startReconfigurationTask();
|
||||||
|
GenericTestUtils.waitFor(new Supplier<Boolean>() {
|
||||||
|
@Override
|
||||||
|
public Boolean get() {
|
||||||
|
return reconfigurable.getReconfigurationTaskStatus().stopped();
|
||||||
|
}
|
||||||
|
}, 100, 60000);
|
||||||
|
assertThat(reconfigurable.getConf().get(property), is(value2));
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Ensure that {@link ReconfigurableBase#reconfigureProperty} unsets the
|
||||||
|
* property in its parent's configuration when the new value is null.
|
||||||
|
* @throws IOException
|
||||||
|
*/
|
||||||
|
@Test (timeout=300000)
|
||||||
|
public void testConfIsUnset() throws ReconfigurationException {
|
||||||
|
final String property = "FOO";
|
||||||
|
final String value1 = "value1";
|
||||||
|
|
||||||
|
final Configuration conf = new Configuration();
|
||||||
|
conf.set(property, value1);
|
||||||
|
final Configuration newConf = new Configuration();
|
||||||
|
|
||||||
|
final ReconfigurableBase reconfigurable = makeReconfigurable(
|
||||||
|
conf, newConf, Arrays.asList(property));
|
||||||
|
|
||||||
|
reconfigurable.reconfigureProperty(property, null);
|
||||||
|
assertNull(reconfigurable.getConf().get(property));
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Ensure that {@link ReconfigurableBase#startReconfigurationTask} unsets the
|
||||||
|
* property in its parent's configuration when the new value is null.
|
||||||
|
* @throws IOException
|
||||||
|
*/
|
||||||
|
@Test (timeout=300000)
|
||||||
|
public void testConfIsUnsetAsync() throws ReconfigurationException,
|
||||||
|
IOException, TimeoutException, InterruptedException {
|
||||||
|
final String property = "FOO";
|
||||||
|
final String value1 = "value1";
|
||||||
|
|
||||||
|
final Configuration conf = new Configuration();
|
||||||
|
conf.set(property, value1);
|
||||||
|
final Configuration newConf = new Configuration();
|
||||||
|
|
||||||
|
final ReconfigurableBase reconfigurable = makeReconfigurable(
|
||||||
|
conf, newConf, Arrays.asList(property));
|
||||||
|
|
||||||
|
// Kick off a reconfiguration task and wait until it completes.
|
||||||
|
reconfigurable.startReconfigurationTask();
|
||||||
|
GenericTestUtils.waitFor(new Supplier<Boolean>() {
|
||||||
|
@Override
|
||||||
|
public Boolean get() {
|
||||||
|
return reconfigurable.getReconfigurationTaskStatus().stopped();
|
||||||
|
}
|
||||||
|
}, 100, 60000);
|
||||||
|
assertNull(reconfigurable.getConf().get(property));
|
||||||
|
}
|
||||||
|
|
||||||
|
private ReconfigurableBase makeReconfigurable(
|
||||||
|
final Configuration oldConf, final Configuration newConf,
|
||||||
|
final Collection<String> reconfigurableProperties) {
|
||||||
|
|
||||||
|
return new ReconfigurableBase(oldConf) {
|
||||||
|
@Override
|
||||||
|
protected Configuration getNewConf() {
|
||||||
|
return newConf;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Collection<String> getReconfigurableProperties() {
|
||||||
|
return reconfigurableProperties;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected String reconfigurePropertyImpl(
|
||||||
|
String property, String newVal) throws ReconfigurationException {
|
||||||
|
return newVal;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
}
|
||||||
|
}
|
||||||
|
@ -2759,6 +2759,9 @@ Release 2.8.0 - UNRELEASED
|
|||||||
HDFS-9790. HDFS Balancer should exit with a proper message if upgrade is
|
HDFS-9790. HDFS Balancer should exit with a proper message if upgrade is
|
||||||
not finalized. (Xiaobing Zhou via Arpit Agarwal)
|
not finalized. (Xiaobing Zhou via Arpit Agarwal)
|
||||||
|
|
||||||
|
HDFS-9801. ReconfigurableBase should update the cached configuration.
|
||||||
|
(Arpit Agarwal)
|
||||||
|
|
||||||
Release 2.7.3 - UNRELEASED
|
Release 2.7.3 - UNRELEASED
|
||||||
|
|
||||||
INCOMPATIBLE CHANGES
|
INCOMPATIBLE CHANGES
|
||||||
|
@ -504,70 +504,80 @@ protected Configuration getNewConf() {
|
|||||||
return new HdfsConfiguration();
|
return new HdfsConfiguration();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* {@inheritdoc}.
|
||||||
|
*/
|
||||||
@Override
|
@Override
|
||||||
public void reconfigurePropertyImpl(String property, String newVal)
|
public String reconfigurePropertyImpl(String property, String newVal)
|
||||||
throws ReconfigurationException {
|
throws ReconfigurationException {
|
||||||
if (property.equals(DFS_DATANODE_DATA_DIR_KEY)) {
|
switch (property) {
|
||||||
IOException rootException = null;
|
case DFS_DATANODE_DATA_DIR_KEY: {
|
||||||
try {
|
IOException rootException = null;
|
||||||
LOG.info("Reconfiguring " + property + " to " + newVal);
|
|
||||||
this.refreshVolumes(newVal);
|
|
||||||
} catch (IOException e) {
|
|
||||||
rootException = e;
|
|
||||||
} finally {
|
|
||||||
// Send a full block report to let NN acknowledge the volume changes.
|
|
||||||
try {
|
try {
|
||||||
triggerBlockReport(
|
LOG.info("Reconfiguring " + property + " to " + newVal);
|
||||||
new BlockReportOptions.Factory().setIncremental(false).build());
|
this.refreshVolumes(newVal);
|
||||||
|
return conf.get(DFS_DATANODE_DATA_DIR_KEY);
|
||||||
} catch (IOException e) {
|
} catch (IOException e) {
|
||||||
LOG.warn("Exception while sending the block report after refreshing"
|
rootException = e;
|
||||||
+ " volumes " + property + " to " + newVal, e);
|
} finally {
|
||||||
if (rootException == null) {
|
// Send a full block report to let NN acknowledge the volume changes.
|
||||||
rootException = e;
|
try {
|
||||||
|
triggerBlockReport(
|
||||||
|
new BlockReportOptions.Factory().setIncremental(false).build());
|
||||||
|
} catch (IOException e) {
|
||||||
|
LOG.warn("Exception while sending the block report after refreshing"
|
||||||
|
+ " volumes " + property + " to " + newVal, e);
|
||||||
|
if (rootException == null) {
|
||||||
|
rootException = e;
|
||||||
|
}
|
||||||
|
} finally {
|
||||||
|
if (rootException != null) {
|
||||||
|
throw new ReconfigurationException(property, newVal,
|
||||||
|
getConf().get(property), rootException);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
case DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_KEY: {
|
||||||
|
ReconfigurationException rootException = null;
|
||||||
|
try {
|
||||||
|
LOG.info("Reconfiguring " + property + " to " + newVal);
|
||||||
|
int movers;
|
||||||
|
if (newVal == null) {
|
||||||
|
// set to default
|
||||||
|
movers = DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_DEFAULT;
|
||||||
|
} else {
|
||||||
|
movers = Integer.parseInt(newVal);
|
||||||
|
if (movers <= 0) {
|
||||||
|
rootException = new ReconfigurationException(
|
||||||
|
property,
|
||||||
|
newVal,
|
||||||
|
getConf().get(property),
|
||||||
|
new IllegalArgumentException(
|
||||||
|
"balancer max concurrent movers must be larger than 0"));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
xserver.updateBalancerMaxConcurrentMovers(movers);
|
||||||
|
return Integer.toString(movers);
|
||||||
|
} catch (NumberFormatException nfe) {
|
||||||
|
rootException = new ReconfigurationException(
|
||||||
|
property, newVal, getConf().get(property), nfe);
|
||||||
} finally {
|
} finally {
|
||||||
if (rootException != null) {
|
if (rootException != null) {
|
||||||
throw new ReconfigurationException(property, newVal,
|
LOG.warn(String.format(
|
||||||
getConf().get(property), rootException);
|
"Exception in updating balancer max concurrent movers %s to %s",
|
||||||
|
property, newVal), rootException);
|
||||||
|
throw rootException;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
break;
|
||||||
}
|
}
|
||||||
} else if (property.equals(
|
default:
|
||||||
DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_KEY)) {
|
break;
|
||||||
ReconfigurationException rootException = null;
|
|
||||||
try {
|
|
||||||
LOG.info("Reconfiguring " + property + " to " + newVal);
|
|
||||||
int movers;
|
|
||||||
if (newVal == null) {
|
|
||||||
// set to default
|
|
||||||
movers = DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_DEFAULT;
|
|
||||||
} else {
|
|
||||||
movers = Integer.parseInt(newVal);
|
|
||||||
if (movers <= 0) {
|
|
||||||
rootException = new ReconfigurationException(
|
|
||||||
property,
|
|
||||||
newVal,
|
|
||||||
getConf().get(property),
|
|
||||||
new IllegalArgumentException(
|
|
||||||
"balancer max concurrent movers must be larger than 0"));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
xserver.updateBalancerMaxConcurrentMovers(movers);
|
|
||||||
} catch(NumberFormatException nfe) {
|
|
||||||
rootException = new ReconfigurationException(
|
|
||||||
property, newVal, getConf().get(property), nfe);
|
|
||||||
} finally {
|
|
||||||
if (rootException != null) {
|
|
||||||
LOG.warn(String.format(
|
|
||||||
"Exception in updating balancer max concurrent movers %s to %s",
|
|
||||||
property, newVal), rootException);
|
|
||||||
throw rootException;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
throw new ReconfigurationException(
|
|
||||||
property, newVal, getConf().get(property));
|
|
||||||
}
|
}
|
||||||
|
throw new ReconfigurationException(
|
||||||
|
property, newVal, getConf().get(property));
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -285,7 +285,10 @@ private void addVolumes(int numNewVolumes)
|
|||||||
}
|
}
|
||||||
|
|
||||||
String newDataDir = newDataDirBuf.toString();
|
String newDataDir = newDataDirBuf.toString();
|
||||||
dn.reconfigurePropertyImpl(DFS_DATANODE_DATA_DIR_KEY, newDataDir);
|
assertThat(
|
||||||
|
"DN did not update its own config",
|
||||||
|
dn.reconfigurePropertyImpl(DFS_DATANODE_DATA_DIR_KEY, newDataDir),
|
||||||
|
is(conf.get(DFS_DATANODE_DATA_DIR_KEY)));
|
||||||
|
|
||||||
// Verify the configuration value is appropriately set.
|
// Verify the configuration value is appropriately set.
|
||||||
String[] effectiveDataDirs = conf.get(DFS_DATANODE_DATA_DIR_KEY).split(",");
|
String[] effectiveDataDirs = conf.get(DFS_DATANODE_DATA_DIR_KEY).split(",");
|
||||||
@ -448,8 +451,11 @@ public void testRemoveOneVolume()
|
|||||||
DataNode dn = cluster.getDataNodes().get(0);
|
DataNode dn = cluster.getDataNodes().get(0);
|
||||||
Collection<String> oldDirs = getDataDirs(dn);
|
Collection<String> oldDirs = getDataDirs(dn);
|
||||||
String newDirs = oldDirs.iterator().next(); // Keep the first volume.
|
String newDirs = oldDirs.iterator().next(); // Keep the first volume.
|
||||||
dn.reconfigurePropertyImpl(
|
assertThat(
|
||||||
DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY, newDirs);
|
"DN did not update its own config",
|
||||||
|
dn.reconfigurePropertyImpl(
|
||||||
|
DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY, newDirs),
|
||||||
|
is(dn.getConf().get(DFS_DATANODE_DATA_DIR_KEY)));
|
||||||
assertFileLocksReleased(
|
assertFileLocksReleased(
|
||||||
new ArrayList<String>(oldDirs).subList(1, oldDirs.size()));
|
new ArrayList<String>(oldDirs).subList(1, oldDirs.size()));
|
||||||
dn.scheduleAllBlockReport(0);
|
dn.scheduleAllBlockReport(0);
|
||||||
@ -505,8 +511,11 @@ public void testReplicatingAfterRemoveVolume()
|
|||||||
newDirs = dir;
|
newDirs = dir;
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
dn.reconfigurePropertyImpl(
|
assertThat(
|
||||||
DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY, newDirs);
|
"DN did not update its own config",
|
||||||
|
dn.reconfigurePropertyImpl(
|
||||||
|
DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY, newDirs),
|
||||||
|
is(dn.getConf().get(DFS_DATANODE_DATA_DIR_KEY)));
|
||||||
oldDirs.remove(newDirs);
|
oldDirs.remove(newDirs);
|
||||||
assertFileLocksReleased(oldDirs);
|
assertFileLocksReleased(oldDirs);
|
||||||
|
|
||||||
@ -652,8 +661,10 @@ public Object answer(InvocationOnMock invocation)
|
|||||||
public void run() {
|
public void run() {
|
||||||
try {
|
try {
|
||||||
barrier.await();
|
barrier.await();
|
||||||
dn.reconfigurePropertyImpl(
|
assertThat(
|
||||||
DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY, newDirs);
|
"DN did not update its own config",
|
||||||
|
dn.reconfigurePropertyImpl(DFS_DATANODE_DATA_DIR_KEY, newDirs),
|
||||||
|
is(dn.getConf().get(DFS_DATANODE_DATA_DIR_KEY)));
|
||||||
} catch (ReconfigurationException |
|
} catch (ReconfigurationException |
|
||||||
InterruptedException |
|
InterruptedException |
|
||||||
BrokenBarrierException e) {
|
BrokenBarrierException e) {
|
||||||
@ -701,7 +712,10 @@ public void testAddBackRemovedVolume()
|
|||||||
String keepDataDir = oldDataDir.split(",")[0];
|
String keepDataDir = oldDataDir.split(",")[0];
|
||||||
String removeDataDir = oldDataDir.split(",")[1];
|
String removeDataDir = oldDataDir.split(",")[1];
|
||||||
|
|
||||||
dn.reconfigurePropertyImpl(DFS_DATANODE_DATA_DIR_KEY, keepDataDir);
|
assertThat(
|
||||||
|
"DN did not update its own config",
|
||||||
|
dn.reconfigurePropertyImpl(DFS_DATANODE_DATA_DIR_KEY, keepDataDir),
|
||||||
|
is(dn.getConf().get(DFS_DATANODE_DATA_DIR_KEY)));
|
||||||
for (int i = 0; i < cluster.getNumNameNodes(); i++) {
|
for (int i = 0; i < cluster.getNumNameNodes(); i++) {
|
||||||
String bpid = cluster.getNamesystem(i).getBlockPoolId();
|
String bpid = cluster.getNamesystem(i).getBlockPoolId();
|
||||||
BlockPoolSliceStorage bpsStorage =
|
BlockPoolSliceStorage bpsStorage =
|
||||||
@ -718,7 +732,10 @@ public void testAddBackRemovedVolume()
|
|||||||
|
|
||||||
// Bring the removed directory back. It only successes if all metadata about
|
// Bring the removed directory back. It only successes if all metadata about
|
||||||
// this directory were removed from the previous step.
|
// this directory were removed from the previous step.
|
||||||
dn.reconfigurePropertyImpl(DFS_DATANODE_DATA_DIR_KEY, oldDataDir);
|
assertThat(
|
||||||
|
"DN did not update its own config",
|
||||||
|
dn.reconfigurePropertyImpl(DFS_DATANODE_DATA_DIR_KEY, oldDataDir),
|
||||||
|
is(dn.getConf().get(DFS_DATANODE_DATA_DIR_KEY)));
|
||||||
}
|
}
|
||||||
|
|
||||||
/** Get the FsVolume on the given basePath */
|
/** Get the FsVolume on the given basePath */
|
||||||
@ -772,7 +789,10 @@ public void testDirectlyReloadAfterCheckDiskError()
|
|||||||
assertEquals(used, failedVolume.getDfsUsed());
|
assertEquals(used, failedVolume.getDfsUsed());
|
||||||
|
|
||||||
DataNodeTestUtils.restoreDataDirFromFailure(dirToFail);
|
DataNodeTestUtils.restoreDataDirFromFailure(dirToFail);
|
||||||
dn.reconfigurePropertyImpl(DFS_DATANODE_DATA_DIR_KEY, oldDataDir);
|
assertThat(
|
||||||
|
"DN did not update its own config",
|
||||||
|
dn.reconfigurePropertyImpl(DFS_DATANODE_DATA_DIR_KEY, oldDataDir),
|
||||||
|
is(dn.getConf().get(DFS_DATANODE_DATA_DIR_KEY)));
|
||||||
|
|
||||||
createFile(new Path("/test2"), 32, (short)2);
|
createFile(new Path("/test2"), 32, (short)2);
|
||||||
FsVolumeImpl restoredVolume = getVolume(dn, dirToFail);
|
FsVolumeImpl restoredVolume = getVolume(dn, dirToFail);
|
||||||
@ -806,7 +826,11 @@ public void testFullBlockReportAfterRemovingVolumes()
|
|||||||
|
|
||||||
// Remove a data dir from datanode
|
// Remove a data dir from datanode
|
||||||
File dataDirToKeep = new File(cluster.getDataDirectory(), "data1");
|
File dataDirToKeep = new File(cluster.getDataDirectory(), "data1");
|
||||||
dn.reconfigurePropertyImpl(DFS_DATANODE_DATA_DIR_KEY, dataDirToKeep.toString());
|
assertThat(
|
||||||
|
"DN did not update its own config",
|
||||||
|
dn.reconfigurePropertyImpl(
|
||||||
|
DFS_DATANODE_DATA_DIR_KEY, dataDirToKeep.toString()),
|
||||||
|
is(dn.getConf().get(DFS_DATANODE_DATA_DIR_KEY)));
|
||||||
|
|
||||||
// We should get 1 full report
|
// We should get 1 full report
|
||||||
Mockito.verify(spy, timeout(60000).times(1)).blockReport(
|
Mockito.verify(spy, timeout(60000).times(1)).blockReport(
|
||||||
|
@ -17,10 +17,12 @@
|
|||||||
*/
|
*/
|
||||||
package org.apache.hadoop.hdfs.server.datanode;
|
package org.apache.hadoop.hdfs.server.datanode;
|
||||||
|
|
||||||
|
import static org.hamcrest.core.Is.is;
|
||||||
import static org.junit.Assert.assertEquals;
|
import static org.junit.Assert.assertEquals;
|
||||||
import static org.junit.Assert.assertFalse;
|
import static org.junit.Assert.assertFalse;
|
||||||
import static org.junit.Assert.assertNotEquals;
|
import static org.junit.Assert.assertNotEquals;
|
||||||
import static org.junit.Assert.assertNotNull;
|
import static org.junit.Assert.assertNotNull;
|
||||||
|
import static org.junit.Assert.assertThat;
|
||||||
import static org.junit.Assert.assertTrue;
|
import static org.junit.Assert.assertTrue;
|
||||||
import static org.junit.Assume.assumeTrue;
|
import static org.junit.Assume.assumeTrue;
|
||||||
|
|
||||||
@ -324,13 +326,17 @@ public void testVolumeFailureRecoveredByHotSwappingVolume()
|
|||||||
|
|
||||||
// Hot swap out the failure volume.
|
// Hot swap out the failure volume.
|
||||||
String dataDirs = dn0Vol2.getPath();
|
String dataDirs = dn0Vol2.getPath();
|
||||||
dn0.reconfigurePropertyImpl(DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY,
|
assertThat(
|
||||||
dataDirs);
|
dn0.reconfigurePropertyImpl(
|
||||||
|
DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY, dataDirs),
|
||||||
|
is(dn0.getConf().get(DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY)));
|
||||||
|
|
||||||
// Fix failure volume dn0Vol1 and remount it back.
|
// Fix failure volume dn0Vol1 and remount it back.
|
||||||
DataNodeTestUtils.restoreDataDirFromFailure(dn0Vol1);
|
DataNodeTestUtils.restoreDataDirFromFailure(dn0Vol1);
|
||||||
dn0.reconfigurePropertyImpl(DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY,
|
assertThat(
|
||||||
oldDataDirs);
|
dn0.reconfigurePropertyImpl(
|
||||||
|
DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY, oldDataDirs),
|
||||||
|
is(dn0.getConf().get(DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY)));
|
||||||
|
|
||||||
// Fail dn0Vol2. Now since dn0Vol1 has been fixed, DN0 has sufficient
|
// Fail dn0Vol2. Now since dn0Vol1 has been fixed, DN0 has sufficient
|
||||||
// resources, thus it should keep running.
|
// resources, thus it should keep running.
|
||||||
@ -354,8 +360,11 @@ public void testTolerateVolumeFailuresAfterAddingMoreVolumes()
|
|||||||
DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY);
|
DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY);
|
||||||
|
|
||||||
// Add a new volume to DN0
|
// Add a new volume to DN0
|
||||||
dn0.reconfigurePropertyImpl(DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY,
|
assertThat(
|
||||||
oldDataDirs + "," + dn0VolNew.getAbsolutePath());
|
dn0.reconfigurePropertyImpl(
|
||||||
|
DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY,
|
||||||
|
oldDataDirs + "," + dn0VolNew.getAbsolutePath()),
|
||||||
|
is(dn0.getConf().get(DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY)));
|
||||||
|
|
||||||
// Fail dn0Vol1 first and hot swap it.
|
// Fail dn0Vol1 first and hot swap it.
|
||||||
DataNodeTestUtils.injectDataDirFailure(dn0Vol1);
|
DataNodeTestUtils.injectDataDirFailure(dn0Vol1);
|
||||||
|
@ -19,9 +19,11 @@
|
|||||||
|
|
||||||
import static org.apache.hadoop.test.MetricsAsserts.assertCounter;
|
import static org.apache.hadoop.test.MetricsAsserts.assertCounter;
|
||||||
import static org.apache.hadoop.test.MetricsAsserts.getMetrics;
|
import static org.apache.hadoop.test.MetricsAsserts.getMetrics;
|
||||||
|
import static org.hamcrest.core.Is.is;
|
||||||
import static org.junit.Assert.assertArrayEquals;
|
import static org.junit.Assert.assertArrayEquals;
|
||||||
import static org.junit.Assert.assertEquals;
|
import static org.junit.Assert.assertEquals;
|
||||||
import static org.junit.Assert.assertNull;
|
import static org.junit.Assert.assertNull;
|
||||||
|
import static org.junit.Assert.assertThat;
|
||||||
import static org.junit.Assert.assertTrue;
|
import static org.junit.Assert.assertTrue;
|
||||||
import static org.junit.Assume.assumeTrue;
|
import static org.junit.Assume.assumeTrue;
|
||||||
|
|
||||||
@ -591,8 +593,11 @@ private static void reconfigureDataNode(DataNode dn, File... newVols)
|
|||||||
dnNewDataDirs.append(newVol.getAbsolutePath());
|
dnNewDataDirs.append(newVol.getAbsolutePath());
|
||||||
}
|
}
|
||||||
try {
|
try {
|
||||||
dn.reconfigurePropertyImpl(DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY,
|
assertThat(
|
||||||
dnNewDataDirs.toString());
|
dn.reconfigurePropertyImpl(
|
||||||
|
DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY,
|
||||||
|
dnNewDataDirs.toString()),
|
||||||
|
is(dn.getConf().get(DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY)));
|
||||||
} catch (ReconfigurationException e) {
|
} catch (ReconfigurationException e) {
|
||||||
// This can be thrown if reconfiguration tries to use a failed volume.
|
// This can be thrown if reconfiguration tries to use a failed volume.
|
||||||
// We need to swallow the exception, because some of our tests want to
|
// We need to swallow the exception, because some of our tests want to
|
||||||
|
Loading…
Reference in New Issue
Block a user