HDFS-9264. Minor cleanup of operations on FsVolumeList#volumes. (Walter Su via lei)

This commit is contained in:
Lei Xu 2015-10-23 13:52:59 -07:00
parent 600ad7bf41
commit 533a2be5ac
2 changed files with 38 additions and 74 deletions

View File

@ -1560,6 +1560,9 @@ Release 2.8.0 - UNRELEASED
HDFS-7087. Ability to list /.reserved. (Xiao Chen via wang)
HDFS-9264. Minor cleanup of operations on FsVolumeList#volumes.
(Walter Su via lei)
OPTIMIZATIONS
HDFS-8026. Trace FSOutputSummer#writeChecksumChunks rather than

View File

@ -21,7 +21,6 @@
import java.io.IOException;
import java.nio.channels.ClosedChannelException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
@ -30,9 +29,8 @@
import java.util.Map;
import java.util.TreeMap;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.CopyOnWriteArrayList;
import com.google.common.collect.Lists;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
@ -46,8 +44,8 @@
import org.apache.hadoop.util.Time;
class FsVolumeList {
private final AtomicReference<FsVolumeImpl[]> volumes =
new AtomicReference<>(new FsVolumeImpl[0]);
private final CopyOnWriteArrayList<FsVolumeImpl> volumes =
new CopyOnWriteArrayList<>();
// Tracks volume failures, sorted by volume path.
private final Map<String, VolumeFailureInfo> volumeFailureInfos =
Collections.synchronizedMap(new TreeMap<String, VolumeFailureInfo>());
@ -71,7 +69,7 @@ class FsVolumeList {
* Return an immutable list view of all the volumes.
*/
List<FsVolumeImpl> getVolumes() {
return Collections.unmodifiableList(Arrays.asList(volumes.get()));
return Collections.unmodifiableList(volumes);
}
private FsVolumeReference chooseVolume(List<FsVolumeImpl> list, long blockSize)
@ -98,10 +96,8 @@ private FsVolumeReference chooseVolume(List<FsVolumeImpl> list, long blockSize)
*/
FsVolumeReference getNextVolume(StorageType storageType, long blockSize)
throws IOException {
// Get a snapshot of currently available volumes.
final FsVolumeImpl[] curVolumes = volumes.get();
final List<FsVolumeImpl> list = new ArrayList<>(curVolumes.length);
for(FsVolumeImpl v : curVolumes) {
final List<FsVolumeImpl> list = new ArrayList<>(volumes.size());
for(FsVolumeImpl v : volumes) {
if (v.getStorageType() == storageType) {
list.add(v);
}
@ -129,7 +125,7 @@ FsVolumeReference getNextTransientVolume(long blockSize) throws IOException {
long getDfsUsed() throws IOException {
long dfsUsed = 0L;
for (FsVolumeImpl v : volumes.get()) {
for (FsVolumeImpl v : volumes) {
try(FsVolumeReference ref = v.obtainReference()) {
dfsUsed += v.getDfsUsed();
} catch (ClosedChannelException e) {
@ -141,7 +137,7 @@ long getDfsUsed() throws IOException {
long getBlockPoolUsed(String bpid) throws IOException {
long dfsUsed = 0L;
for (FsVolumeImpl v : volumes.get()) {
for (FsVolumeImpl v : volumes) {
try (FsVolumeReference ref = v.obtainReference()) {
dfsUsed += v.getBlockPoolUsed(bpid);
} catch (ClosedChannelException e) {
@ -153,7 +149,7 @@ long getBlockPoolUsed(String bpid) throws IOException {
long getCapacity() {
long capacity = 0L;
for (FsVolumeImpl v : volumes.get()) {
for (FsVolumeImpl v : volumes) {
try (FsVolumeReference ref = v.obtainReference()) {
capacity += v.getCapacity();
} catch (IOException e) {
@ -165,7 +161,7 @@ long getCapacity() {
long getRemaining() throws IOException {
long remaining = 0L;
for (FsVolumeSpi vol : volumes.get()) {
for (FsVolumeSpi vol : volumes) {
try (FsVolumeReference ref = vol.obtainReference()) {
remaining += vol.getAvailable();
} catch (ClosedChannelException e) {
@ -183,7 +179,7 @@ void getAllVolumesMap(final String bpid,
final List<IOException> exceptions = Collections.synchronizedList(
new ArrayList<IOException>());
List<Thread> replicaAddingThreads = new ArrayList<Thread>();
for (final FsVolumeImpl v : volumes.get()) {
for (final FsVolumeImpl v : volumes) {
Thread t = new Thread() {
public void run() {
try (FsVolumeReference ref = v.obtainReference()) {
@ -267,7 +263,7 @@ Set<File> checkDirs() {
@Override
public String toString() {
return Arrays.toString(volumes.get());
return volumes.toString();
}
/**
@ -277,21 +273,7 @@ public String toString() {
*/
void addVolume(FsVolumeReference ref) {
FsVolumeImpl volume = (FsVolumeImpl) ref.getVolume();
while (true) {
final FsVolumeImpl[] curVolumes = volumes.get();
final List<FsVolumeImpl> volumeList = Lists.newArrayList(curVolumes);
volumeList.add(volume);
if (volumes.compareAndSet(curVolumes,
volumeList.toArray(new FsVolumeImpl[volumeList.size()]))) {
break;
} else {
if (FsDatasetImpl.LOG.isDebugEnabled()) {
FsDatasetImpl.LOG.debug(
"The volume list has been changed concurrently, " +
"retry to remove volume: " + ref.getVolume().getStorageID());
}
}
}
volumes.add(volume);
if (blockScanner != null) {
blockScanner.addVolumeScanner(ref);
} else {
@ -311,12 +293,7 @@ void addVolume(FsVolumeReference ref) {
* @param target the volume instance to be removed.
*/
private void removeVolume(FsVolumeImpl target) {
while (true) {
final FsVolumeImpl[] curVolumes = volumes.get();
final List<FsVolumeImpl> volumeList = Lists.newArrayList(curVolumes);
if (volumeList.remove(target)) {
if (volumes.compareAndSet(curVolumes,
volumeList.toArray(new FsVolumeImpl[volumeList.size()]))) {
if (volumes.remove(target)) {
if (blockScanner != null) {
blockScanner.removeVolumeScanner(target);
}
@ -328,21 +305,11 @@ private void removeVolume(FsVolumeImpl target) {
}
target.shutdown();
FsDatasetImpl.LOG.info("Removed volume: " + target);
break;
} else {
if (FsDatasetImpl.LOG.isDebugEnabled()) {
FsDatasetImpl.LOG.debug(
"The volume list has been changed concurrently, " +
"retry to remove volume: " + target);
}
}
} else {
if (FsDatasetImpl.LOG.isDebugEnabled()) {
FsDatasetImpl.LOG.debug("Volume " + target +
" does not exist or is removed by others.");
}
break;
}
}
}
@ -352,16 +319,10 @@ private void removeVolume(FsVolumeImpl target) {
* @param clearFailure set true to remove failure info for this volume.
*/
void removeVolume(File volume, boolean clearFailure) {
// Make a copy of volumes to remove one volume.
final FsVolumeImpl[] curVolumes = volumes.get();
final List<FsVolumeImpl> volumeList = Lists.newArrayList(curVolumes);
for (Iterator<FsVolumeImpl> it = volumeList.iterator(); it.hasNext(); ) {
FsVolumeImpl fsVolume = it.next();
String basePath, targetPath;
basePath = new File(fsVolume.getBasePath()).getAbsolutePath();
targetPath = volume.getAbsolutePath();
for (FsVolumeImpl fsVolume : volumes) {
String basePath = new File(fsVolume.getBasePath()).getAbsolutePath();
String targetPath = volume.getAbsolutePath();
if (basePath.equals(targetPath)) {
// Make sure the removed volume is the one in the curVolumes.
removeVolume(fsVolume);
}
}
@ -397,7 +358,7 @@ void addBlockPool(final String bpid, final Configuration conf) throws IOExceptio
final List<IOException> exceptions = Collections.synchronizedList(
new ArrayList<IOException>());
List<Thread> blockPoolAddingThreads = new ArrayList<Thread>();
for (final FsVolumeImpl v : volumes.get()) {
for (final FsVolumeImpl v : volumes) {
Thread t = new Thread() {
public void run() {
try (FsVolumeReference ref = v.obtainReference()) {
@ -438,13 +399,13 @@ public void run() {
void removeBlockPool(String bpid, Map<DatanodeStorage, BlockListAsLongs>
blocksPerVolume) {
for (FsVolumeImpl v : volumes.get()) {
for (FsVolumeImpl v : volumes) {
v.shutdownBlockPool(bpid, blocksPerVolume.get(v.toDatanodeStorage()));
}
}
void shutdown() {
for (FsVolumeImpl volume : volumes.get()) {
for (FsVolumeImpl volume : volumes) {
if(volume != null) {
volume.shutdown();
}