From dde0ab55aadcf7c9cf71dbe36d90e97da6bc9498 Mon Sep 17 00:00:00 2001 From: Inigo Goiri Date: Fri, 15 Feb 2019 16:32:27 -0800 Subject: [PATCH] HDFS-14258. Introduce Java Concurrent Package To DataXceiverServer Class. Contributed by BELUGA BEHR. --- .../hadoop/hdfs/server/datanode/DataNode.java | 10 +- .../server/datanode/DataXceiverServer.java | 403 ++++++++++++------ .../datanode/TestDataNodeReconfiguration.java | 124 ++++-- 3 files changed, 372 insertions(+), 165 deletions(-) diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java index e926b6a498..2c1a8cd333 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java @@ -581,7 +581,15 @@ public String reconfigurePropertyImpl(String property, String newVal) "balancer max concurrent movers must be larger than 0")); } } - xserver.updateBalancerMaxConcurrentMovers(movers); + boolean success = xserver.updateBalancerMaxConcurrentMovers(movers); + if (!success) { + rootException = new ReconfigurationException( + property, + newVal, + getConf().get(property), + new IllegalArgumentException( + "Could not modify concurrent moves thread count")); + } return Integer.toString(movers); } catch (NumberFormatException nfe) { rootException = new ReconfigurationException( diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiverServer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiverServer.java index 4aab3f86bb..8faae63e37 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiverServer.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiverServer.java @@ -21,35 +21,48 @@ import java.net.SocketTimeoutException; import java.nio.channels.AsynchronousCloseException; import java.util.HashMap; -import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.Semaphore; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; +import org.apache.commons.io.IOUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.net.Peer; import org.apache.hadoop.hdfs.net.PeerServer; import org.apache.hadoop.hdfs.util.DataTransferThrottler; -import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.util.Daemon; import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; import org.slf4j.Logger; /** - * Server used for receiving/sending a block of data. - * This is created to listen for requests from clients or - * other DataNodes. This small server does not use the - * Hadoop IPC mechanism. + * Server used for receiving/sending a block of data. This is created to listen + * for requests from clients or other DataNodes. This small server does not use + * the Hadoop IPC mechanism. */ class DataXceiverServer implements Runnable { public static final Logger LOG = DataNode.LOG; - + + /** + * Default time to wait (in seconds) for the number of running threads to drop + * below the newly requested maximum before giving up. + */ + private static final int DEFAULT_RECONFIGURE_WAIT = 30; + private final PeerServer peerServer; private final DataNode datanode; - private final HashMap peers = new HashMap(); - private final HashMap peersXceiver = new HashMap(); + private final HashMap peers = new HashMap<>(); + private final HashMap peersXceiver = new HashMap<>(); + private final Lock lock = new ReentrantLock(); + private final Condition noPeers = lock.newCondition(); private boolean closed = false; - + private int maxReconfigureWaitTime = DEFAULT_RECONFIGURE_WAIT; + /** * Maximal number of concurrent xceivers per node. * Enforcing the limit is required in order to avoid data-node @@ -58,77 +71,123 @@ class DataXceiverServer implements Runnable { int maxXceiverCount = DFSConfigKeys.DFS_DATANODE_MAX_RECEIVER_THREADS_DEFAULT; - /** A manager to make sure that cluster balancing does not - * take too much resources. - * - * It limits the number of block moves for balancing and - * the total amount of bandwidth they can use. + /** + * A manager to make sure that cluster balancing does not take too much + * resources. + * + * It limits the number of block moves for balancing and the total amount of + * bandwidth they can use. */ static class BlockBalanceThrottler extends DataTransferThrottler { - private int numThreads; - private final AtomicInteger maxThreads = new AtomicInteger(0); + private final Semaphore semaphore; + private int maxThreads; - /**Constructor - * - * @param bandwidth Total amount of bandwidth can be used for balancing + /** + * Constructor. + * + * @param bandwidth Total amount of bandwidth can be used for balancing */ private BlockBalanceThrottler(long bandwidth, int maxThreads) { super(bandwidth); - this.maxThreads.set(maxThreads); + this.semaphore = new Semaphore(maxThreads, true); + this.maxThreads = maxThreads; LOG.info("Balancing bandwidth is " + bandwidth + " bytes/s"); LOG.info("Number threads for balancing is " + maxThreads); } - private void setMaxConcurrentMovers(int movers) { - this.maxThreads.set(movers); + /** + * Update the number of threads which may be used concurrently for moving + * blocks. The number of threads available can be scaled up or down. If + * increasing the number of threads, the request will be serviced + * immediately. However, if decreasing the number of threads, this method + * will block any new request for moves, wait for any existing backlog of + * move requests to clear, and wait for enough threads to have finished such + * that the total number of threads actively running is less than or equal + * to the new cap. If this method has been unable to successfully set the + * new, lower, cap within 'duration' seconds, the attempt will be aborted + * and the original cap will remain. + * + * @param newMaxThreads The new maximum number of threads for block moving + * @param duration The number of seconds to wait if decreasing threads + * @return true if new maximum was successfully applied; false otherwise + */ + private boolean setMaxConcurrentMovers(final int newMaxThreads, + final int duration) { + Preconditions.checkArgument(newMaxThreads > 0); + final int delta = newMaxThreads - this.maxThreads; + LOG.debug("Change concurrent thread count to {} from {}", newMaxThreads, + this.maxThreads); + if (delta == 0) { + return true; + } + if (delta > 0) { + LOG.debug("Adding thread capacity: {}", delta); + this.semaphore.release(delta); + this.maxThreads = newMaxThreads; + return true; + } + try { + LOG.debug("Removing thread capacity: {}. Max wait: {}", delta, + duration); + boolean acquired = this.semaphore.tryAcquire(Math.abs(delta), duration, + TimeUnit.SECONDS); + if (acquired) { + this.maxThreads = newMaxThreads; + } else { + LOG.warn("Could not lower thread count to {} from {}. Too busy.", + newMaxThreads, this.maxThreads); + } + return acquired; + } catch (InterruptedException e) { + LOG.warn("Interrupted before adjusting thread count: {}", delta); + return false; + } } @VisibleForTesting int getMaxConcurrentMovers() { - return this.maxThreads.get(); + return this.maxThreads; } - /** Check if the block move can start. - * - * Return true if the thread quota is not exceeded and + /** + * Check if the block move can start + * + * Return true if the thread quota is not exceeded and * the counter is incremented; False otherwise. */ - synchronized boolean acquire() { - if (numThreads >= maxThreads.get()) { - return false; - } - numThreads++; - return true; + boolean acquire() { + return this.semaphore.tryAcquire(); } - /** Mark that the move is completed. The thread counter is decremented. */ - synchronized void release() { - numThreads--; + /** + * Mark that the move is completed. The thread counter is decremented. + */ + void release() { + this.semaphore.release(); } } final BlockBalanceThrottler balanceThrottler; - + /** - * We need an estimate for block size to check if the disk partition has - * enough space. Newer clients pass the expected block size to the DataNode. - * For older clients we just use the server-side default block size. + * Stores an estimate for block size to check if the disk partition has enough + * space. Newer clients pass the expected block size to the DataNode. For + * older clients, just use the server-side default block size. */ final long estimateBlockSize; - - + DataXceiverServer(PeerServer peerServer, Configuration conf, DataNode datanode) { this.peerServer = peerServer; this.datanode = datanode; - - this.maxXceiverCount = + + this.maxXceiverCount = conf.getInt(DFSConfigKeys.DFS_DATANODE_MAX_RECEIVER_THREADS_KEY, DFSConfigKeys.DFS_DATANODE_MAX_RECEIVER_THREADS_DEFAULT); - + this.estimateBlockSize = conf.getLongBytes(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, DFSConfigKeys.DFS_BLOCK_SIZE_DEFAULT); - + //set up parameter for cluster balancing this.balanceThrottler = new BlockBalanceThrottler( conf.getLongBytes(DFSConfigKeys.DFS_DATANODE_BALANCE_BANDWIDTHPERSEC_KEY, @@ -161,25 +220,25 @@ public void run() { // another thread closed our listener socket - that's expected during shutdown, // but not in other circumstances if (datanode.shouldRun && !datanode.shutdownForUpgrade) { - LOG.warn(datanode.getDisplayName() + ":DataXceiverServer: ", ace); + LOG.warn("{}:DataXceiverServer", datanode.getDisplayName(), ace); } } catch (IOException ie) { - IOUtils.cleanup(null, peer); - LOG.warn(datanode.getDisplayName() + ":DataXceiverServer: ", ie); + IOUtils.closeQuietly(peer); + LOG.warn("{}:DataXceiverServer", datanode.getDisplayName(), ie); } catch (OutOfMemoryError ie) { - IOUtils.cleanup(null, peer); + IOUtils.closeQuietly(peer); // DataNode can run out of memory if there is too many transfers. // Log the event, Sleep for 30 seconds, other transfers may complete by // then. LOG.error("DataNode is out of memory. Will retry in 30 seconds.", ie); try { - Thread.sleep(30 * 1000); + Thread.sleep(TimeUnit.SECONDS.toMillis(30L)); } catch (InterruptedException e) { // ignore } } catch (Throwable te) { - LOG.error(datanode.getDisplayName() - + ":DataXceiverServer: Exiting due to: ", te); + LOG.error("{}:DataXceiverServer: Exiting.", datanode.getDisplayName(), + te); datanode.shouldRun = false; } } @@ -189,8 +248,8 @@ public void run() { peerServer.close(); closed = true; } catch (IOException ie) { - LOG.warn(datanode.getDisplayName() - + " :DataXceiverServer: close exception", ie); + LOG.warn("{}:DataXceiverServer: close exception", + datanode.getDisplayName(), ie); } // if in restart prep stage, notify peers before closing them. @@ -200,16 +259,10 @@ public void run() { // to send an OOB message to the client, but blocked on network for // long time, we need to force its termination. LOG.info("Shutting down DataXceiverServer before restart"); - // Allow roughly up to 2 seconds. - for (int i = 0; getNumPeers() > 0 && i < 10; i++) { - try { - Thread.sleep(200); - } catch (InterruptedException e) { - // ignore - } - } + + waitAllPeers(2L, TimeUnit.SECONDS); } - // Close all peers. + closeAllPeers(); } @@ -221,81 +274,158 @@ void kill() { this.peerServer.close(); this.closed = true; } catch (IOException ie) { - LOG.warn(datanode.getDisplayName() + ":DataXceiverServer.kill(): ", ie); + LOG.warn("{}:DataXceiverServer.kill()", datanode.getDisplayName(), ie); } } - - synchronized void addPeer(Peer peer, Thread t, DataXceiver xceiver) - throws IOException { - if (closed) { - throw new IOException("Server closed."); - } - peers.put(peer, t); - peersXceiver.put(peer, xceiver); - datanode.metrics.incrDataNodeActiveXceiversCount(); - } - synchronized void closePeer(Peer peer) { - peers.remove(peer); - peersXceiver.remove(peer); - datanode.metrics.decrDataNodeActiveXceiversCount(); - IOUtils.cleanup(null, peer); + void addPeer(Peer peer, Thread t, DataXceiver xceiver) + throws IOException { + lock.lock(); + try { + if (closed) { + throw new IOException("Server closed."); + } + peers.put(peer, t); + peersXceiver.put(peer, xceiver); + datanode.metrics.incrDataNodeActiveXceiversCount(); + } finally { + lock.unlock(); + } + } + + void closePeer(Peer peer) { + lock.lock(); + try { + peers.remove(peer); + peersXceiver.remove(peer); + datanode.metrics.decrDataNodeActiveXceiversCount(); + IOUtils.closeQuietly(peer); + if (peers.isEmpty()) { + this.noPeers.signalAll(); + } + } finally { + lock.unlock(); + } } // Sending OOB to all peers - public synchronized void sendOOBToPeers() { - if (!datanode.shutdownForUpgrade) { - return; - } - - for (Peer p : peers.keySet()) { - try { - peersXceiver.get(p).sendOOB(); - } catch (IOException e) { - LOG.warn("Got error when sending OOB message.", e); - } catch (InterruptedException e) { - LOG.warn("Interrupted when sending OOB message."); + public void sendOOBToPeers() { + lock.lock(); + try { + if (!datanode.shutdownForUpgrade) { + return; } + for (Peer p : peers.keySet()) { + try { + peersXceiver.get(p).sendOOB(); + } catch (IOException e) { + LOG.warn("Got error when sending OOB message.", e); + } catch (InterruptedException e) { + LOG.warn("Interrupted when sending OOB message."); + } + } + } finally { + lock.unlock(); } } - public synchronized void stopWriters() { - for (Peer p : peers.keySet()) { - peersXceiver.get(p).stopWriter(); + public void stopWriters() { + lock.lock(); + try { + peers.keySet().forEach(p -> peersXceiver.get(p).stopWriter()); + } finally { + lock.unlock(); } } - - // Notify all peers of the shutdown and restart. - // datanode.shouldRun should still be true and datanode.restarting should - // be set true before calling this method. - synchronized void restartNotifyPeers() { - assert (datanode.shouldRun == true && datanode.shutdownForUpgrade); - for (Thread t : peers.values()) { + + /** + * Notify all peers of the shutdown and restart. 'datanode.shouldRun' should + * still be true and 'datanode.restarting' should be set true before calling + * this method. + */ + void restartNotifyPeers() { + assert (datanode.shouldRun && datanode.shutdownForUpgrade); + lock.lock(); + try { // interrupt each and every DataXceiver thread. - t.interrupt(); + peers.values().forEach(t -> t.interrupt()); + } finally { + lock.unlock(); } } - // Close all peers and clear the map. - synchronized void closeAllPeers() { + /** + * Close all peers and clear the map. + */ + void closeAllPeers() { LOG.info("Closing all peers."); - for (Peer p : peers.keySet()) { - IOUtils.cleanup(null, p); + lock.lock(); + try { + peers.keySet().forEach(p -> IOUtils.closeQuietly(p)); + peers.clear(); + peersXceiver.clear(); + datanode.metrics.setDataNodeActiveXceiversCount(0); + this.noPeers.signalAll(); + } finally { + lock.unlock(); } - peers.clear(); - peersXceiver.clear(); - datanode.metrics.setDataNodeActiveXceiversCount(0); } - // Return the number of peers. - synchronized int getNumPeers() { - return peers.size(); + /** + * Causes a thread to block until all peers are removed, a certain amount of + * time has passed, or the thread is interrupted. + * + * @param timeout the maximum time to wait, in nanoseconds + * @param unit the unit of time to wait + * @return true if thread returned because all peers were removed; false + * otherwise + */ + private boolean waitAllPeers(long timeout, TimeUnit unit) { + long nanos = unit.toNanos(timeout); + lock.lock(); + try { + while (!peers.isEmpty()) { + if (nanos <= 0L) { + return false; + } + nanos = noPeers.awaitNanos(nanos); + } + } catch (InterruptedException e) { + LOG.debug("Interrupted waiting for peers to close"); + return false; + } finally { + lock.unlock(); + } + return true; } - // Return the number of peers and DataXceivers. + /** + * Return the number of peers. + * + * @return the number of active peers + */ + int getNumPeers() { + lock.lock(); + try { + return peers.size(); + } finally { + lock.unlock(); + } + } + + /** + * Return the number of peers and DataXceivers. + * + * @return the number of peers and DataXceivers. + */ @VisibleForTesting - synchronized int getNumPeersXceiver() { - return peersXceiver.size(); + int getNumPeersXceiver() { + lock.lock(); + try { + return peersXceiver.size(); + } finally { + lock.unlock(); + } } @VisibleForTesting @@ -303,13 +433,42 @@ PeerServer getPeerServer() { return peerServer; } - synchronized void releasePeer(Peer peer) { - peers.remove(peer); - peersXceiver.remove(peer); - datanode.metrics.decrDataNodeActiveXceiversCount(); + /** + * Release a peer. + * + * @param peer The peer to release + */ + void releasePeer(Peer peer) { + lock.lock(); + try { + peers.remove(peer); + peersXceiver.remove(peer); + datanode.metrics.decrDataNodeActiveXceiversCount(); + } finally { + lock.unlock(); + } } - public void updateBalancerMaxConcurrentMovers(int movers) { - balanceThrottler.setMaxConcurrentMovers(movers); + /** + * Update the number of threads which may be used concurrently for moving + * blocks. + * + * @param movers The new maximum number of threads for block moving + * @return true if new maximum was successfully applied; false otherwise + */ + public boolean updateBalancerMaxConcurrentMovers(final int movers) { + return balanceThrottler.setMaxConcurrentMovers(movers, + this.maxReconfigureWaitTime); + } + + /** + * Update the maximum amount of time to wait for reconfiguration of the + * maximum number of block mover threads to complete. + * + * @param max The new maximum number of threads for block moving, in seconds + */ + @VisibleForTesting + void setMaxReconfigureWaitTime(int max) { + this.maxReconfigureWaitTime = max; } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeReconfiguration.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeReconfiguration.java index 4e6f5699ac..ff3b3eabc3 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeReconfiguration.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeReconfiguration.java @@ -176,55 +176,96 @@ public void testMaxConcurrentMoversReconfiguration() @Test public void testAcquireWithMaxConcurrentMoversGreaterThanDefault() throws IOException, ReconfigurationException { - testAcquireWithMaxConcurrentMoversShared(10); + final DataNode[] dns = createDNsForTest(1); + try { + testAcquireOnMaxConcurrentMoversReconfiguration(dns[0], 10); + } finally { + dns[0].shutdown(); + } } @Test public void testAcquireWithMaxConcurrentMoversLessThanDefault() throws IOException, ReconfigurationException { - testAcquireWithMaxConcurrentMoversShared(3); - } - - private void testAcquireWithMaxConcurrentMoversShared( - int maxConcurrentMovers) - throws IOException, ReconfigurationException { - DataNode[] dns = null; + final DataNode[] dns = createDNsForTest(1); try { - dns = createDNsForTest(1); - testAcquireOnMaxConcurrentMoversReconfiguration(dns[0], - maxConcurrentMovers); - } catch (IOException ioe) { - throw ioe; - } catch (ReconfigurationException re) { - throw re; + testAcquireOnMaxConcurrentMoversReconfiguration(dns[0], 3); } finally { - shutDownDNs(dns); + dns[0].shutdown(); } } - private void shutDownDNs(DataNode[] dns) { - if (dns == null) { - return; - } + /** + * Simulates a scenario where the DataNode has been reconfigured with fewer + * mover threads, but all of the current treads are busy and therefore the + * DataNode is unable to honor this request within a reasonable amount of + * time. The DataNode eventually gives up and returns a flag indicating that + * the request was not honored. + */ + @Test + public void testFailedDecreaseConcurrentMovers() + throws IOException, ReconfigurationException { + final DataNode[] dns = createDNsForTest(1); + final DataNode dataNode = dns[0]; + try { + // Set the current max to 2 + dataNode.xserver.updateBalancerMaxConcurrentMovers(2); - for (int i = 0; i < dns.length; i++) { - try { - if (dns[i] == null) { - continue; - } - dns[i].shutdown(); - } catch (Exception e) { - LOG.error("Cannot close: ", e); - } + // Simulate grabbing 2 threads + dataNode.xserver.balanceThrottler.acquire(); + dataNode.xserver.balanceThrottler.acquire(); + + dataNode.xserver.setMaxReconfigureWaitTime(1); + + // Attempt to set new maximum to 1 + final boolean success = + dataNode.xserver.updateBalancerMaxConcurrentMovers(1); + Assert.assertFalse(success); + } finally { + dataNode.shutdown(); + } + } + + /** + * Test with invalid configuration. + */ + @Test(expected = ReconfigurationException.class) + public void testFailedDecreaseConcurrentMoversReconfiguration() + throws IOException, ReconfigurationException { + final DataNode[] dns = createDNsForTest(1); + final DataNode dataNode = dns[0]; + try { + // Set the current max to 2 + dataNode.xserver.updateBalancerMaxConcurrentMovers(2); + + // Simulate grabbing 2 threads + dataNode.xserver.balanceThrottler.acquire(); + dataNode.xserver.balanceThrottler.acquire(); + + dataNode.xserver.setMaxReconfigureWaitTime(1); + + // Now try reconfigure maximum downwards with threads released + dataNode.reconfigurePropertyImpl( + DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_KEY, "1"); + } catch (ReconfigurationException e) { + Assert.assertEquals(DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_KEY, + e.getProperty()); + Assert.assertEquals("1", e.getNewValue()); + throw e; + } finally { + dataNode.shutdown(); } } private void testAcquireOnMaxConcurrentMoversReconfiguration( DataNode dataNode, int maxConcurrentMovers) throws IOException, ReconfigurationException { - int defaultMaxThreads = dataNode.getConf().getInt( + final int defaultMaxThreads = dataNode.getConf().getInt( DFSConfigKeys.DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_KEY, DFSConfigKeys.DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_DEFAULT); + + /** Test that the default setup is working */ + for (int i = 0; i < defaultMaxThreads; i++) { assertEquals("should be able to get thread quota", true, dataNode.xserver.balanceThrottler.acquire()); @@ -233,25 +274,24 @@ private void testAcquireOnMaxConcurrentMoversReconfiguration( assertEquals("should not be able to get thread quota", false, dataNode.xserver.balanceThrottler.acquire()); + // Give back the threads + for (int i = 0; i < defaultMaxThreads; i++) { + dataNode.xserver.balanceThrottler.release(); + } + + /** Test that the change is applied correctly */ + // change properties dataNode.reconfigureProperty( DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_KEY, String.valueOf(maxConcurrentMovers)); assertEquals("thread quota is wrong", maxConcurrentMovers, - dataNode.xserver.balanceThrottler.getMaxConcurrentMovers()); // thread quota + dataNode.xserver.balanceThrottler.getMaxConcurrentMovers()); - int val = Math.abs(maxConcurrentMovers - defaultMaxThreads); - if (defaultMaxThreads < maxConcurrentMovers) { - for (int i = 0; i < val; i++) { - assertEquals("should be able to get thread quota", true, - dataNode.xserver.balanceThrottler.acquire()); - } - } else if (defaultMaxThreads > maxConcurrentMovers) { - for (int i = 0; i < val; i++) { - assertEquals("should not be able to get thread quota", false, - dataNode.xserver.balanceThrottler.acquire()); - } + for (int i = 0; i < maxConcurrentMovers; i++) { + assertEquals("should be able to get thread quota", true, + dataNode.xserver.balanceThrottler.acquire()); } assertEquals("should not be able to get thread quota", false,