[HDFS-12386] Add fsserver defaults call to WebhdfsFileSystem. (Rushabh Shah via daryn)
This commit is contained in:
parent
9df05005ac
commit
0da29cbeea
@ -26,6 +26,7 @@
|
||||
import org.apache.hadoop.fs.ContentSummary.Builder;
|
||||
import org.apache.hadoop.fs.FileChecksum;
|
||||
import org.apache.hadoop.fs.FileStatus;
|
||||
import org.apache.hadoop.fs.FsServerDefaults;
|
||||
import org.apache.hadoop.fs.MD5MD5CRC32CastagnoliFileChecksum;
|
||||
import org.apache.hadoop.fs.MD5MD5CRC32FileChecksum;
|
||||
import org.apache.hadoop.fs.MD5MD5CRC32GzipFileChecksum;
|
||||
@ -65,6 +66,8 @@
|
||||
|
||||
class JsonUtilClient {
|
||||
static final DatanodeInfo[] EMPTY_DATANODE_INFO_ARRAY = {};
|
||||
static final String UNSUPPPORTED_EXCEPTION_STR =
|
||||
UnsupportedOperationException.class.getName();
|
||||
|
||||
/** Convert a Json map to a RemoteException. */
|
||||
static RemoteException toRemoteException(final Map<?, ?> json) {
|
||||
@ -72,6 +75,9 @@ static RemoteException toRemoteException(final Map<?, ?> json) {
|
||||
RemoteException.class.getSimpleName());
|
||||
final String message = (String)m.get("message");
|
||||
final String javaClassName = (String)m.get("javaClassName");
|
||||
if (UNSUPPPORTED_EXCEPTION_STR.equals(javaClassName)) {
|
||||
throw new UnsupportedOperationException(message);
|
||||
}
|
||||
return new RemoteException(javaClassName, message);
|
||||
}
|
||||
|
||||
@ -644,4 +650,36 @@ private static StorageType[] toStorageTypes(List<?> list) {
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
* The parameters which have default value -1 are required fields according
|
||||
* to hdfs.proto.
|
||||
* The default values for optional fields are taken from
|
||||
* hdfs.proto#FsServerDefaultsProto.
|
||||
*/
|
||||
public static FsServerDefaults toFsServerDefaults(final Map<?, ?> json) {
|
||||
if (json == null) {
|
||||
return null;
|
||||
}
|
||||
Map<?, ?> m =
|
||||
(Map<?, ?>) json.get(FsServerDefaults.class.getSimpleName());
|
||||
long blockSize = getLong(m, "blockSize", -1);
|
||||
int bytesPerChecksum = getInt(m, "bytesPerChecksum", -1);
|
||||
int writePacketSize = getInt(m, "writePacketSize", -1);
|
||||
short replication = (short) getInt(m, "replication", -1);
|
||||
int fileBufferSize = getInt(m, "fileBufferSize", -1);
|
||||
boolean encryptDataTransfer = m.containsKey("encryptDataTransfer")
|
||||
? (Boolean) m.get("encryptDataTransfer")
|
||||
: false;
|
||||
long trashInterval = getLong(m, "trashInterval", 0);
|
||||
DataChecksum.Type type =
|
||||
DataChecksum.Type.valueOf(getInt(m, "checksumType", 1));
|
||||
String keyProviderUri = (String) m.get("keyProviderUri");
|
||||
byte storagepolicyId = m.containsKey("defaultStoragePolicyId")
|
||||
? ((Number) m.get("defaultStoragePolicyId")).byteValue()
|
||||
: HdfsConstants.BLOCK_STORAGE_POLICY_ID_UNSPECIFIED;
|
||||
return new FsServerDefaults(blockSize, bytesPerChecksum,
|
||||
writePacketSize, replication, fileBufferSize,
|
||||
encryptDataTransfer, trashInterval, type, keyProviderUri,
|
||||
storagepolicyId);
|
||||
}
|
||||
}
|
||||
|
@ -65,6 +65,7 @@
|
||||
import org.apache.hadoop.fs.FSInputStream;
|
||||
import org.apache.hadoop.fs.FileStatus;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.FsServerDefaults;
|
||||
import org.apache.hadoop.fs.GlobalStorageStatistics;
|
||||
import org.apache.hadoop.fs.GlobalStorageStatistics.StorageStatisticsProvider;
|
||||
import org.apache.hadoop.fs.StorageStatistics;
|
||||
@ -1766,6 +1767,22 @@ public void unsetStoragePolicy(Path src) throws IOException {
|
||||
new FsPathRunner(op, src).run();
|
||||
}
|
||||
|
||||
/*
|
||||
* Caller of this method should handle UnsupportedOperationException in case
|
||||
* when new client is talking to old namenode that don't support
|
||||
* FsServerDefaults call.
|
||||
*/
|
||||
@Override
|
||||
public FsServerDefaults getServerDefaults() throws IOException {
|
||||
final HttpOpParam.Op op = GetOpParam.Op.GETSERVERDEFAULTS;
|
||||
return new FsPathResponseRunner<FsServerDefaults>(op, null) {
|
||||
@Override
|
||||
FsServerDefaults decodeResponse(Map<?, ?> json) throws IOException {
|
||||
return JsonUtilClient.toFsServerDefaults(json);
|
||||
}
|
||||
}.run();
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
InetSocketAddress[] getResolvedNNAddr() {
|
||||
return nnAddrs;
|
||||
|
@ -46,7 +46,8 @@ public enum Op implements HttpOpParam.Op {
|
||||
NULL(false, HttpURLConnection.HTTP_NOT_IMPLEMENTED),
|
||||
|
||||
CHECKACCESS(false, HttpURLConnection.HTTP_OK),
|
||||
LISTSTATUS_BATCH(false, HttpURLConnection.HTTP_OK);
|
||||
LISTSTATUS_BATCH(false, HttpURLConnection.HTTP_OK),
|
||||
GETSERVERDEFAULTS(false, HttpURLConnection.HTTP_OK);
|
||||
|
||||
final boolean redirect;
|
||||
final int expectedHttpResponseCode;
|
||||
|
@ -1782,7 +1782,8 @@ private String metaSaveAsString() {
|
||||
return sw.toString();
|
||||
}
|
||||
|
||||
FsServerDefaults getServerDefaults() throws StandbyException {
|
||||
@VisibleForTesting
|
||||
public FsServerDefaults getServerDefaults() throws StandbyException {
|
||||
checkOperation(OperationCategory.READ);
|
||||
return serverDefaults;
|
||||
}
|
||||
|
@ -57,6 +57,7 @@
|
||||
import org.apache.hadoop.fs.ContentSummary;
|
||||
import org.apache.hadoop.fs.FileStatus;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.FsServerDefaults;
|
||||
import org.apache.hadoop.fs.Options;
|
||||
import org.apache.hadoop.fs.XAttr;
|
||||
import org.apache.hadoop.fs.permission.AclStatus;
|
||||
@ -115,6 +116,7 @@ public class NamenodeWebHdfsMethods {
|
||||
private Principal userPrincipal;
|
||||
private String remoteAddr;
|
||||
|
||||
private static volatile String serverDefaultsResponse = null;
|
||||
private @Context ServletContext context;
|
||||
private @Context HttpServletResponse response;
|
||||
|
||||
@ -1121,11 +1123,30 @@ private Response get(
|
||||
final String js = JsonUtil.toJsonString(storagePolicy);
|
||||
return Response.ok(js).type(MediaType.APPLICATION_JSON).build();
|
||||
}
|
||||
case GETSERVERDEFAULTS: {
|
||||
// Since none of the server defaults values are hot reloaded, we can
|
||||
// cache the output of serverDefaults.
|
||||
if (serverDefaultsResponse == null) {
|
||||
FsServerDefaults serverDefaults = np.getServerDefaults();
|
||||
serverDefaultsResponse = JsonUtil.toJsonString(serverDefaults);
|
||||
}
|
||||
return Response.ok(serverDefaultsResponse)
|
||||
.type(MediaType.APPLICATION_JSON).build();
|
||||
}
|
||||
default:
|
||||
throw new UnsupportedOperationException(op + " is not supported");
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
* This is used only and only for testing.
|
||||
* Please don't use it otherwise.
|
||||
*/
|
||||
@VisibleForTesting
|
||||
public static void resetServerDefaultsResponse() {
|
||||
serverDefaultsResponse = null;
|
||||
}
|
||||
|
||||
private static String getTrashRoot(String fullPath,
|
||||
Configuration conf) throws IOException {
|
||||
FileSystem fs = FileSystem.get(conf != null ? conf : new Configuration());
|
||||
|
@ -21,6 +21,7 @@
|
||||
import org.apache.hadoop.fs.ContentSummary;
|
||||
import org.apache.hadoop.fs.FileChecksum;
|
||||
import org.apache.hadoop.fs.FileStatus;
|
||||
import org.apache.hadoop.fs.FsServerDefaults;
|
||||
import org.apache.hadoop.fs.MD5MD5CRC32FileChecksum;
|
||||
import org.apache.hadoop.fs.StorageType;
|
||||
import org.apache.hadoop.fs.XAttr;
|
||||
@ -470,4 +471,23 @@ private static Object toJsonMap(BlockStoragePolicy blockStoragePolicy) {
|
||||
public static String toJsonString(BlockStoragePolicy storagePolicy) {
|
||||
return toJsonString(BlockStoragePolicy.class, toJsonMap(storagePolicy));
|
||||
}
|
||||
|
||||
public static String toJsonString(FsServerDefaults serverDefaults) {
|
||||
return toJsonString(FsServerDefaults.class, toJsonMap(serverDefaults));
|
||||
}
|
||||
|
||||
private static Object toJsonMap(FsServerDefaults serverDefaults) {
|
||||
final Map<String, Object> m = new HashMap<String, Object>();
|
||||
m.put("blockSize", serverDefaults.getBlockSize());
|
||||
m.put("bytesPerChecksum", serverDefaults.getBytesPerChecksum());
|
||||
m.put("writePacketSize", serverDefaults.getWritePacketSize());
|
||||
m.put("replication", serverDefaults.getReplication());
|
||||
m.put("fileBufferSize", serverDefaults.getFileBufferSize());
|
||||
m.put("encryptDataTransfer", serverDefaults.getEncryptDataTransfer());
|
||||
m.put("trashInterval", serverDefaults.getTrashInterval());
|
||||
m.put("checksumType", serverDefaults.getChecksumType().id);
|
||||
m.put("keyProviderUri", serverDefaults.getKeyProviderUri());
|
||||
m.put("defaultStoragePolicyId", serverDefaults.getDefaultStoragePolicyId());
|
||||
return m;
|
||||
}
|
||||
}
|
||||
|
@ -18,6 +18,14 @@
|
||||
|
||||
package org.apache.hadoop.hdfs.web;
|
||||
|
||||
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_TRASH_INTERVAL_KEY;
|
||||
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_KEY;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_SIZE_KEY;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CHECKSUM_TYPE_KEY;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_ENCRYPT_DATA_TRANSFER_KEY;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_REPLICATION_KEY;
|
||||
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY;
|
||||
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_WRITE_PACKET_SIZE_KEY;
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertFalse;
|
||||
import static org.junit.Assert.assertNotEquals;
|
||||
@ -53,6 +61,7 @@
|
||||
import org.apache.hadoop.fs.FSDataOutputStream;
|
||||
import org.apache.hadoop.fs.FileStatus;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.FsServerDefaults;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.fs.RemoteIterator;
|
||||
import org.apache.hadoop.fs.StorageType;
|
||||
@ -74,7 +83,9 @@
|
||||
import org.apache.hadoop.hdfs.protocol.SystemErasureCodingPolicies;
|
||||
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
|
||||
import org.apache.hadoop.hdfs.protocol.SnapshottableDirectoryStatus;
|
||||
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
|
||||
import org.apache.hadoop.hdfs.server.namenode.NameNode;
|
||||
import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
|
||||
import org.apache.hadoop.hdfs.server.namenode.snapshot.SnapshotTestHelper;
|
||||
import org.apache.hadoop.hdfs.server.namenode.web.resources.NamenodeWebHdfsMethods;
|
||||
import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
|
||||
@ -92,11 +103,13 @@
|
||||
import org.apache.hadoop.security.token.SecretManager.InvalidToken;
|
||||
import org.apache.hadoop.security.UserGroupInformation;
|
||||
import org.apache.hadoop.test.GenericTestUtils;
|
||||
import org.apache.hadoop.util.DataChecksum;
|
||||
import org.apache.log4j.Level;
|
||||
import org.codehaus.jettison.json.JSONException;
|
||||
import org.codehaus.jettison.json.JSONObject;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Test;
|
||||
import org.mockito.Mockito;
|
||||
import org.mockito.internal.util.reflection.Whitebox;
|
||||
|
||||
import static org.mockito.Matchers.any;
|
||||
@ -1318,4 +1331,125 @@ public void testWebHdfsAppend() throws Exception {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Test fsserver defaults response from {@link DistributedFileSystem} and
|
||||
* {@link WebHdfsFileSystem} are the same.
|
||||
* @throws Exception
|
||||
*/
|
||||
@Test
|
||||
public void testFsserverDefaults() throws Exception {
|
||||
MiniDFSCluster cluster = null;
|
||||
final Configuration conf = WebHdfsTestUtil.createConf();
|
||||
// Here we override all the default values so that we can verify that it
|
||||
// doesn't pick up the default value.
|
||||
long blockSize = 256*1024*1024;
|
||||
int bytesPerChecksum = 256;
|
||||
int writePacketSize = 128*1024;
|
||||
int replicationFactor = 0;
|
||||
int bufferSize = 1024;
|
||||
boolean encryptDataTransfer = true;
|
||||
long trashInterval = 1;
|
||||
String checksumType = "CRC32";
|
||||
// Setting policy to a special value 7 because BlockManager will
|
||||
// create defaultSuite with policy id 7.
|
||||
byte policyId = (byte) 7;
|
||||
|
||||
conf.setLong(DFS_BLOCK_SIZE_KEY, blockSize);
|
||||
conf.setInt(DFS_BYTES_PER_CHECKSUM_KEY, bytesPerChecksum);
|
||||
conf.setInt(DFS_CLIENT_WRITE_PACKET_SIZE_KEY, writePacketSize);
|
||||
conf.setInt(DFS_REPLICATION_KEY, replicationFactor);
|
||||
conf.setInt(IO_FILE_BUFFER_SIZE_KEY, bufferSize);
|
||||
conf.setBoolean(DFS_ENCRYPT_DATA_TRANSFER_KEY, encryptDataTransfer);
|
||||
conf.setLong(FS_TRASH_INTERVAL_KEY, trashInterval);
|
||||
conf.set(DFS_CHECKSUM_TYPE_KEY, checksumType);
|
||||
FsServerDefaults originalServerDefaults = new FsServerDefaults(blockSize,
|
||||
bytesPerChecksum, writePacketSize, (short)replicationFactor,
|
||||
bufferSize, encryptDataTransfer, trashInterval,
|
||||
DataChecksum.Type.valueOf(checksumType), "", policyId);
|
||||
try {
|
||||
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(0).build();
|
||||
final DistributedFileSystem dfs = cluster.getFileSystem();
|
||||
final WebHdfsFileSystem webfs = WebHdfsTestUtil.getWebHdfsFileSystem(
|
||||
conf, WebHdfsConstants.WEBHDFS_SCHEME);
|
||||
FsServerDefaults dfsServerDefaults = dfs.getServerDefaults();
|
||||
FsServerDefaults webfsServerDefaults = webfs.getServerDefaults();
|
||||
// Verify whether server defaults value that we override is equal to
|
||||
// dfsServerDefaults.
|
||||
compareFsServerDefaults(originalServerDefaults, dfsServerDefaults);
|
||||
// Verify whether dfs serverdefaults is equal to
|
||||
// webhdfsServerDefaults.
|
||||
compareFsServerDefaults(dfsServerDefaults, webfsServerDefaults);
|
||||
webfs.getServerDefaults();
|
||||
} finally {
|
||||
if (cluster != null) {
|
||||
cluster.shutdown();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private void compareFsServerDefaults(FsServerDefaults serverDefaults1,
|
||||
FsServerDefaults serverDefaults2) throws Exception {
|
||||
Assert.assertEquals("Block size is different",
|
||||
serverDefaults1.getBlockSize(),
|
||||
serverDefaults2.getBlockSize());
|
||||
Assert.assertEquals("Bytes per checksum are different",
|
||||
serverDefaults1.getBytesPerChecksum(),
|
||||
serverDefaults2.getBytesPerChecksum());
|
||||
Assert.assertEquals("Write packet size is different",
|
||||
serverDefaults1.getWritePacketSize(),
|
||||
serverDefaults2.getWritePacketSize());
|
||||
Assert.assertEquals("Default replication is different",
|
||||
serverDefaults1.getReplication(),
|
||||
serverDefaults2.getReplication());
|
||||
Assert.assertEquals("File buffer size are different",
|
||||
serverDefaults1.getFileBufferSize(),
|
||||
serverDefaults2.getFileBufferSize());
|
||||
Assert.assertEquals("Encrypt data transfer key is different",
|
||||
serverDefaults1.getEncryptDataTransfer(),
|
||||
serverDefaults2.getEncryptDataTransfer());
|
||||
Assert.assertEquals("Trash interval is different",
|
||||
serverDefaults1.getTrashInterval(),
|
||||
serverDefaults2.getTrashInterval());
|
||||
Assert.assertEquals("Checksum type is different",
|
||||
serverDefaults1.getChecksumType(),
|
||||
serverDefaults2.getChecksumType());
|
||||
Assert.assertEquals("Key provider uri is different",
|
||||
serverDefaults1.getKeyProviderUri(),
|
||||
serverDefaults2.getKeyProviderUri());
|
||||
Assert.assertEquals("Default storage policy is different",
|
||||
serverDefaults1.getDefaultStoragePolicyId(),
|
||||
serverDefaults2.getDefaultStoragePolicyId());
|
||||
}
|
||||
|
||||
/**
|
||||
* Tests the case when client is upgraded to return {@link FsServerDefaults}
|
||||
* but then namenode is not upgraded.
|
||||
* @throws Exception
|
||||
*/
|
||||
@Test
|
||||
public void testFsserverDefaultsBackwardsCompatible() throws Exception {
|
||||
MiniDFSCluster cluster = null;
|
||||
final Configuration conf = WebHdfsTestUtil.createConf();
|
||||
try {
|
||||
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(0).build();
|
||||
final WebHdfsFileSystem webfs = WebHdfsTestUtil.getWebHdfsFileSystem(
|
||||
conf, WebHdfsConstants.WEBHDFS_SCHEME);
|
||||
NamenodeWebHdfsMethods.resetServerDefaultsResponse();
|
||||
FSNamesystem fsnSpy =
|
||||
NameNodeAdapter.spyOnNamesystem(cluster.getNameNode());
|
||||
Mockito.when(fsnSpy.getServerDefaults()).
|
||||
thenThrow(new UnsupportedOperationException());
|
||||
try {
|
||||
webfs.getServerDefaults();
|
||||
Assert.fail("should have thrown UnSupportedOperationException.");
|
||||
} catch (UnsupportedOperationException uoe) {
|
||||
//Expected exception.
|
||||
}
|
||||
} finally {
|
||||
if (cluster != null) {
|
||||
cluster.shutdown();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user