HDFS-12594. snapshotDiff fails if the report exceeds the RPC response limit. Contributed by Shashikant Banerjee
This commit is contained in:
parent
5cfaee2e6d
commit
b1c7654ee4
@ -19,6 +19,8 @@
|
||||
<Class name="org.apache.hadoop.hdfs.DFSPacket"/>
|
||||
<Class name="org.apache.hadoop.hdfs.protocol.LocatedStripedBlock"/>
|
||||
<Class name="org.apache.hadoop.hdfs.util.StripedBlockUtil$ChunkByteArray"/>
|
||||
<Class name="org.apache.hadoop.hdfs.protocol.SnapshotDiffReportListing$DiffReportListingEntry"/>
|
||||
<Class name="org.apache.hadoop.hdfs.protocol.SnapshotDiffReportListing"/>
|
||||
</Or>
|
||||
<Bug pattern="EI_EXPOSE_REP,EI_EXPOSE_REP2" />
|
||||
</Match>
|
||||
|
@ -139,10 +139,10 @@
|
||||
import org.apache.hadoop.hdfs.protocol.ReencryptionStatusIterator;
|
||||
import org.apache.hadoop.hdfs.protocol.RollingUpgradeInfo;
|
||||
import org.apache.hadoop.hdfs.protocol.SnapshotAccessControlException;
|
||||
import org.apache.hadoop.hdfs.protocol.SnapshotDiffReport;
|
||||
import org.apache.hadoop.hdfs.protocol.SnapshottableDirectoryStatus;
|
||||
import org.apache.hadoop.hdfs.protocol.UnresolvedPathException;
|
||||
import org.apache.hadoop.hdfs.protocol.ZoneReencryptionStatus;
|
||||
import org.apache.hadoop.hdfs.protocol.SnapshotDiffReportListing;
|
||||
import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtoUtil;
|
||||
import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair;
|
||||
import org.apache.hadoop.hdfs.protocol.datatransfer.ReplaceDatanodeOnFailure;
|
||||
@ -2140,14 +2140,16 @@ public void disallowSnapshot(String snapshotRoot) throws IOException {
|
||||
/**
|
||||
* Get the difference between two snapshots, or between a snapshot and the
|
||||
* current tree of a directory.
|
||||
* @see ClientProtocol#getSnapshotDiffReport(String, String, String)
|
||||
* @see ClientProtocol#getSnapshotDiffReportListing
|
||||
*/
|
||||
public SnapshotDiffReport getSnapshotDiffReport(String snapshotDir,
|
||||
String fromSnapshot, String toSnapshot) throws IOException {
|
||||
public SnapshotDiffReportListing getSnapshotDiffReportListing(
|
||||
String snapshotDir, String fromSnapshot, String toSnapshot,
|
||||
byte[] startPath, int index) throws IOException {
|
||||
checkOpen();
|
||||
try (TraceScope ignored = tracer.newScope("getSnapshotDiffReport")) {
|
||||
return namenode.getSnapshotDiffReport(snapshotDir,
|
||||
fromSnapshot, toSnapshot);
|
||||
return namenode
|
||||
.getSnapshotDiffReportListing(snapshotDir, fromSnapshot, toSnapshot,
|
||||
startPath, index);
|
||||
} catch (RemoteException re) {
|
||||
throw re.unwrapRemoteException();
|
||||
}
|
||||
|
@ -89,6 +89,7 @@
|
||||
import java.util.concurrent.ThreadPoolExecutor;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
import java.util.Arrays;
|
||||
|
||||
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_DATA_TRANSFER_CLIENT_TCPNODELAY_DEFAULT;
|
||||
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_DATA_TRANSFER_CLIENT_TCPNODELAY_KEY;
|
||||
@ -124,6 +125,56 @@ public static String bytes2String(byte[] bytes) {
|
||||
return bytes2String(bytes, 0, bytes.length);
|
||||
}
|
||||
|
||||
/**
|
||||
* Converts a byte array to array of arrays of bytes
|
||||
* on byte separator.
|
||||
*/
|
||||
public static byte[][] bytes2byteArray(byte[] bytes) {
|
||||
return bytes2byteArray(bytes, bytes.length, (byte)Path.SEPARATOR_CHAR);
|
||||
}
|
||||
/**
|
||||
* Splits first len bytes in bytes to array of arrays of bytes
|
||||
* on byte separator.
|
||||
* @param bytes the byte array to split
|
||||
* @param len the number of bytes to split
|
||||
* @param separator the delimiting byte
|
||||
*/
|
||||
public static byte[][] bytes2byteArray(byte[] bytes, int len,
|
||||
byte separator) {
|
||||
Preconditions.checkPositionIndex(len, bytes.length);
|
||||
if (len == 0) {
|
||||
return new byte[][]{null};
|
||||
}
|
||||
// Count the splits. Omit multiple separators and the last one by
|
||||
// peeking at prior byte.
|
||||
int splits = 0;
|
||||
for (int i = 1; i < len; i++) {
|
||||
if (bytes[i-1] == separator && bytes[i] != separator) {
|
||||
splits++;
|
||||
}
|
||||
}
|
||||
if (splits == 0 && bytes[0] == separator) {
|
||||
return new byte[][]{null};
|
||||
}
|
||||
splits++;
|
||||
byte[][] result = new byte[splits][];
|
||||
int nextIndex = 0;
|
||||
// Build the splits.
|
||||
for (int i = 0; i < splits; i++) {
|
||||
int startIndex = nextIndex;
|
||||
// find next separator in the bytes.
|
||||
while (nextIndex < len && bytes[nextIndex] != separator) {
|
||||
nextIndex++;
|
||||
}
|
||||
result[i] = (nextIndex > 0)
|
||||
? Arrays.copyOfRange(bytes, startIndex, nextIndex)
|
||||
: DFSUtilClient.EMPTY_BYTES; // reuse empty bytes for root.
|
||||
do { // skip over separators.
|
||||
nextIndex++;
|
||||
} while (nextIndex < len && bytes[nextIndex] == separator);
|
||||
}
|
||||
return result;
|
||||
}
|
||||
/** Return used as percentage of capacity */
|
||||
public static float getPercentUsed(long used, long capacity) {
|
||||
return capacity <= 0 ? 100 : (used * 100.0f)/capacity;
|
||||
@ -277,11 +328,9 @@ public static int compareBytes(byte[] left, byte[] right) {
|
||||
* Given a list of path components returns a byte array
|
||||
*/
|
||||
public static byte[] byteArray2bytes(byte[][] pathComponents) {
|
||||
if (pathComponents.length == 0) {
|
||||
if (pathComponents.length == 0 || (pathComponents.length == 1
|
||||
&& (pathComponents[0] == null || pathComponents[0].length == 0))) {
|
||||
return EMPTY_BYTES;
|
||||
} else if (pathComponents.length == 1
|
||||
&& (pathComponents[0] == null || pathComponents[0].length == 0)) {
|
||||
return new byte[]{(byte) Path.SEPARATOR_CHAR};
|
||||
}
|
||||
int length = 0;
|
||||
for (int i = 0; i < pathComponents.length; i++) {
|
||||
|
@ -21,6 +21,7 @@
|
||||
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
import com.google.common.base.Preconditions;
|
||||
import org.apache.commons.collections.list.TreeList;
|
||||
import org.apache.hadoop.HadoopIllegalArgumentException;
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.classification.InterfaceStability;
|
||||
@ -90,12 +91,16 @@
|
||||
import org.apache.hadoop.hdfs.protocol.ZoneReencryptionStatus;
|
||||
import org.apache.hadoop.hdfs.protocol.RollingUpgradeInfo;
|
||||
import org.apache.hadoop.hdfs.protocol.SnapshotDiffReport;
|
||||
import org.apache.hadoop.hdfs.protocol.SnapshotDiffReportListing;
|
||||
import org.apache.hadoop.hdfs.protocol.SnapshotDiffReportListing.DiffReportListingEntry;
|
||||
import org.apache.hadoop.hdfs.client.impl.SnapshotDiffReportGenerator;
|
||||
import org.apache.hadoop.hdfs.protocol.SnapshottableDirectoryStatus;
|
||||
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
|
||||
import org.apache.hadoop.io.Text;
|
||||
import org.apache.hadoop.net.NetUtils;
|
||||
import org.apache.hadoop.security.Credentials;
|
||||
import org.apache.hadoop.security.token.Token;
|
||||
import org.apache.hadoop.util.ChunkedArrayList;
|
||||
import org.apache.hadoop.util.Progressable;
|
||||
|
||||
import javax.annotation.Nonnull;
|
||||
@ -1971,19 +1976,46 @@ public Void next(final FileSystem fs, final Path p)
|
||||
}.resolve(this, absF);
|
||||
}
|
||||
|
||||
private SnapshotDiffReport getSnapshotDiffReportInternal(
|
||||
final String snapshotDir, final String fromSnapshot,
|
||||
final String toSnapshot) throws IOException {
|
||||
byte[] startPath = DFSUtilClient.EMPTY_BYTES;
|
||||
int index = -1;
|
||||
SnapshotDiffReportGenerator snapshotDiffReport;
|
||||
List<DiffReportListingEntry> modifiedList = new TreeList();
|
||||
List<DiffReportListingEntry> createdList = new ChunkedArrayList<>();
|
||||
List<DiffReportListingEntry> deletedList = new ChunkedArrayList<>();
|
||||
SnapshotDiffReportListing report;
|
||||
do {
|
||||
report = dfs.getSnapshotDiffReportListing(snapshotDir, fromSnapshot,
|
||||
toSnapshot, startPath, index);
|
||||
startPath = report.getLastPath();
|
||||
index = report.getLastIndex();
|
||||
modifiedList.addAll(report.getModifyList());
|
||||
createdList.addAll(report.getCreateList());
|
||||
deletedList.addAll(report.getDeleteList());
|
||||
} while (!(Arrays.equals(startPath, DFSUtilClient.EMPTY_BYTES)
|
||||
&& index == -1));
|
||||
snapshotDiffReport =
|
||||
new SnapshotDiffReportGenerator(snapshotDir, fromSnapshot, toSnapshot,
|
||||
report.getIsFromEarlier(), modifiedList, createdList, deletedList);
|
||||
return snapshotDiffReport.generateReport();
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the difference between two snapshots, or between a snapshot and the
|
||||
* current tree of a directory.
|
||||
*
|
||||
* @see DFSClient#getSnapshotDiffReport(String, String, String)
|
||||
* @see DFSClient#getSnapshotDiffReportListing
|
||||
*/
|
||||
public SnapshotDiffReport getSnapshotDiffReport(final Path snapshotDir,
|
||||
final String fromSnapshot, final String toSnapshot) throws IOException {
|
||||
Path absF = fixRelativePart(snapshotDir);
|
||||
return new FileSystemLinkResolver<SnapshotDiffReport>() {
|
||||
@Override
|
||||
public SnapshotDiffReport doCall(final Path p) throws IOException {
|
||||
return dfs.getSnapshotDiffReport(getPathName(p), fromSnapshot,
|
||||
public SnapshotDiffReport doCall(final Path p)
|
||||
throws IOException {
|
||||
return getSnapshotDiffReportInternal(getPathName(p), fromSnapshot,
|
||||
toSnapshot);
|
||||
}
|
||||
|
||||
|
@ -0,0 +1,262 @@
|
||||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hdfs.client.impl;
|
||||
|
||||
import java.util.*;
|
||||
|
||||
import com.google.common.primitives.SignedBytes;
|
||||
|
||||
import org.apache.hadoop.util.ChunkedArrayList;
|
||||
import org.apache.hadoop.hdfs.protocol.SnapshotDiffReportListing.DiffReportListingEntry;
|
||||
import org.apache.hadoop.hdfs.protocol.SnapshotDiffReport;
|
||||
import org.apache.hadoop.hdfs.protocol.SnapshotDiffReport.DiffReportEntry;
|
||||
import org.apache.hadoop.hdfs.protocol.SnapshotDiffReport.DiffType;
|
||||
/**
|
||||
* This class represents to end users the difference between two snapshots of
|
||||
* the same directory, or the difference between a snapshot of the directory and
|
||||
* its current state. Instead of capturing all the details of the diff, this
|
||||
* class only lists where the changes happened and their types.
|
||||
*/
|
||||
public class SnapshotDiffReportGenerator {
|
||||
/**
|
||||
* Compare two inodes based on their full names.
|
||||
*/
|
||||
public static final Comparator<DiffReportListingEntry> INODE_COMPARATOR =
|
||||
new Comparator<DiffReportListingEntry>() {
|
||||
@Override
|
||||
public int compare(DiffReportListingEntry left,
|
||||
DiffReportListingEntry right) {
|
||||
final Comparator<byte[]> cmp =
|
||||
SignedBytes.lexicographicalComparator();
|
||||
//source path can never be null
|
||||
final byte[][] l = left.getSourcePath();
|
||||
final byte[][] r = right.getSourcePath();
|
||||
if (l.length == 1 && l[0] == null) {
|
||||
return -1;
|
||||
} else if (r.length == 1 && r[0] == null) {
|
||||
return 1;
|
||||
} else {
|
||||
for (int i = 0; i < l.length && i < r.length; i++) {
|
||||
final int diff = cmp.compare(l[i], r[i]);
|
||||
if (diff != 0) {
|
||||
return diff;
|
||||
}
|
||||
}
|
||||
return l.length == r.length ? 0 : l.length > r.length ? 1 : -1;
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
static class RenameEntry {
|
||||
private byte[][] sourcePath;
|
||||
private byte[][] targetPath;
|
||||
|
||||
void setSource(byte[][] srcPath) {
|
||||
this.sourcePath = srcPath;
|
||||
}
|
||||
|
||||
void setTarget(byte[][] target) {
|
||||
this.targetPath = target;
|
||||
}
|
||||
|
||||
boolean isRename() {
|
||||
return sourcePath != null && targetPath != null;
|
||||
}
|
||||
|
||||
byte[][] getSourcePath() {
|
||||
return sourcePath;
|
||||
}
|
||||
|
||||
byte[][] getTargetPath() {
|
||||
return targetPath;
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
* A class represnting the diff in a directory between two given snapshots
|
||||
* in two lists: createdList and deleted list.
|
||||
*/
|
||||
static class ChildrenDiff {
|
||||
private final List<DiffReportListingEntry> createdList;
|
||||
private final List<DiffReportListingEntry> deletedList;
|
||||
|
||||
ChildrenDiff(List<DiffReportListingEntry> createdList,
|
||||
List<DiffReportListingEntry> deletedList) {
|
||||
this.createdList = createdList != null ? createdList :
|
||||
Collections.emptyList();
|
||||
this.deletedList = deletedList != null ? deletedList :
|
||||
Collections.emptyList();
|
||||
}
|
||||
|
||||
public List<DiffReportListingEntry> getCreatedList() {
|
||||
return createdList;
|
||||
}
|
||||
|
||||
public List<DiffReportListingEntry> getDeletedList() {
|
||||
return deletedList;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* snapshot root full path.
|
||||
*/
|
||||
private final String snapshotRoot;
|
||||
|
||||
/**
|
||||
* start point of the diff.
|
||||
*/
|
||||
private final String fromSnapshot;
|
||||
|
||||
/**
|
||||
* end point of the diff.
|
||||
*/
|
||||
private final String toSnapshot;
|
||||
|
||||
/**
|
||||
* Flag to indicate the diff is calculated from older to newer snapshot
|
||||
* or not.
|
||||
*/
|
||||
private final boolean isFromEarlier;
|
||||
|
||||
/**
|
||||
* A map capturing the detailed difference about file creation/deletion.
|
||||
* Each key indicates a directory inode whose children have been changed
|
||||
* between the two snapshots, while its associated value is a
|
||||
* {@link ChildrenDiff} storing the changes (creation/deletion) happened to
|
||||
* the children (files).
|
||||
*/
|
||||
private final Map<Long, ChildrenDiff> dirDiffMap =
|
||||
new HashMap<>();
|
||||
|
||||
private final Map<Long, RenameEntry> renameMap =
|
||||
new HashMap<>();
|
||||
|
||||
private List<DiffReportListingEntry> mlist = null;
|
||||
private List<DiffReportListingEntry> clist = null;
|
||||
private List<DiffReportListingEntry> dlist = null;
|
||||
|
||||
public SnapshotDiffReportGenerator(String snapshotRoot, String fromSnapshot,
|
||||
String toSnapshot, boolean isFromEarlier,
|
||||
List<DiffReportListingEntry> mlist, List<DiffReportListingEntry> clist,
|
||||
List<DiffReportListingEntry> dlist) {
|
||||
this.snapshotRoot = snapshotRoot;
|
||||
this.fromSnapshot = fromSnapshot;
|
||||
this.toSnapshot = toSnapshot;
|
||||
this.isFromEarlier = isFromEarlier;
|
||||
this.mlist =
|
||||
mlist != null ? mlist : Collections.emptyList();
|
||||
this.clist =
|
||||
clist != null ? clist : Collections.emptyList();
|
||||
this.dlist =
|
||||
dlist != null ? dlist : Collections.emptyList();
|
||||
}
|
||||
|
||||
private RenameEntry getEntry(long inodeId) {
|
||||
RenameEntry entry = renameMap.get(inodeId);
|
||||
if (entry == null) {
|
||||
entry = new RenameEntry();
|
||||
renameMap.put(inodeId, entry);
|
||||
}
|
||||
return entry;
|
||||
}
|
||||
|
||||
public void generateReportList() {
|
||||
mlist.sort(INODE_COMPARATOR);
|
||||
for (DiffReportListingEntry created : clist) {
|
||||
ChildrenDiff entry = dirDiffMap.get(created.getDirId());
|
||||
if (entry == null) {
|
||||
List<DiffReportListingEntry> createdList = new ChunkedArrayList<>();
|
||||
createdList.add(created);
|
||||
ChildrenDiff list = new ChildrenDiff(createdList, null);
|
||||
dirDiffMap.put(created.getDirId(), list);
|
||||
} else {
|
||||
dirDiffMap.get(created.getDirId()).getCreatedList().add(created);
|
||||
}
|
||||
if (created.isReference()) {
|
||||
RenameEntry renameEntry = getEntry(created.getFileId());
|
||||
if (renameEntry.getTargetPath() != null) {
|
||||
renameEntry.setTarget(created.getSourcePath());
|
||||
}
|
||||
}
|
||||
}
|
||||
for (DiffReportListingEntry deleted : dlist) {
|
||||
ChildrenDiff entry = dirDiffMap.get(deleted.getDirId());
|
||||
if (entry == null || (entry.getDeletedList().isEmpty())) {
|
||||
ChildrenDiff list;
|
||||
List<DiffReportListingEntry> deletedList = new ChunkedArrayList<>();
|
||||
deletedList.add(deleted);
|
||||
if (entry == null) {
|
||||
list = new ChildrenDiff(null, deletedList);
|
||||
} else {
|
||||
list = new ChildrenDiff(entry.getCreatedList(), deletedList);
|
||||
}
|
||||
dirDiffMap.put(deleted.getDirId(), list);
|
||||
} else {
|
||||
entry.getDeletedList().add(deleted);
|
||||
}
|
||||
if (deleted.isReference()) {
|
||||
RenameEntry renameEntry = getEntry(deleted.getFileId());
|
||||
renameEntry.setTarget(deleted.getTargetPath());
|
||||
renameEntry.setSource(deleted.getSourcePath());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public SnapshotDiffReport generateReport() {
|
||||
List<DiffReportEntry> diffReportList = new ChunkedArrayList<>();
|
||||
generateReportList();
|
||||
for (DiffReportListingEntry modified : mlist) {
|
||||
diffReportList.add(
|
||||
new DiffReportEntry(DiffType.MODIFY, modified.getSourcePath(), null));
|
||||
if (modified.isReference()
|
||||
&& dirDiffMap.get(modified.getDirId()) != null) {
|
||||
List<DiffReportEntry> subList = generateReport(modified);
|
||||
diffReportList.addAll(subList);
|
||||
}
|
||||
}
|
||||
return new SnapshotDiffReport(snapshotRoot, fromSnapshot, toSnapshot,
|
||||
diffReportList);
|
||||
}
|
||||
|
||||
private List<DiffReportEntry> generateReport(
|
||||
DiffReportListingEntry modified) {
|
||||
List<DiffReportEntry> diffReportList = new ChunkedArrayList<>();
|
||||
ChildrenDiff list = dirDiffMap.get(modified.getDirId());
|
||||
for (DiffReportListingEntry created : list.getCreatedList()) {
|
||||
RenameEntry entry = renameMap.get(created.getFileId());
|
||||
if (entry == null || !entry.isRename()) {
|
||||
diffReportList.add(new DiffReportEntry(
|
||||
isFromEarlier ? DiffType.CREATE : DiffType.DELETE,
|
||||
created.getSourcePath()));
|
||||
}
|
||||
}
|
||||
for (DiffReportListingEntry deleted : list.getDeletedList()) {
|
||||
RenameEntry entry = renameMap.get(deleted.getFileId());
|
||||
if (entry != null && entry.isRename()) {
|
||||
diffReportList.add(new DiffReportEntry(DiffType.RENAME,
|
||||
isFromEarlier ? entry.getSourcePath() : entry.getTargetPath(),
|
||||
isFromEarlier ? entry.getTargetPath() : entry.getSourcePath()));
|
||||
} else {
|
||||
diffReportList.add(new DiffReportEntry(
|
||||
isFromEarlier ? DiffType.DELETE : DiffType.CREATE,
|
||||
deleted.getSourcePath()));
|
||||
}
|
||||
}
|
||||
return diffReportList;
|
||||
}
|
||||
}
|
@ -1288,6 +1288,35 @@ void disallowSnapshot(String snapshotRoot)
|
||||
SnapshotDiffReport getSnapshotDiffReport(String snapshotRoot,
|
||||
String fromSnapshot, String toSnapshot) throws IOException;
|
||||
|
||||
/**
|
||||
* Get the difference between two snapshots, or between a snapshot and the
|
||||
* current tree of a directory.
|
||||
*
|
||||
* @param snapshotRoot
|
||||
* full path of the directory where snapshots are taken
|
||||
* @param fromSnapshot
|
||||
* snapshot name of the from point. Null indicates the current
|
||||
* tree
|
||||
* @param toSnapshot
|
||||
* snapshot name of the to point. Null indicates the current
|
||||
* tree.
|
||||
* @param startPath
|
||||
* path relative to the snapshottable root directory from where the
|
||||
* snapshotdiff computation needs to start across multiple rpc calls
|
||||
* @param index
|
||||
* index in the created or deleted list of the directory at which
|
||||
* the snapshotdiff computation stopped during the last rpc call
|
||||
* as the no of entries exceeded the snapshotdiffentry limit. -1
|
||||
* indicates, the snapshotdiff compuatation needs to start right
|
||||
* from the startPath provided.
|
||||
* @return The difference report represented as a {@link SnapshotDiffReport}.
|
||||
* @throws IOException on error
|
||||
*/
|
||||
@Idempotent
|
||||
SnapshotDiffReportListing getSnapshotDiffReportListing(String snapshotRoot,
|
||||
String fromSnapshot, String toSnapshot, byte[] startPath, int index)
|
||||
throws IOException;
|
||||
|
||||
/**
|
||||
* Add a CacheDirective to the CacheManager.
|
||||
*
|
||||
|
@ -0,0 +1,160 @@
|
||||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hdfs.protocol;
|
||||
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
|
||||
import org.apache.curator.shaded.com.google.common.base.Preconditions;
|
||||
|
||||
import org.apache.hadoop.hdfs.DFSUtilClient;
|
||||
|
||||
/**
|
||||
* This class represents to the difference between two snapshots of
|
||||
* the same directory, or the difference between a snapshot of the directory and
|
||||
* its current state. This Class serves the purpose of collecting diff entries
|
||||
* in 3 lists : created, deleted and modified list combined size of which is set
|
||||
* by dfs.snapshotdiff-report.limit over one rpc call to the namenode.
|
||||
*/
|
||||
public class SnapshotDiffReportListing {
|
||||
/**
|
||||
* Representing the full path and diff type of a file/directory where changes
|
||||
* have happened.
|
||||
*/
|
||||
public static class DiffReportListingEntry {
|
||||
/**
|
||||
* The type of the difference.
|
||||
*/
|
||||
private final long fileId;
|
||||
private final long dirId;
|
||||
private final boolean isReference;
|
||||
/**
|
||||
* The relative path (related to the snapshot root) of 1) the file/directory
|
||||
* where changes have happened, or 2) the source file/dir of a rename op.
|
||||
* or 3) target file/dir for a rename op.
|
||||
*/
|
||||
private final byte[][] sourcePath;
|
||||
private final byte[][] targetPath;
|
||||
|
||||
public DiffReportListingEntry(long dirId, long fileId, byte[][] sourcePath,
|
||||
boolean isReference, byte[][] targetPath) {
|
||||
Preconditions.checkNotNull(sourcePath);
|
||||
this.dirId = dirId;
|
||||
this.fileId = fileId;
|
||||
this.sourcePath = sourcePath;
|
||||
this.isReference = isReference;
|
||||
this.targetPath = targetPath;
|
||||
}
|
||||
|
||||
public DiffReportListingEntry(long dirId, long fileId, byte[] sourcePath,
|
||||
boolean isReference, byte[] targetpath) {
|
||||
Preconditions.checkNotNull(sourcePath);
|
||||
this.dirId = dirId;
|
||||
this.fileId = fileId;
|
||||
this.sourcePath = DFSUtilClient.bytes2byteArray(sourcePath);
|
||||
this.isReference = isReference;
|
||||
this.targetPath =
|
||||
targetpath == null ? null : DFSUtilClient.bytes2byteArray(targetpath);
|
||||
}
|
||||
|
||||
public long getDirId() {
|
||||
return dirId;
|
||||
}
|
||||
|
||||
public long getFileId() {
|
||||
return fileId;
|
||||
}
|
||||
|
||||
public byte[][] getSourcePath() {
|
||||
return sourcePath;
|
||||
}
|
||||
|
||||
public byte[][] getTargetPath() {
|
||||
return targetPath;
|
||||
}
|
||||
|
||||
public boolean isReference() {
|
||||
return isReference;
|
||||
}
|
||||
}
|
||||
|
||||
/** store the starting path to process across RPC's for snapshot diff. */
|
||||
private final byte[] lastPath;
|
||||
|
||||
private final int lastIndex;
|
||||
|
||||
private final boolean isFromEarlier;
|
||||
|
||||
/** list of diff. */
|
||||
private final List<DiffReportListingEntry> modifyList;
|
||||
|
||||
private final List<DiffReportListingEntry> createList;
|
||||
|
||||
private final List<DiffReportListingEntry> deleteList;
|
||||
|
||||
public SnapshotDiffReportListing() {
|
||||
this.modifyList = Collections.emptyList();
|
||||
this.createList = Collections.emptyList();
|
||||
this.deleteList = Collections.emptyList();
|
||||
this.lastPath = DFSUtilClient.string2Bytes("");
|
||||
this.lastIndex = -1;
|
||||
this.isFromEarlier = false;
|
||||
}
|
||||
|
||||
public SnapshotDiffReportListing(byte[] startPath,
|
||||
List<DiffReportListingEntry> modifiedEntryList,
|
||||
List<DiffReportListingEntry> createdEntryList,
|
||||
List<DiffReportListingEntry> deletedEntryList, int index,
|
||||
boolean isFromEarlier) {
|
||||
this.modifyList = modifiedEntryList;
|
||||
this.createList = createdEntryList;
|
||||
this.deleteList = deletedEntryList;
|
||||
this.lastPath =
|
||||
startPath != null ? startPath : DFSUtilClient.string2Bytes("");
|
||||
this.lastIndex = index;
|
||||
this.isFromEarlier = isFromEarlier;
|
||||
}
|
||||
|
||||
public List<DiffReportListingEntry> getModifyList() {
|
||||
return modifyList;
|
||||
}
|
||||
|
||||
public List<DiffReportListingEntry> getCreateList() {
|
||||
return createList;
|
||||
}
|
||||
|
||||
public List<DiffReportListingEntry> getDeleteList() {
|
||||
return deleteList;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return {@link #lastPath}
|
||||
*/
|
||||
public byte[] getLastPath() {
|
||||
return lastPath;
|
||||
}
|
||||
|
||||
public int getLastIndex() {
|
||||
return lastIndex;
|
||||
}
|
||||
|
||||
public boolean getIsFromEarlier() {
|
||||
return isFromEarlier;
|
||||
}
|
||||
|
||||
}
|
@ -79,6 +79,7 @@
|
||||
import org.apache.hadoop.hdfs.protocol.ZoneReencryptionStatus;
|
||||
import org.apache.hadoop.hdfs.protocol.RollingUpgradeInfo;
|
||||
import org.apache.hadoop.hdfs.protocol.SnapshotDiffReport;
|
||||
import org.apache.hadoop.hdfs.protocol.SnapshotDiffReportListing;
|
||||
import org.apache.hadoop.hdfs.protocol.SnapshottableDirectoryStatus;
|
||||
import org.apache.hadoop.hdfs.protocol.proto.AclProtos.GetAclStatusRequestProto;
|
||||
import org.apache.hadoop.hdfs.protocol.proto.AclProtos.GetAclStatusResponseProto;
|
||||
@ -133,6 +134,8 @@
|
||||
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetServerDefaultsRequestProto;
|
||||
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetSnapshotDiffReportRequestProto;
|
||||
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetSnapshotDiffReportResponseProto;
|
||||
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetSnapshotDiffReportListingRequestProto;
|
||||
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetSnapshotDiffReportListingResponseProto;
|
||||
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetSnapshottableDirListingRequestProto;
|
||||
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetSnapshottableDirListingResponseProto;
|
||||
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetStoragePoliciesRequestProto;
|
||||
@ -1205,6 +1208,27 @@ public SnapshotDiffReport getSnapshotDiffReport(String snapshotRoot,
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public SnapshotDiffReportListing getSnapshotDiffReportListing(
|
||||
String snapshotRoot, String fromSnapshot, String toSnapshot,
|
||||
byte[] startPath, int index) throws IOException {
|
||||
GetSnapshotDiffReportListingRequestProto req =
|
||||
GetSnapshotDiffReportListingRequestProto.newBuilder()
|
||||
.setSnapshotRoot(snapshotRoot).setFromSnapshot(fromSnapshot)
|
||||
.setToSnapshot(toSnapshot).setCursor(
|
||||
HdfsProtos.SnapshotDiffReportCursorProto.newBuilder()
|
||||
.setStartPath(PBHelperClient.getByteString(startPath))
|
||||
.setIndex(index).build()).build();
|
||||
try {
|
||||
GetSnapshotDiffReportListingResponseProto result =
|
||||
rpcProxy.getSnapshotDiffReportListing(null, req);
|
||||
|
||||
return PBHelperClient.convert(result.getDiffReport());
|
||||
} catch (ServiceException e) {
|
||||
throw ProtobufHelper.getRemoteException(e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public long addCacheDirective(CacheDirectiveInfo directive,
|
||||
EnumSet<CacheFlag> flags) throws IOException {
|
||||
|
@ -99,6 +99,8 @@
|
||||
import org.apache.hadoop.hdfs.protocol.OpenFileEntry;
|
||||
import org.apache.hadoop.hdfs.protocol.RollingUpgradeInfo;
|
||||
import org.apache.hadoop.hdfs.protocol.RollingUpgradeStatus;
|
||||
import org.apache.hadoop.hdfs.protocol.SnapshotDiffReportListing;
|
||||
import org.apache.hadoop.hdfs.protocol.SnapshotDiffReportListing.DiffReportListingEntry;
|
||||
import org.apache.hadoop.hdfs.protocol.SnapshotDiffReport;
|
||||
import org.apache.hadoop.hdfs.protocol.SnapshotDiffReport.DiffReportEntry;
|
||||
import org.apache.hadoop.hdfs.protocol.SnapshotDiffReport.DiffType;
|
||||
@ -169,6 +171,8 @@
|
||||
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.QuotaUsageProto;
|
||||
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.ReencryptionInfoProto;
|
||||
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.RollingUpgradeStatusProto;
|
||||
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.SnapshotDiffReportListingEntryProto;
|
||||
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.SnapshotDiffReportListingProto;
|
||||
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.SnapshotDiffReportEntryProto;
|
||||
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.SnapshotDiffReportProto;
|
||||
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.SnapshottableDirectoryListingProto;
|
||||
@ -1489,6 +1493,61 @@ public static DiffReportEntry convert(SnapshotDiffReportEntryProto entry) {
|
||||
.toByteArray() : null);
|
||||
}
|
||||
|
||||
public static SnapshotDiffReportListing convert(
|
||||
SnapshotDiffReportListingProto reportProto) {
|
||||
if (reportProto == null) {
|
||||
return null;
|
||||
}
|
||||
List<SnapshotDiffReportListingEntryProto> modifyList =
|
||||
reportProto.getModifiedEntriesList();
|
||||
List<DiffReportListingEntry> modifiedEntries = new ChunkedArrayList<>();
|
||||
for (SnapshotDiffReportListingEntryProto entryProto : modifyList) {
|
||||
DiffReportListingEntry entry = convert(entryProto);
|
||||
if (entry != null) {
|
||||
modifiedEntries.add(entry);
|
||||
}
|
||||
}
|
||||
List<SnapshotDiffReportListingEntryProto> createList =
|
||||
reportProto.getCreatedEntriesList();
|
||||
List<DiffReportListingEntry> createdEntries = new ChunkedArrayList<>();
|
||||
for (SnapshotDiffReportListingEntryProto entryProto : createList) {
|
||||
DiffReportListingEntry entry = convert(entryProto);
|
||||
if (entry != null) {
|
||||
createdEntries.add(entry);
|
||||
}
|
||||
}
|
||||
List<SnapshotDiffReportListingEntryProto> deletedList =
|
||||
reportProto.getDeletedEntriesList();
|
||||
List<DiffReportListingEntry> deletedEntries = new ChunkedArrayList<>();
|
||||
for (SnapshotDiffReportListingEntryProto entryProto : deletedList) {
|
||||
DiffReportListingEntry entry = convert(entryProto);
|
||||
if (entry != null) {
|
||||
deletedEntries.add(entry);
|
||||
}
|
||||
}
|
||||
byte[] startPath = reportProto.getCursor().getStartPath().toByteArray();
|
||||
boolean isFromEarlier = reportProto.getIsFromEarlier();
|
||||
|
||||
int index = reportProto.getCursor().getIndex();
|
||||
return new SnapshotDiffReportListing(startPath, modifiedEntries,
|
||||
createdEntries, deletedEntries, index, isFromEarlier);
|
||||
}
|
||||
|
||||
public static DiffReportListingEntry convert(
|
||||
SnapshotDiffReportListingEntryProto entry) {
|
||||
if (entry == null) {
|
||||
return null;
|
||||
}
|
||||
long dirId = entry.getDirId();
|
||||
long fileId = entry.getFileId();
|
||||
boolean isReference = entry.getIsReference();
|
||||
byte[] sourceName = entry.getFullpath().toByteArray();
|
||||
byte[] targetName =
|
||||
entry.hasTargetPath() ? entry.getTargetPath().toByteArray() : null;
|
||||
return new DiffReportListingEntry(dirId, fileId, sourceName, isReference,
|
||||
targetName);
|
||||
}
|
||||
|
||||
public static SnapshottableDirectoryStatus[] convert(
|
||||
SnapshottableDirectoryListingProto sdlp) {
|
||||
if (sdlp == null)
|
||||
@ -2508,6 +2567,74 @@ public static SnapshotDiffReportEntryProto convert(DiffReportEntry entry) {
|
||||
return builder.build();
|
||||
}
|
||||
|
||||
public static SnapshotDiffReportListingEntryProto convert(
|
||||
DiffReportListingEntry entry) {
|
||||
if (entry == null) {
|
||||
return null;
|
||||
}
|
||||
ByteString sourcePath = getByteString(
|
||||
entry.getSourcePath() == null ? DFSUtilClient.EMPTY_BYTES :
|
||||
DFSUtilClient.byteArray2bytes(entry.getSourcePath()));
|
||||
long dirId = entry.getDirId();
|
||||
long fileId = entry.getFileId();
|
||||
boolean isReference = entry.isReference();
|
||||
ByteString targetPath = getByteString(
|
||||
entry.getTargetPath() == null ? DFSUtilClient.EMPTY_BYTES :
|
||||
DFSUtilClient.byteArray2bytes(entry.getTargetPath()));
|
||||
SnapshotDiffReportListingEntryProto.Builder builder =
|
||||
SnapshotDiffReportListingEntryProto.newBuilder().setFullpath(sourcePath)
|
||||
.setDirId(dirId).setFileId(fileId).setIsReference(isReference)
|
||||
.setTargetPath(targetPath);
|
||||
return builder.build();
|
||||
}
|
||||
|
||||
public static SnapshotDiffReportListingProto convert(
|
||||
SnapshotDiffReportListing report) {
|
||||
if (report == null) {
|
||||
return null;
|
||||
}
|
||||
ByteString startPath = getByteString(
|
||||
report.getLastPath() == null ? DFSUtilClient.EMPTY_BYTES :
|
||||
report.getLastPath());
|
||||
List<DiffReportListingEntry> modifiedEntries = report.getModifyList();
|
||||
List<DiffReportListingEntry> createdEntries = report.getCreateList();
|
||||
List<DiffReportListingEntry> deletedEntries = report.getDeleteList();
|
||||
List<SnapshotDiffReportListingEntryProto> modifiedEntryProtos =
|
||||
new ChunkedArrayList<>();
|
||||
for (DiffReportListingEntry entry : modifiedEntries) {
|
||||
SnapshotDiffReportListingEntryProto entryProto = convert(entry);
|
||||
if (entryProto != null) {
|
||||
modifiedEntryProtos.add(entryProto);
|
||||
}
|
||||
}
|
||||
List<SnapshotDiffReportListingEntryProto> createdEntryProtos =
|
||||
new ChunkedArrayList<>();
|
||||
for (DiffReportListingEntry entry : createdEntries) {
|
||||
SnapshotDiffReportListingEntryProto entryProto = convert(entry);
|
||||
if (entryProto != null) {
|
||||
createdEntryProtos.add(entryProto);
|
||||
}
|
||||
}
|
||||
List<SnapshotDiffReportListingEntryProto> deletedEntryProtos =
|
||||
new ChunkedArrayList<>();
|
||||
for (DiffReportListingEntry entry : deletedEntries) {
|
||||
SnapshotDiffReportListingEntryProto entryProto = convert(entry);
|
||||
if (entryProto != null) {
|
||||
deletedEntryProtos.add(entryProto);
|
||||
}
|
||||
}
|
||||
|
||||
return SnapshotDiffReportListingProto.newBuilder()
|
||||
.addAllModifiedEntries(modifiedEntryProtos)
|
||||
.addAllCreatedEntries(createdEntryProtos)
|
||||
.addAllDeletedEntries(deletedEntryProtos)
|
||||
.setIsFromEarlier(report.getIsFromEarlier())
|
||||
.setCursor(HdfsProtos.SnapshotDiffReportCursorProto.newBuilder()
|
||||
.setStartPath(startPath)
|
||||
.setIndex(report.getLastIndex()).build())
|
||||
.build();
|
||||
}
|
||||
|
||||
public static SnapshotDiffReportProto convert(SnapshotDiffReport report) {
|
||||
if (report == null) {
|
||||
return null;
|
||||
|
@ -297,6 +297,16 @@ message GetSnapshotDiffReportResponseProto {
|
||||
required SnapshotDiffReportProto diffReport = 1;
|
||||
}
|
||||
|
||||
message GetSnapshotDiffReportListingRequestProto {
|
||||
required string snapshotRoot = 1;
|
||||
required string fromSnapshot = 2;
|
||||
required string toSnapshot = 3;
|
||||
optional SnapshotDiffReportCursorProto cursor = 4;
|
||||
}
|
||||
|
||||
message GetSnapshotDiffReportListingResponseProto {
|
||||
required SnapshotDiffReportListingProto diffReport = 1;
|
||||
}
|
||||
message RenewLeaseRequestProto {
|
||||
required string clientName = 1;
|
||||
}
|
||||
@ -913,6 +923,8 @@ service ClientNamenodeProtocol {
|
||||
returns(DeleteSnapshotResponseProto);
|
||||
rpc getSnapshotDiffReport(GetSnapshotDiffReportRequestProto)
|
||||
returns(GetSnapshotDiffReportResponseProto);
|
||||
rpc getSnapshotDiffReportListing(GetSnapshotDiffReportListingRequestProto)
|
||||
returns(GetSnapshotDiffReportListingResponseProto);
|
||||
rpc isFileClosed(IsFileClosedRequestProto)
|
||||
returns(IsFileClosedResponseProto);
|
||||
rpc modifyAclEntries(ModifyAclEntriesRequestProto)
|
||||
|
@ -528,6 +528,32 @@ message SnapshotDiffReportProto {
|
||||
repeated SnapshotDiffReportEntryProto diffReportEntries = 4;
|
||||
}
|
||||
|
||||
/**
|
||||
* Snapshot diff report listing entry
|
||||
*/
|
||||
message SnapshotDiffReportListingEntryProto {
|
||||
required bytes fullpath = 1;
|
||||
required uint64 dirId = 2;
|
||||
required bool isReference = 3;
|
||||
optional bytes targetPath = 4;
|
||||
optional uint64 fileId = 5;
|
||||
}
|
||||
|
||||
message SnapshotDiffReportCursorProto {
|
||||
required bytes startPath = 1;
|
||||
required int32 index = 2 [default = -1];
|
||||
}
|
||||
/**
|
||||
* Snapshot diff report listing
|
||||
*/
|
||||
message SnapshotDiffReportListingProto {
|
||||
// full path of the directory where snapshots were taken
|
||||
repeated SnapshotDiffReportListingEntryProto modifiedEntries = 1;
|
||||
repeated SnapshotDiffReportListingEntryProto createdEntries = 2;
|
||||
repeated SnapshotDiffReportListingEntryProto deletedEntries = 3;
|
||||
required bool isFromEarlier = 4;
|
||||
optional SnapshotDiffReportCursorProto cursor = 5;
|
||||
}
|
||||
/**
|
||||
* Block information
|
||||
*
|
||||
|
@ -381,6 +381,11 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
|
||||
DFS_NAMENODE_SNAPSHOT_DIFF_ALLOW_SNAP_ROOT_DESCENDANT_DEFAULT =
|
||||
true;
|
||||
|
||||
public static final String
|
||||
DFS_NAMENODE_SNAPSHOT_DIFF_LISTING_LIMIT =
|
||||
"dfs.namenode.snapshotdiff.listing.limit";
|
||||
public static final int
|
||||
DFS_NAMENODE_SNAPSHOT_DIFF_LISTING_LIMIT_DEFAULT = 1000;
|
||||
// Whether to enable datanode's stale state detection and usage for reads
|
||||
public static final String DFS_NAMENODE_AVOID_STALE_DATANODE_FOR_READ_KEY = "dfs.namenode.avoid.read.stale.datanode";
|
||||
public static final boolean DFS_NAMENODE_AVOID_STALE_DATANODE_FOR_READ_DEFAULT = false;
|
||||
|
@ -349,7 +349,8 @@ public static String path2String(final Object path) {
|
||||
public static byte[][] getPathComponents(String path) {
|
||||
// avoid intermediate split to String[]
|
||||
final byte[] bytes = string2Bytes(path);
|
||||
return bytes2byteArray(bytes, bytes.length, (byte)Path.SEPARATOR_CHAR);
|
||||
return DFSUtilClient
|
||||
.bytes2byteArray(bytes, bytes.length, (byte) Path.SEPARATOR_CHAR);
|
||||
}
|
||||
|
||||
/**
|
||||
@ -369,42 +370,9 @@ public static byte[][] bytes2byteArray(byte[] bytes, byte separator) {
|
||||
* @param len the number of bytes to split
|
||||
* @param separator the delimiting byte
|
||||
*/
|
||||
public static byte[][] bytes2byteArray(byte[] bytes,
|
||||
int len,
|
||||
public static byte[][] bytes2byteArray(byte[] bytes, int len,
|
||||
byte separator) {
|
||||
Preconditions.checkPositionIndex(len, bytes.length);
|
||||
if (len == 0) {
|
||||
return new byte[][]{null};
|
||||
}
|
||||
// Count the splits. Omit multiple separators and the last one by
|
||||
// peeking at prior byte.
|
||||
int splits = 0;
|
||||
for (int i = 1; i < len; i++) {
|
||||
if (bytes[i-1] == separator && bytes[i] != separator) {
|
||||
splits++;
|
||||
}
|
||||
}
|
||||
if (splits == 0 && bytes[0] == separator) {
|
||||
return new byte[][]{null};
|
||||
}
|
||||
splits++;
|
||||
byte[][] result = new byte[splits][];
|
||||
int nextIndex = 0;
|
||||
// Build the splits.
|
||||
for (int i = 0; i < splits; i++) {
|
||||
int startIndex = nextIndex;
|
||||
// find next separator in the bytes.
|
||||
while (nextIndex < len && bytes[nextIndex] != separator) {
|
||||
nextIndex++;
|
||||
}
|
||||
result[i] = (nextIndex > 0)
|
||||
? Arrays.copyOfRange(bytes, startIndex, nextIndex)
|
||||
: DFSUtilClient.EMPTY_BYTES; // reuse empty bytes for root.
|
||||
do { // skip over separators.
|
||||
nextIndex++;
|
||||
} while (nextIndex < len && bytes[nextIndex] == separator);
|
||||
}
|
||||
return result;
|
||||
return DFSUtilClient.bytes2byteArray(bytes, len, separator);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -55,6 +55,7 @@
|
||||
import org.apache.hadoop.hdfs.protocol.OpenFileEntry;
|
||||
import org.apache.hadoop.hdfs.protocol.RollingUpgradeInfo;
|
||||
import org.apache.hadoop.hdfs.protocol.SnapshotDiffReport;
|
||||
import org.apache.hadoop.hdfs.protocol.SnapshotDiffReportListing;
|
||||
import org.apache.hadoop.hdfs.protocol.SnapshottableDirectoryStatus;
|
||||
import org.apache.hadoop.hdfs.protocol.ZoneReencryptionStatus;
|
||||
import org.apache.hadoop.hdfs.protocol.proto.AclProtos.GetAclStatusRequestProto;
|
||||
@ -143,6 +144,8 @@
|
||||
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetServerDefaultsResponseProto;
|
||||
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetSnapshotDiffReportRequestProto;
|
||||
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetSnapshotDiffReportResponseProto;
|
||||
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetSnapshotDiffReportListingRequestProto;
|
||||
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetSnapshotDiffReportListingResponseProto;
|
||||
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetSnapshottableDirListingRequestProto;
|
||||
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetSnapshottableDirListingResponseProto;
|
||||
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetStoragePoliciesRequestProto;
|
||||
@ -1245,6 +1248,25 @@ public GetSnapshotDiffReportResponseProto getSnapshotDiffReport(
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public GetSnapshotDiffReportListingResponseProto getSnapshotDiffReportListing(
|
||||
RpcController controller,
|
||||
GetSnapshotDiffReportListingRequestProto request)
|
||||
throws ServiceException {
|
||||
try {
|
||||
SnapshotDiffReportListing report = server
|
||||
.getSnapshotDiffReportListing(request.getSnapshotRoot(),
|
||||
request.getFromSnapshot(), request.getToSnapshot(),
|
||||
request.getCursor().getStartPath().toByteArray(),
|
||||
request.getCursor().getIndex());
|
||||
//request.getStartPath(), request.getIndex());
|
||||
return GetSnapshotDiffReportListingResponseProto.newBuilder()
|
||||
.setDiffReport(PBHelperClient.convert(report)).build();
|
||||
} catch (IOException e) {
|
||||
throw new ServiceException(e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public IsFileClosedResponseProto isFileClosed(
|
||||
RpcController controller, IsFileClosedRequestProto request)
|
||||
|
@ -92,6 +92,7 @@
|
||||
import org.apache.hadoop.hdfs.protocol.ReplicatedBlockStats;
|
||||
import org.apache.hadoop.hdfs.protocol.RollingUpgradeInfo;
|
||||
import org.apache.hadoop.hdfs.protocol.SnapshotDiffReport;
|
||||
import org.apache.hadoop.hdfs.protocol.SnapshotDiffReportListing;
|
||||
import org.apache.hadoop.hdfs.protocol.SnapshottableDirectoryStatus;
|
||||
import org.apache.hadoop.hdfs.protocol.ZoneReencryptionStatus;
|
||||
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ClientNamenodeProtocol;
|
||||
@ -1508,6 +1509,14 @@ public SnapshotDiffReport getSnapshotDiffReport(String snapshotRoot,
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override // ClientProtocol
|
||||
public SnapshotDiffReportListing getSnapshotDiffReportListing(
|
||||
String snapshotRoot, String earlierSnapshotName, String laterSnapshotName,
|
||||
byte[] startPath, int index) throws IOException {
|
||||
checkOperation(OperationCategory.READ, false);
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override // ClientProtocol
|
||||
public long addCacheDirective(CacheDirectiveInfo path,
|
||||
EnumSet<CacheFlag> flags) throws IOException {
|
||||
|
@ -24,6 +24,7 @@
|
||||
import org.apache.hadoop.hdfs.DFSUtil;
|
||||
import org.apache.hadoop.hdfs.protocol.FSLimitException;
|
||||
import org.apache.hadoop.hdfs.protocol.SnapshotDiffReport;
|
||||
import org.apache.hadoop.hdfs.protocol.SnapshotDiffReportListing;
|
||||
import org.apache.hadoop.hdfs.protocol.SnapshotException;
|
||||
import org.apache.hadoop.hdfs.protocol.SnapshottableDirectoryStatus;
|
||||
import org.apache.hadoop.hdfs.server.namenode.FSDirectory.DirOp;
|
||||
@ -164,6 +165,29 @@ static SnapshotDiffReport getSnapshotDiffReport(FSDirectory fsd,
|
||||
return diffs;
|
||||
}
|
||||
|
||||
static SnapshotDiffReportListing getSnapshotDiffReportListing(FSDirectory fsd,
|
||||
SnapshotManager snapshotManager, String path, String fromSnapshot,
|
||||
String toSnapshot, byte[] startPath, int index,
|
||||
int snapshotDiffReportLimit) throws IOException {
|
||||
SnapshotDiffReportListing diffs;
|
||||
final FSPermissionChecker pc = fsd.getPermissionChecker();
|
||||
fsd.readLock();
|
||||
try {
|
||||
INodesInPath iip = fsd.resolvePath(pc, path, DirOp.READ);
|
||||
if (fsd.isPermissionEnabled()) {
|
||||
checkSubtreeReadPermission(fsd, pc, path, fromSnapshot);
|
||||
checkSubtreeReadPermission(fsd, pc, path, toSnapshot);
|
||||
}
|
||||
diffs = snapshotManager
|
||||
.diff(iip, path, fromSnapshot, toSnapshot, startPath, index,
|
||||
snapshotDiffReportLimit);
|
||||
} catch (Exception e) {
|
||||
throw e;
|
||||
} finally {
|
||||
fsd.readUnlock();
|
||||
}
|
||||
return diffs;
|
||||
}
|
||||
/** Get a collection of full snapshot paths given file and snapshot dir.
|
||||
* @param lsf a list of snapshottable features
|
||||
* @param file full path of the file
|
||||
|
@ -87,6 +87,8 @@
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_PERMISSIONS_SUPERUSERGROUP_KEY;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_REPLICATION_DEFAULT;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_REPLICATION_KEY;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_SNAPSHOT_DIFF_LISTING_LIMIT;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_SNAPSHOT_DIFF_LISTING_LIMIT_DEFAULT;
|
||||
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
|
||||
import static org.apache.hadoop.hdfs.server.namenode.FSDirStatAndListingOp.*;
|
||||
|
||||
@ -95,6 +97,8 @@
|
||||
import org.apache.hadoop.hdfs.protocol.ECBlockGroupStats;
|
||||
import org.apache.hadoop.hdfs.protocol.OpenFileEntry;
|
||||
import org.apache.hadoop.hdfs.protocol.ZoneReencryptionStatus;
|
||||
import org.apache.hadoop.hdfs.protocol.SnapshotDiffReportListing;
|
||||
import org.apache.hadoop.hdfs.protocol.SnapshotDiffReport;
|
||||
import org.apache.hadoop.hdfs.server.namenode.metrics.ReplicatedBlocksMBean;
|
||||
import org.apache.hadoop.hdfs.server.protocol.SlowDiskReports;
|
||||
import static org.apache.hadoop.util.Time.now;
|
||||
@ -211,7 +215,6 @@
|
||||
import org.apache.hadoop.hdfs.protocol.RollingUpgradeInfo;
|
||||
import org.apache.hadoop.hdfs.protocol.SnapshotAccessControlException;
|
||||
import org.apache.hadoop.hdfs.protocol.SnapshotException;
|
||||
import org.apache.hadoop.hdfs.protocol.SnapshotDiffReport;
|
||||
import org.apache.hadoop.hdfs.protocol.SnapshottableDirectoryStatus;
|
||||
import org.apache.hadoop.hdfs.protocol.datatransfer.ReplaceDatanodeOnFailure;
|
||||
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
|
||||
@ -426,6 +429,7 @@ private void logAuditEvent(boolean succeeded,
|
||||
private final UserGroupInformation fsOwner;
|
||||
private final String supergroup;
|
||||
private final boolean standbyShouldCheckpoint;
|
||||
private final int snapshotDiffReportLimit;
|
||||
|
||||
/** Interval between each check of lease to release. */
|
||||
private final long leaseRecheckIntervalMs;
|
||||
@ -761,6 +765,10 @@ static FSNamesystem loadFromDisk(Configuration conf) throws IOException {
|
||||
DFS_PERMISSIONS_SUPERUSERGROUP_DEFAULT);
|
||||
this.isPermissionEnabled = conf.getBoolean(DFS_PERMISSIONS_ENABLED_KEY,
|
||||
DFS_PERMISSIONS_ENABLED_DEFAULT);
|
||||
this.snapshotDiffReportLimit =
|
||||
conf.getInt(DFS_NAMENODE_SNAPSHOT_DIFF_LISTING_LIMIT,
|
||||
DFS_NAMENODE_SNAPSHOT_DIFF_LISTING_LIMIT_DEFAULT);
|
||||
|
||||
LOG.info("fsOwner = " + fsOwner);
|
||||
LOG.info("supergroup = " + supergroup);
|
||||
LOG.info("isPermissionEnabled = " + isPermissionEnabled);
|
||||
@ -6404,6 +6412,63 @@ SnapshotDiffReport getSnapshotDiffReport(String path,
|
||||
return diffs;
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the difference between two snapshots (or between a snapshot and the
|
||||
* current status) of a snapshottable directory.
|
||||
*
|
||||
* @param path The full path of the snapshottable directory.
|
||||
* @param fromSnapshot Name of the snapshot to calculate the diff from. Null
|
||||
* or empty string indicates the current tree.
|
||||
* @param toSnapshot Name of the snapshot to calculated the diff to. Null or
|
||||
* empty string indicates the current tree.
|
||||
* @param startPath
|
||||
* path relative to the snapshottable root directory from where the
|
||||
* snapshotdiff computation needs to start across multiple rpc calls
|
||||
* @param index
|
||||
* index in the created or deleted list of the directory at which
|
||||
* the snapshotdiff computation stopped during the last rpc call
|
||||
* as the no of entries exceeded the snapshotdiffentry limit. -1
|
||||
* indicates, the snapshotdiff compuatation needs to start right
|
||||
* from the startPath provided.
|
||||
* @return A partial report about the difference between {@code fromSnapshot}
|
||||
* and {@code toSnapshot}. Modified/deleted/created/renamed files and
|
||||
* directories belonging to the snapshottable directories are listed
|
||||
* and labeled as M/-/+/R respectively.
|
||||
* @throws IOException
|
||||
*/
|
||||
SnapshotDiffReportListing getSnapshotDiffReportListing(String path,
|
||||
String fromSnapshot, String toSnapshot, byte[] startPath, int index)
|
||||
throws IOException {
|
||||
final String operationName = "computeSnapshotDiff";
|
||||
SnapshotDiffReportListing diffs = null;
|
||||
checkOperation(OperationCategory.READ);
|
||||
boolean success = false;
|
||||
String fromSnapshotRoot =
|
||||
(fromSnapshot == null || fromSnapshot.isEmpty()) ? path :
|
||||
Snapshot.getSnapshotPath(path, fromSnapshot);
|
||||
String toSnapshotRoot =
|
||||
(toSnapshot == null || toSnapshot.isEmpty()) ? path :
|
||||
Snapshot.getSnapshotPath(path, toSnapshot);
|
||||
readLock();
|
||||
try {
|
||||
checkOperation(OperationCategory.READ);
|
||||
diffs = FSDirSnapshotOp
|
||||
.getSnapshotDiffReportListing(dir, snapshotManager, path,
|
||||
fromSnapshot, toSnapshot, startPath, index,
|
||||
snapshotDiffReportLimit);
|
||||
success = true;
|
||||
} catch (AccessControlException ace) {
|
||||
logAuditEvent(success, operationName, fromSnapshotRoot, toSnapshotRoot,
|
||||
null);
|
||||
throw ace;
|
||||
} finally {
|
||||
readUnlock(operationName);
|
||||
}
|
||||
logAuditEvent(success, operationName, fromSnapshotRoot, toSnapshotRoot,
|
||||
null);
|
||||
return diffs;
|
||||
}
|
||||
|
||||
/**
|
||||
* Delete a snapshot of a snapshottable directory
|
||||
* @param snapshotRoot The snapshottable directory
|
||||
|
@ -121,6 +121,7 @@
|
||||
import org.apache.hadoop.hdfs.protocol.ZoneReencryptionStatus;
|
||||
import org.apache.hadoop.hdfs.protocol.RollingUpgradeInfo;
|
||||
import org.apache.hadoop.hdfs.protocol.SnapshotDiffReport;
|
||||
import org.apache.hadoop.hdfs.protocol.SnapshotDiffReportListing;
|
||||
import org.apache.hadoop.hdfs.protocol.SnapshottableDirectoryStatus;
|
||||
import org.apache.hadoop.hdfs.protocol.UnregisteredNodeException;
|
||||
import org.apache.hadoop.hdfs.protocol.UnresolvedPathException;
|
||||
@ -1862,6 +1863,18 @@ public SnapshotDiffReport getSnapshotDiffReport(String snapshotRoot,
|
||||
return report;
|
||||
}
|
||||
|
||||
@Override // ClientProtocol
|
||||
public SnapshotDiffReportListing getSnapshotDiffReportListing(
|
||||
String snapshotRoot, String earlierSnapshotName, String laterSnapshotName,
|
||||
byte[] startPath, int index) throws IOException {
|
||||
checkNNStartup();
|
||||
SnapshotDiffReportListing report = namesystem
|
||||
.getSnapshotDiffReportListing(snapshotRoot, earlierSnapshotName,
|
||||
laterSnapshotName, startPath, index);
|
||||
metrics.incrSnapshotDiffReportOps();
|
||||
return report;
|
||||
}
|
||||
|
||||
@Override // ClientProtocol
|
||||
public long addCacheDirective(
|
||||
CacheDirectiveInfo path, EnumSet<CacheFlag> flags) throws IOException {
|
||||
|
@ -24,10 +24,12 @@
|
||||
import java.util.LinkedList;
|
||||
import java.util.List;
|
||||
import java.util.Set;
|
||||
import java.util.Arrays;
|
||||
|
||||
import org.apache.hadoop.HadoopIllegalArgumentException;
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.hdfs.DFSUtil;
|
||||
import org.apache.hadoop.hdfs.DFSUtilClient;
|
||||
import org.apache.hadoop.hdfs.protocol.QuotaExceededException;
|
||||
import org.apache.hadoop.hdfs.protocol.SnapshotException;
|
||||
import org.apache.hadoop.hdfs.server.blockmanagement.BlockStoragePolicySuite;
|
||||
@ -284,6 +286,54 @@ SnapshotDiffInfo computeDiff(final INodeDirectory snapshotRootDir,
|
||||
return diffs;
|
||||
}
|
||||
|
||||
/**
|
||||
* Compute the difference between two snapshots (or a snapshot and the current
|
||||
* directory) of the directory. The diff calculation can be scoped to either
|
||||
* the snapshot root or any descendant directory under the snapshot root.
|
||||
*
|
||||
* @param snapshotRootDir the snapshot root directory
|
||||
* @param snapshotDiffScopeDir the descendant directory under snapshot root
|
||||
* to scope the diff calculation to.
|
||||
* @param from The name of the start point of the comparison. Null indicating
|
||||
* the current tree.
|
||||
* @param to The name of the end point. Null indicating the current tree.
|
||||
* @param startPath
|
||||
* path relative to the snapshottable root directory from where the
|
||||
* snapshotdiff computation needs to start across multiple rpc calls
|
||||
* @param index
|
||||
* index in the created or deleted list of the directory at which
|
||||
* the snapshotdiff computation stopped during the last rpc call
|
||||
* as the no of entries exceeded the snapshotdiffentry limit. -1
|
||||
* indicates, the snapshotdiff computation needs to start right
|
||||
* from the startPath provided.
|
||||
*
|
||||
* @return The difference between the start/end points.
|
||||
* @throws SnapshotException If there is no snapshot matching the starting
|
||||
* point, or if endSnapshotName is not null but cannot be identified
|
||||
* as a previous snapshot.
|
||||
*/
|
||||
SnapshotDiffListingInfo computeDiff(final INodeDirectory snapshotRootDir,
|
||||
final INodeDirectory snapshotDiffScopeDir, final String from,
|
||||
final String to, byte[] startPath, int index,
|
||||
int snapshotDiffReportEntriesLimit) throws SnapshotException {
|
||||
Preconditions.checkArgument(
|
||||
snapshotDiffScopeDir.isDescendantOfSnapshotRoot(snapshotRootDir));
|
||||
Snapshot fromSnapshot = getSnapshotByName(snapshotRootDir, from);
|
||||
Snapshot toSnapshot = getSnapshotByName(snapshotRootDir, to);
|
||||
boolean toProcess = Arrays.equals(startPath, DFSUtilClient.EMPTY_BYTES);
|
||||
byte[][] resumePath = DFSUtilClient.bytes2byteArray(startPath);
|
||||
if (from.equals(to)) {
|
||||
return null;
|
||||
}
|
||||
SnapshotDiffListingInfo diffs =
|
||||
new SnapshotDiffListingInfo(snapshotRootDir, snapshotDiffScopeDir,
|
||||
fromSnapshot, toSnapshot, snapshotDiffReportEntriesLimit);
|
||||
diffs.setLastIndex(index);
|
||||
computeDiffRecursively(snapshotDiffScopeDir, snapshotDiffScopeDir,
|
||||
new ArrayList<byte[]>(), diffs, resumePath, 0, toProcess);
|
||||
return diffs;
|
||||
}
|
||||
|
||||
/**
|
||||
* Find the snapshot matching the given name.
|
||||
*
|
||||
@ -367,12 +417,96 @@ private void computeDiffRecursively(final INodeDirectory snapshotDir,
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Recursively compute the difference between snapshots under a given
|
||||
* directory/file partially.
|
||||
* @param snapshotDir The directory where snapshots were taken. Can be a
|
||||
* snapshot root directory or any descendant directory
|
||||
* under snapshot root directory.
|
||||
* @param node The directory/file under which the diff is computed.
|
||||
* @param parentPath Relative path (corresponding to the snapshot root) of
|
||||
* the node's parent.
|
||||
* @param diffReport data structure used to store the diff.
|
||||
* @param resume path from where to resume the snapshotdiff computation
|
||||
* in one rpc call
|
||||
* @param level indicates the level of the directory tree rooted at
|
||||
* snapshotRoot.
|
||||
* @param processFlag indicates that the dir/file where the snapshotdiff
|
||||
* computation has to start is processed or not.
|
||||
*/
|
||||
private boolean computeDiffRecursively(final INodeDirectory snapshotDir,
|
||||
INode node, List<byte[]> parentPath, SnapshotDiffListingInfo diffReport,
|
||||
final byte[][] resume, int level, boolean processFlag) {
|
||||
final Snapshot earlier = diffReport.getEarlier();
|
||||
final Snapshot later = diffReport.getLater();
|
||||
byte[][] relativePath = parentPath.toArray(new byte[parentPath.size()][]);
|
||||
if (!processFlag && level == resume.length
|
||||
&& Arrays.equals(resume[resume.length - 1], node.getLocalNameBytes())) {
|
||||
processFlag = true;
|
||||
}
|
||||
|
||||
if (node.isDirectory()) {
|
||||
final ChildrenDiff diff = new ChildrenDiff();
|
||||
INodeDirectory dir = node.asDirectory();
|
||||
if (processFlag) {
|
||||
DirectoryWithSnapshotFeature sf = dir.getDirectoryWithSnapshotFeature();
|
||||
if (sf != null) {
|
||||
boolean change =
|
||||
sf.computeDiffBetweenSnapshots(earlier, later, diff, dir);
|
||||
if (change) {
|
||||
if (!diffReport.addDirDiff(dir.getId(), relativePath, diff)) {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
ReadOnlyList<INode> children = dir.getChildrenList(earlier.getId());
|
||||
boolean iterate = false;
|
||||
for (INode child : children) {
|
||||
final byte[] name = child.getLocalNameBytes();
|
||||
if (!processFlag && !iterate && !Arrays.equals(resume[level], name)) {
|
||||
continue;
|
||||
}
|
||||
iterate = true;
|
||||
level = level + 1;
|
||||
boolean toProcess = diff.searchIndex(ListType.DELETED, name) < 0;
|
||||
if (!toProcess && child instanceof INodeReference.WithName) {
|
||||
byte[][] renameTargetPath = findRenameTargetPath(snapshotDir,
|
||||
(WithName) child, Snapshot.getSnapshotId(later));
|
||||
if (renameTargetPath != null) {
|
||||
toProcess = true;
|
||||
}
|
||||
}
|
||||
if (toProcess) {
|
||||
parentPath.add(name);
|
||||
processFlag = computeDiffRecursively(snapshotDir, child, parentPath,
|
||||
diffReport, resume, level, processFlag);
|
||||
parentPath.remove(parentPath.size() - 1);
|
||||
if (!processFlag) {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
}
|
||||
} else if (node.isFile() && node.asFile().isWithSnapshot() && processFlag) {
|
||||
INodeFile file = node.asFile();
|
||||
boolean change = file.getFileWithSnapshotFeature()
|
||||
.changedBetweenSnapshots(file, earlier, later);
|
||||
if (change) {
|
||||
if (!diffReport.addFileDiff(file, relativePath)) {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
/**
|
||||
* We just found a deleted WithName node as the source of a rename operation.
|
||||
* However, we should include it in our snapshot diff report as rename only
|
||||
* if the rename target is also under the same snapshottable directory.
|
||||
*/
|
||||
private byte[][] findRenameTargetPath(final INodeDirectory snapshotRoot,
|
||||
public byte[][] findRenameTargetPath(final INodeDirectory snapshotRoot,
|
||||
INodeReference.WithName wn, final int snapshotId) {
|
||||
INode inode = wn.getReferredINode();
|
||||
final LinkedList<byte[]> ancestors = Lists.newLinkedList();
|
||||
|
@ -0,0 +1,207 @@
|
||||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hdfs.server.namenode.snapshot;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.ListIterator;
|
||||
|
||||
import org.apache.hadoop.hdfs.DFSUtilClient;
|
||||
import org.apache.hadoop.hdfs.protocol.SnapshotDiffReportListing;
|
||||
import org.apache.hadoop.hdfs.protocol.SnapshotDiffReportListing.DiffReportListingEntry;
|
||||
import org.apache.hadoop.hdfs.server.namenode.INode;
|
||||
import org.apache.hadoop.hdfs.server.namenode.INodeDirectory;
|
||||
import org.apache.hadoop.hdfs.server.namenode.INodeFile;
|
||||
import org.apache.hadoop.hdfs.server.namenode.INodeReference;
|
||||
import org.apache.hadoop.hdfs.server.namenode.snapshot.DirectoryWithSnapshotFeature.ChildrenDiff;
|
||||
import org.apache.hadoop.hdfs.util.Diff.ListType;
|
||||
|
||||
import com.google.common.base.Preconditions;
|
||||
import org.apache.hadoop.util.ChunkedArrayList;
|
||||
|
||||
/**
|
||||
* A class describing the difference between snapshots of a snapshottable
|
||||
* directory where the difference is limited by dfs.snapshotDiff-report.limit.
|
||||
*/
|
||||
|
||||
class SnapshotDiffListingInfo {
|
||||
private final int maxEntries;
|
||||
|
||||
/** The root directory of the snapshots. */
|
||||
private final INodeDirectory snapshotRoot;
|
||||
/**
|
||||
* The scope directory under which snapshot diff is calculated.
|
||||
*/
|
||||
private final INodeDirectory snapshotDiffScopeDir;
|
||||
/** The starting point of the difference. */
|
||||
private final Snapshot from;
|
||||
/** The end point of the difference. */
|
||||
private final Snapshot to;
|
||||
|
||||
/** The path of the file to start for computing the snapshot diff. */
|
||||
private byte[] lastPath = DFSUtilClient.EMPTY_BYTES;
|
||||
|
||||
private int lastIndex = -1;
|
||||
|
||||
/*
|
||||
* A list containing all the modified entries between the given snapshots
|
||||
* within a single rpc call.
|
||||
*/
|
||||
private final List<DiffReportListingEntry> modifiedList =
|
||||
new ChunkedArrayList<>();
|
||||
|
||||
private final List<DiffReportListingEntry> createdList =
|
||||
new ChunkedArrayList<>();
|
||||
|
||||
private final List<DiffReportListingEntry> deletedList =
|
||||
new ChunkedArrayList<>();
|
||||
|
||||
SnapshotDiffListingInfo(INodeDirectory snapshotRootDir,
|
||||
INodeDirectory snapshotDiffScopeDir, Snapshot start, Snapshot end,
|
||||
int snapshotDiffReportLimit) {
|
||||
Preconditions.checkArgument(
|
||||
snapshotRootDir.isSnapshottable() && snapshotDiffScopeDir
|
||||
.isDescendantOfSnapshotRoot(snapshotRootDir));
|
||||
this.snapshotRoot = snapshotRootDir;
|
||||
this.snapshotDiffScopeDir = snapshotDiffScopeDir;
|
||||
this.from = start;
|
||||
this.to = end;
|
||||
this.maxEntries = snapshotDiffReportLimit;
|
||||
}
|
||||
|
||||
boolean addDirDiff(long dirId, byte[][] parent, ChildrenDiff diff) {
|
||||
final Snapshot laterSnapshot = getLater();
|
||||
if (lastIndex == -1) {
|
||||
if (getTotalEntries() < maxEntries) {
|
||||
modifiedList.add(new DiffReportListingEntry(
|
||||
dirId, dirId, parent, true, null));
|
||||
} else {
|
||||
setLastPath(parent);
|
||||
setLastIndex(-1);
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
if (lastIndex == -1 || lastIndex < diff.getList(ListType.CREATED).size()) {
|
||||
ListIterator<INode> iterator = lastIndex != -1 ?
|
||||
diff.getList(ListType.CREATED).listIterator(lastIndex)
|
||||
: diff.getList(ListType.CREATED).listIterator();
|
||||
while (iterator.hasNext()) {
|
||||
if (getTotalEntries() < maxEntries) {
|
||||
INode created = iterator.next();
|
||||
byte[][] path = newPath(parent, created.getLocalNameBytes());
|
||||
createdList.add(new DiffReportListingEntry(dirId, created.getId(),
|
||||
path, created.isReference(), null));
|
||||
} else {
|
||||
setLastPath(parent);
|
||||
setLastIndex(iterator.nextIndex());
|
||||
return false;
|
||||
}
|
||||
}
|
||||
setLastIndex(-1);
|
||||
}
|
||||
|
||||
if (lastIndex == -1 || lastIndex >= diff.getList(ListType.CREATED).size()) {
|
||||
int size = diff.getList(ListType.DELETED).size();
|
||||
ListIterator<INode> iterator = lastIndex != -1 ?
|
||||
diff.getList(ListType.DELETED).listIterator(lastIndex - size)
|
||||
: diff.getList(ListType.DELETED).listIterator();
|
||||
while (iterator.hasNext()) {
|
||||
if (getTotalEntries() < maxEntries) {
|
||||
final INode d = iterator.next();
|
||||
byte[][] path = newPath(parent, d.getLocalNameBytes());
|
||||
byte[][] target = findRenameTargetPath(d, laterSnapshot);
|
||||
final DiffReportListingEntry e = target != null ?
|
||||
new DiffReportListingEntry(dirId, d.getId(), path, true, target) :
|
||||
new DiffReportListingEntry(dirId, d.getId(), path, false, null);
|
||||
deletedList.add(e);
|
||||
} else {
|
||||
setLastPath(parent);
|
||||
setLastIndex(size + iterator.nextIndex());
|
||||
return false;
|
||||
}
|
||||
}
|
||||
setLastIndex(-1);
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
private byte[][] findRenameTargetPath(INode deleted, Snapshot laterSnapshot) {
|
||||
if (deleted instanceof INodeReference.WithName) {
|
||||
return snapshotRoot.getDirectorySnapshottableFeature()
|
||||
.findRenameTargetPath(snapshotDiffScopeDir,
|
||||
(INodeReference.WithName) deleted,
|
||||
Snapshot.getSnapshotId(laterSnapshot));
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
private static byte[][] newPath(byte[][] parent, byte[] name) {
|
||||
byte[][] fullPath = new byte[parent.length + 1][];
|
||||
System.arraycopy(parent, 0, fullPath, 0, parent.length);
|
||||
fullPath[fullPath.length - 1] = name;
|
||||
return fullPath;
|
||||
}
|
||||
|
||||
Snapshot getEarlier() {
|
||||
return isFromEarlier()? from: to;
|
||||
}
|
||||
|
||||
Snapshot getLater() {
|
||||
return isFromEarlier()? to: from;
|
||||
}
|
||||
|
||||
|
||||
public void setLastPath(byte[][] lastPath) {
|
||||
this.lastPath = DFSUtilClient.byteArray2bytes(lastPath);
|
||||
}
|
||||
|
||||
public void setLastIndex(int idx) {
|
||||
this.lastIndex = idx;
|
||||
}
|
||||
|
||||
boolean addFileDiff(INodeFile file, byte[][] relativePath) {
|
||||
if (getTotalEntries() < maxEntries) {
|
||||
modifiedList.add(new DiffReportListingEntry(file.getId(),
|
||||
file.getId(), relativePath,false, null));
|
||||
} else {
|
||||
setLastPath(relativePath);
|
||||
return false;
|
||||
}
|
||||
return true;
|
||||
}
|
||||
/** @return True if {@link #from} is earlier than {@link #to} */
|
||||
boolean isFromEarlier() {
|
||||
return Snapshot.ID_COMPARATOR.compare(from, to) < 0;
|
||||
}
|
||||
|
||||
|
||||
private int getTotalEntries() {
|
||||
return createdList.size() + modifiedList.size() + deletedList.size();
|
||||
}
|
||||
|
||||
/**
|
||||
* Generate a {@link SnapshotDiffReportListing} based on detailed diff
|
||||
* information.
|
||||
*
|
||||
* @return A {@link SnapshotDiffReportListing} describing the difference
|
||||
*/
|
||||
public SnapshotDiffReportListing generateReport() {
|
||||
return new SnapshotDiffReportListing(lastPath, modifiedList, createdList,
|
||||
deletedList, lastIndex, isFromEarlier());
|
||||
}
|
||||
}
|
@ -44,6 +44,7 @@
|
||||
import org.apache.hadoop.hdfs.DFSUtilClient;
|
||||
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
|
||||
import org.apache.hadoop.hdfs.protocol.SnapshotDiffReport;
|
||||
import org.apache.hadoop.hdfs.protocol.SnapshotDiffReportListing;
|
||||
import org.apache.hadoop.hdfs.protocol.SnapshotException;
|
||||
import org.apache.hadoop.hdfs.protocol.SnapshotInfo;
|
||||
import org.apache.hadoop.hdfs.protocol.SnapshottableDirectoryStatus;
|
||||
@ -467,6 +468,33 @@ public SnapshotDiffReport diff(final INodesInPath iip,
|
||||
snapshotPath, from, to, Collections.<DiffReportEntry> emptyList());
|
||||
}
|
||||
|
||||
/**
|
||||
* Compute the partial difference between two snapshots of a directory,
|
||||
* or between a snapshot of the directory and its current tree.
|
||||
*/
|
||||
public SnapshotDiffReportListing diff(final INodesInPath iip,
|
||||
final String snapshotPath, final String from, final String to,
|
||||
byte[] startPath, int index, int snapshotDiffReportLimit)
|
||||
throws IOException {
|
||||
// Find the source root directory path where the snapshots were taken.
|
||||
// All the check for path has been included in the valueOf method.
|
||||
INodeDirectory snapshotRootDir;
|
||||
if (this.snapshotDiffAllowSnapRootDescendant) {
|
||||
snapshotRootDir = getSnapshottableAncestorDir(iip);
|
||||
} else {
|
||||
snapshotRootDir = getSnapshottableRoot(iip);
|
||||
}
|
||||
Preconditions.checkNotNull(snapshotRootDir);
|
||||
INodeDirectory snapshotDescendantDir = INodeDirectory.valueOf(
|
||||
iip.getLastINode(), snapshotPath);
|
||||
final SnapshotDiffListingInfo diffs =
|
||||
snapshotRootDir.getDirectorySnapshottableFeature()
|
||||
.computeDiff(snapshotRootDir, snapshotDescendantDir, from, to,
|
||||
startPath, index, snapshotDiffReportLimit);
|
||||
return diffs != null ? diffs.generateReport() :
|
||||
new SnapshotDiffReportListing();
|
||||
}
|
||||
|
||||
public void clearSnapshottableDirs() {
|
||||
snapshottables.clear();
|
||||
}
|
||||
|
@ -4332,6 +4332,17 @@
|
||||
</description>
|
||||
</property>
|
||||
|
||||
<property>
|
||||
<name>dfs.namenode.snapshotdiff.listing.limit</name>
|
||||
<value>1000</value>
|
||||
<description>
|
||||
Limit the number of entries generated by getSnapshotDiffReportListing within
|
||||
one rpc call to the namenode.If less or equal to zero, at most
|
||||
DFS_NAMENODE_SNAPSHOT_DIFF_LISTING_LIMIT_DEFAULT (= 1000) will be sent
|
||||
across to the client within one rpc call.
|
||||
</description>
|
||||
</property>
|
||||
|
||||
<property>
|
||||
<name>dfs.pipeline.ecn</name>
|
||||
<value>false</value>
|
||||
|
@ -90,6 +90,7 @@ public void setUp() throws Exception {
|
||||
conf.setBoolean(
|
||||
DFSConfigKeys.DFS_NAMENODE_SNAPSHOT_DIFF_ALLOW_SNAP_ROOT_DESCENDANT,
|
||||
true);
|
||||
conf.setInt(DFSConfigKeys.DFS_NAMENODE_SNAPSHOT_DIFF_LISTING_LIMIT, 3);
|
||||
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(REPLICATION)
|
||||
.format(true).build();
|
||||
cluster.waitActive();
|
||||
@ -1293,4 +1294,119 @@ public void testDontCaptureAccessTimeOnlyChangeReport() throws Exception {
|
||||
|
||||
assertAtimeNotEquals(filePostSS, root, "s2", "s3");
|
||||
}
|
||||
|
||||
/**
|
||||
* Tests to verfy the diff report with maximum SnapsdiffReportEntries limit
|
||||
* over an rpc being set to 3.
|
||||
* @throws Exception
|
||||
*/
|
||||
@Test
|
||||
public void testDiffReportWithRpcLimit() throws Exception {
|
||||
final Path root = new Path("/");
|
||||
hdfs.mkdirs(root);
|
||||
for (int i = 1; i < 4; i++) {
|
||||
final Path path = new Path(root, "dir" + i);
|
||||
hdfs.mkdirs(path);
|
||||
}
|
||||
SnapshotTestHelper.createSnapshot(hdfs, root, "s0");
|
||||
for (int i = 1; i < 4; i++) {
|
||||
final Path path = new Path(root, "dir" + i);
|
||||
for (int j = 1; j < 4; j++) {
|
||||
final Path file = new Path(path, "file" + j);
|
||||
DFSTestUtil.createFile(hdfs, file, BLOCKSIZE, REPLICATION, SEED);
|
||||
}
|
||||
}
|
||||
|
||||
SnapshotTestHelper.createSnapshot(hdfs, root, "s1");
|
||||
verifyDiffReport(root, "s0", "s1",
|
||||
new DiffReportEntry(DiffType.MODIFY, DFSUtil.string2Bytes("")),
|
||||
new DiffReportEntry(DiffType.MODIFY, DFSUtil.string2Bytes("dir1")),
|
||||
new DiffReportEntry(DiffType.CREATE,
|
||||
DFSUtil.string2Bytes("dir1/file1")),
|
||||
new DiffReportEntry(DiffType.CREATE,
|
||||
DFSUtil.string2Bytes("dir1/file2")),
|
||||
new DiffReportEntry(DiffType.CREATE,
|
||||
DFSUtil.string2Bytes("dir1/file3")),
|
||||
new DiffReportEntry(DiffType.MODIFY, DFSUtil.string2Bytes("dir2")),
|
||||
new DiffReportEntry(DiffType.CREATE,
|
||||
DFSUtil.string2Bytes("dir2/file1")),
|
||||
new DiffReportEntry(DiffType.CREATE,
|
||||
DFSUtil.string2Bytes("dir2/file2")),
|
||||
new DiffReportEntry(DiffType.CREATE,
|
||||
DFSUtil.string2Bytes("dir2/file3")),
|
||||
new DiffReportEntry(DiffType.MODIFY, DFSUtil.string2Bytes("dir3")),
|
||||
new DiffReportEntry(DiffType.CREATE,
|
||||
DFSUtil.string2Bytes("dir3/file1")),
|
||||
new DiffReportEntry(DiffType.CREATE,
|
||||
DFSUtil.string2Bytes("dir3/file2")),
|
||||
new DiffReportEntry(DiffType.CREATE,
|
||||
DFSUtil.string2Bytes("dir3/file3")));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testDiffReportWithRpcLimit2() throws Exception {
|
||||
final Path root = new Path("/");
|
||||
hdfs.mkdirs(root);
|
||||
for (int i = 1; i <=3; i++) {
|
||||
final Path path = new Path(root, "dir" + i);
|
||||
hdfs.mkdirs(path);
|
||||
}
|
||||
for (int i = 1; i <= 3; i++) {
|
||||
final Path path = new Path(root, "dir" + i);
|
||||
for (int j = 1; j < 4; j++) {
|
||||
final Path file = new Path(path, "file" + j);
|
||||
DFSTestUtil.createFile(hdfs, file, BLOCKSIZE, REPLICATION, SEED);
|
||||
}
|
||||
}
|
||||
SnapshotTestHelper.createSnapshot(hdfs, root, "s0");
|
||||
Path targetDir = new Path(root, "dir4");
|
||||
//create directory dir4
|
||||
hdfs.mkdirs(targetDir);
|
||||
//moves files from dir1 to dir4
|
||||
Path path = new Path(root, "dir1");
|
||||
for (int j = 1; j < 4; j++) {
|
||||
final Path srcPath = new Path(path, "file" + j);
|
||||
final Path targetPath = new Path(targetDir, "file" + j);
|
||||
hdfs.rename(srcPath, targetPath);
|
||||
}
|
||||
targetDir = new Path(root, "dir3");
|
||||
//overwrite existing files in dir3 from files in dir1
|
||||
path = new Path(root, "dir2");
|
||||
for (int j = 1; j < 4; j++) {
|
||||
final Path srcPath = new Path(path, "file" + j);
|
||||
final Path targetPath = new Path(targetDir, "file" + j);
|
||||
hdfs.rename(srcPath, targetPath, Rename.OVERWRITE);
|
||||
}
|
||||
final Path pathToRename = new Path(root, "dir2");
|
||||
//move dir2 inside dir3
|
||||
hdfs.rename(pathToRename, targetDir);
|
||||
SnapshotTestHelper.createSnapshot(hdfs, root, "s1");
|
||||
verifyDiffReport(root, "s0", "s1",
|
||||
new DiffReportEntry(DiffType.MODIFY, DFSUtil.string2Bytes("")),
|
||||
new DiffReportEntry(DiffType.CREATE,
|
||||
DFSUtil.string2Bytes("dir4")),
|
||||
new DiffReportEntry(DiffType.RENAME, DFSUtil.string2Bytes("dir2"),
|
||||
DFSUtil.string2Bytes("dir3/dir2")),
|
||||
new DiffReportEntry(DiffType.MODIFY, DFSUtil.string2Bytes("dir1")),
|
||||
new DiffReportEntry(DiffType.RENAME, DFSUtil.string2Bytes("dir1/file1"),
|
||||
DFSUtil.string2Bytes("dir4/file1")),
|
||||
new DiffReportEntry(DiffType.RENAME, DFSUtil.string2Bytes("dir1/file2"),
|
||||
DFSUtil.string2Bytes("dir4/file2")),
|
||||
new DiffReportEntry(DiffType.RENAME, DFSUtil.string2Bytes("dir1/file3"),
|
||||
DFSUtil.string2Bytes("dir4/file3")),
|
||||
new DiffReportEntry(DiffType.MODIFY, DFSUtil.string2Bytes("dir2")),
|
||||
new DiffReportEntry(DiffType.RENAME, DFSUtil.string2Bytes("dir2/file1"),
|
||||
DFSUtil.string2Bytes("dir3/file1")),
|
||||
new DiffReportEntry(DiffType.RENAME, DFSUtil.string2Bytes("dir2/file2"),
|
||||
DFSUtil.string2Bytes("dir3/file2")),
|
||||
new DiffReportEntry(DiffType.RENAME, DFSUtil.string2Bytes("dir2/file3"),
|
||||
DFSUtil.string2Bytes("dir3/file3")),
|
||||
new DiffReportEntry(DiffType.MODIFY, DFSUtil.string2Bytes("dir3")),
|
||||
new DiffReportEntry(DiffType.DELETE,
|
||||
DFSUtil.string2Bytes("dir3/file1")),
|
||||
new DiffReportEntry(DiffType.DELETE,
|
||||
DFSUtil.string2Bytes("dir3/file1")),
|
||||
new DiffReportEntry(DiffType.DELETE,
|
||||
DFSUtil.string2Bytes("dir3/file3")));
|
||||
}
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user