HDFS-12093. [READ] Share remoteFS between ProvidedReplica instances.
This commit is contained in:
parent
663b3c08b1
commit
2407c9b93a
@ -20,6 +20,7 @@
|
|||||||
import java.net.URI;
|
import java.net.URI;
|
||||||
|
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.fs.FileSystem;
|
||||||
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
|
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
|
||||||
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
|
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
|
||||||
import org.apache.hadoop.hdfs.server.protocol.ReplicaRecoveryInfo;
|
import org.apache.hadoop.hdfs.server.protocol.ReplicaRecoveryInfo;
|
||||||
@ -31,8 +32,9 @@ public class FinalizedProvidedReplica extends ProvidedReplica {
|
|||||||
|
|
||||||
public FinalizedProvidedReplica(long blockId, URI fileURI,
|
public FinalizedProvidedReplica(long blockId, URI fileURI,
|
||||||
long fileOffset, long blockLen, long genStamp,
|
long fileOffset, long blockLen, long genStamp,
|
||||||
FsVolumeSpi volume, Configuration conf) {
|
FsVolumeSpi volume, Configuration conf, FileSystem remoteFS) {
|
||||||
super(blockId, fileURI, fileOffset, blockLen, genStamp, volume, conf);
|
super(blockId, fileURI, fileOffset, blockLen, genStamp, volume, conf,
|
||||||
|
remoteFS);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -65,11 +65,17 @@ public abstract class ProvidedReplica extends ReplicaInfo {
|
|||||||
* @param volume the volume this block belongs to
|
* @param volume the volume this block belongs to
|
||||||
*/
|
*/
|
||||||
public ProvidedReplica(long blockId, URI fileURI, long fileOffset,
|
public ProvidedReplica(long blockId, URI fileURI, long fileOffset,
|
||||||
long blockLen, long genStamp, FsVolumeSpi volume, Configuration conf) {
|
long blockLen, long genStamp, FsVolumeSpi volume, Configuration conf,
|
||||||
|
FileSystem remoteFS) {
|
||||||
super(volume, blockId, blockLen, genStamp);
|
super(volume, blockId, blockLen, genStamp);
|
||||||
this.fileURI = fileURI;
|
this.fileURI = fileURI;
|
||||||
this.fileOffset = fileOffset;
|
this.fileOffset = fileOffset;
|
||||||
this.conf = conf;
|
this.conf = conf;
|
||||||
|
if (remoteFS != null) {
|
||||||
|
this.remoteFS = remoteFS;
|
||||||
|
} else {
|
||||||
|
LOG.warn(
|
||||||
|
"Creating an reference to the remote FS for provided block " + this);
|
||||||
try {
|
try {
|
||||||
this.remoteFS = FileSystem.get(fileURI, this.conf);
|
this.remoteFS = FileSystem.get(fileURI, this.conf);
|
||||||
} catch (IOException e) {
|
} catch (IOException e) {
|
||||||
@ -77,17 +83,14 @@ public ProvidedReplica(long blockId, URI fileURI, long fileOffset,
|
|||||||
this.remoteFS = null;
|
this.remoteFS = null;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
public ProvidedReplica(ProvidedReplica r) {
|
public ProvidedReplica(ProvidedReplica r) {
|
||||||
super(r);
|
super(r);
|
||||||
this.fileURI = r.fileURI;
|
this.fileURI = r.fileURI;
|
||||||
this.fileOffset = r.fileOffset;
|
this.fileOffset = r.fileOffset;
|
||||||
this.conf = r.conf;
|
this.conf = r.conf;
|
||||||
try {
|
this.remoteFS = r.remoteFS;
|
||||||
this.remoteFS = FileSystem.newInstance(fileURI, this.conf);
|
|
||||||
} catch (IOException e) {
|
|
||||||
this.remoteFS = null;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -20,6 +20,7 @@
|
|||||||
import java.io.File;
|
import java.io.File;
|
||||||
import java.net.URI;
|
import java.net.URI;
|
||||||
|
|
||||||
|
import org.apache.hadoop.fs.FileSystem;
|
||||||
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
|
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.fs.StorageType;
|
import org.apache.hadoop.fs.StorageType;
|
||||||
@ -50,6 +51,7 @@ public class ReplicaBuilder {
|
|||||||
private long offset;
|
private long offset;
|
||||||
private Configuration conf;
|
private Configuration conf;
|
||||||
private FileRegion fileRegion;
|
private FileRegion fileRegion;
|
||||||
|
private FileSystem remoteFS;
|
||||||
|
|
||||||
public ReplicaBuilder(ReplicaState state) {
|
public ReplicaBuilder(ReplicaState state) {
|
||||||
volume = null;
|
volume = null;
|
||||||
@ -138,6 +140,11 @@ public ReplicaBuilder setFileRegion(FileRegion fileRegion) {
|
|||||||
return this;
|
return this;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public ReplicaBuilder setRemoteFS(FileSystem remoteFS) {
|
||||||
|
this.remoteFS = remoteFS;
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
public LocalReplicaInPipeline buildLocalReplicaInPipeline()
|
public LocalReplicaInPipeline buildLocalReplicaInPipeline()
|
||||||
throws IllegalArgumentException {
|
throws IllegalArgumentException {
|
||||||
LocalReplicaInPipeline info = null;
|
LocalReplicaInPipeline info = null;
|
||||||
@ -275,14 +282,14 @@ private ProvidedReplica buildProvidedFinalizedReplica()
|
|||||||
}
|
}
|
||||||
if (fileRegion == null) {
|
if (fileRegion == null) {
|
||||||
info = new FinalizedProvidedReplica(blockId, uri, offset,
|
info = new FinalizedProvidedReplica(blockId, uri, offset,
|
||||||
length, genStamp, volume, conf);
|
length, genStamp, volume, conf, remoteFS);
|
||||||
} else {
|
} else {
|
||||||
info = new FinalizedProvidedReplica(fileRegion.getBlock().getBlockId(),
|
info = new FinalizedProvidedReplica(fileRegion.getBlock().getBlockId(),
|
||||||
fileRegion.getPath().toUri(),
|
fileRegion.getPath().toUri(),
|
||||||
fileRegion.getOffset(),
|
fileRegion.getOffset(),
|
||||||
fileRegion.getBlock().getNumBytes(),
|
fileRegion.getBlock().getNumBytes(),
|
||||||
fileRegion.getBlock().getGenerationStamp(),
|
fileRegion.getBlock().getGenerationStamp(),
|
||||||
volume, conf);
|
volume, conf, remoteFS);
|
||||||
}
|
}
|
||||||
return info;
|
return info;
|
||||||
}
|
}
|
||||||
|
@ -28,6 +28,7 @@
|
|||||||
import java.util.concurrent.ConcurrentHashMap;
|
import java.util.concurrent.ConcurrentHashMap;
|
||||||
|
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.fs.FileSystem;
|
||||||
import org.apache.hadoop.fs.StorageType;
|
import org.apache.hadoop.fs.StorageType;
|
||||||
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
||||||
import org.apache.hadoop.hdfs.protocol.Block;
|
import org.apache.hadoop.hdfs.protocol.Block;
|
||||||
@ -96,7 +97,8 @@ void setFileRegionProvider(FileRegionProvider newProvider) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
public void getVolumeMap(ReplicaMap volumeMap,
|
public void getVolumeMap(ReplicaMap volumeMap,
|
||||||
RamDiskReplicaTracker ramDiskReplicaMap) throws IOException {
|
RamDiskReplicaTracker ramDiskReplicaMap, FileSystem remoteFS)
|
||||||
|
throws IOException {
|
||||||
Iterator<FileRegion> iter = provider.iterator();
|
Iterator<FileRegion> iter = provider.iterator();
|
||||||
while (iter.hasNext()) {
|
while (iter.hasNext()) {
|
||||||
FileRegion region = iter.next();
|
FileRegion region = iter.next();
|
||||||
@ -112,9 +114,10 @@ && containsBlock(providedVolume.baseURI,
|
|||||||
.setGenerationStamp(region.getBlock().getGenerationStamp())
|
.setGenerationStamp(region.getBlock().getGenerationStamp())
|
||||||
.setFsVolume(providedVolume)
|
.setFsVolume(providedVolume)
|
||||||
.setConf(conf)
|
.setConf(conf)
|
||||||
|
.setRemoteFS(remoteFS)
|
||||||
.build();
|
.build();
|
||||||
// check if the replica already exists
|
ReplicaInfo oldReplica =
|
||||||
ReplicaInfo oldReplica = volumeMap.get(bpid, newReplica.getBlockId());
|
volumeMap.get(bpid, newReplica.getBlockId());
|
||||||
if (oldReplica == null) {
|
if (oldReplica == null) {
|
||||||
volumeMap.add(bpid, newReplica);
|
volumeMap.add(bpid, newReplica);
|
||||||
bpVolumeMap.add(bpid, newReplica);
|
bpVolumeMap.add(bpid, newReplica);
|
||||||
@ -163,6 +166,8 @@ public void compileReport(LinkedList<ScanInfo> report,
|
|||||||
new ConcurrentHashMap<String, ProvidedBlockPoolSlice>();
|
new ConcurrentHashMap<String, ProvidedBlockPoolSlice>();
|
||||||
|
|
||||||
private ProvidedVolumeDF df;
|
private ProvidedVolumeDF df;
|
||||||
|
//the remote FileSystem to which this ProvidedVolume points to.
|
||||||
|
private FileSystem remoteFS;
|
||||||
|
|
||||||
ProvidedVolumeImpl(FsDatasetImpl dataset, String storageID,
|
ProvidedVolumeImpl(FsDatasetImpl dataset, String storageID,
|
||||||
StorageDirectory sd, FileIoProvider fileIoProvider,
|
StorageDirectory sd, FileIoProvider fileIoProvider,
|
||||||
@ -176,6 +181,7 @@ assert getStorageLocation().getStorageType() == StorageType.PROVIDED:
|
|||||||
conf.getClass(DFSConfigKeys.DFS_PROVIDER_DF_CLASS,
|
conf.getClass(DFSConfigKeys.DFS_PROVIDER_DF_CLASS,
|
||||||
DefaultProvidedVolumeDF.class, ProvidedVolumeDF.class);
|
DefaultProvidedVolumeDF.class, ProvidedVolumeDF.class);
|
||||||
df = ReflectionUtils.newInstance(dfClass, conf);
|
df = ReflectionUtils.newInstance(dfClass, conf);
|
||||||
|
remoteFS = FileSystem.get(baseURI, conf);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
@ -397,7 +403,7 @@ void getVolumeMap(ReplicaMap volumeMap,
|
|||||||
throws IOException {
|
throws IOException {
|
||||||
LOG.info("Creating volumemap for provided volume " + this);
|
LOG.info("Creating volumemap for provided volume " + this);
|
||||||
for(ProvidedBlockPoolSlice s : bpSlices.values()) {
|
for(ProvidedBlockPoolSlice s : bpSlices.values()) {
|
||||||
s.getVolumeMap(volumeMap, ramDiskReplicaMap);
|
s.getVolumeMap(volumeMap, ramDiskReplicaMap, remoteFS);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -414,7 +420,8 @@ private ProvidedBlockPoolSlice getProvidedBlockPoolSlice(String bpid)
|
|||||||
void getVolumeMap(String bpid, ReplicaMap volumeMap,
|
void getVolumeMap(String bpid, ReplicaMap volumeMap,
|
||||||
final RamDiskReplicaTracker ramDiskReplicaMap)
|
final RamDiskReplicaTracker ramDiskReplicaMap)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
getProvidedBlockPoolSlice(bpid).getVolumeMap(volumeMap, ramDiskReplicaMap);
|
getProvidedBlockPoolSlice(bpid).getVolumeMap(volumeMap, ramDiskReplicaMap,
|
||||||
|
remoteFS);
|
||||||
}
|
}
|
||||||
|
|
||||||
@VisibleForTesting
|
@VisibleForTesting
|
||||||
|
@ -87,7 +87,7 @@ private static void createProvidedReplicas(Configuration conf) {
|
|||||||
FILE_LEN >= (i+1)*BLK_LEN ? BLK_LEN : FILE_LEN - i*BLK_LEN;
|
FILE_LEN >= (i+1)*BLK_LEN ? BLK_LEN : FILE_LEN - i*BLK_LEN;
|
||||||
replicas.add(
|
replicas.add(
|
||||||
new FinalizedProvidedReplica(i, providedFile.toURI(), i*BLK_LEN,
|
new FinalizedProvidedReplica(i, providedFile.toURI(), i*BLK_LEN,
|
||||||
currentReplicaLength, 0, null, conf));
|
currentReplicaLength, 0, null, conf, null));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user