diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelperClient.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelperClient.java index 018082851d..ad80bc255a 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelperClient.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelperClient.java @@ -121,9 +121,11 @@ import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ShortCircuitShmI import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ShortCircuitShmSlotProto; import org.apache.hadoop.hdfs.protocol.proto.EncryptionZonesProtos.EncryptionZoneProto; import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos; +import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.AccessModeProto; import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.BlockProto; import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.BlockStoragePolicyProto; import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.BlockTypeProto; +import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.BlockTokenSecretProto; import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.ContentSummaryProto; import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.CorruptFileBlocksProto; import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.CryptoProtocolVersionProto; @@ -584,6 +586,55 @@ public class PBHelperClient { return blockTokens; } + public static AccessModeProto convert(BlockTokenIdentifier.AccessMode aMode) { + switch (aMode) { + case READ: return AccessModeProto.READ; + case WRITE: return AccessModeProto.WRITE; + case COPY: return AccessModeProto.COPY; + case REPLACE: return AccessModeProto.REPLACE; + default: + throw new IllegalArgumentException("Unexpected AccessMode: " + aMode); + } + } + + public static BlockTokenIdentifier.AccessMode convert( + AccessModeProto accessModeProto) { + switch (accessModeProto) { + case READ: return BlockTokenIdentifier.AccessMode.READ; + case WRITE: return BlockTokenIdentifier.AccessMode.WRITE; + case COPY: return BlockTokenIdentifier.AccessMode.COPY; + case REPLACE: return BlockTokenIdentifier.AccessMode.REPLACE; + default: + throw new IllegalArgumentException("Unexpected AccessModeProto: " + + accessModeProto); + } + } + + public static BlockTokenSecretProto convert( + BlockTokenIdentifier blockTokenSecret) { + BlockTokenSecretProto.Builder builder = + BlockTokenSecretProto.newBuilder(); + builder.setExpiryDate(blockTokenSecret.getExpiryDate()); + builder.setKeyId(blockTokenSecret.getKeyId()); + String userId = blockTokenSecret.getUserId(); + if (userId != null) { + builder.setUserId(userId); + } + + String blockPoolId = blockTokenSecret.getBlockPoolId(); + if (blockPoolId != null) { + builder.setBlockPoolId(blockPoolId); + } + + builder.setBlockId(blockTokenSecret.getBlockId()); + + for (BlockTokenIdentifier.AccessMode aMode : + blockTokenSecret.getAccessModes()) { + builder.addModes(convert(aMode)); + } + return builder.build(); + } + static public DatanodeInfo convert(DatanodeInfoProto di) { if (di == null) { return null; diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/security/token/block/BlockTokenIdentifier.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/security/token/block/BlockTokenIdentifier.java index 3f2c9ca98e..28e7acc3b1 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/security/token/block/BlockTokenIdentifier.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/security/token/block/BlockTokenIdentifier.java @@ -19,11 +19,16 @@ package org.apache.hadoop.hdfs.security.token.block; import java.io.DataInput; +import java.io.DataInputStream; import java.io.DataOutput; import java.io.IOException; import java.util.EnumSet; +import com.google.common.annotations.VisibleForTesting; import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.AccessModeProto; +import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.BlockTokenSecretProto; +import org.apache.hadoop.hdfs.protocolPB.PBHelperClient; import org.apache.hadoop.io.Text; import org.apache.hadoop.io.WritableUtils; import org.apache.hadoop.security.UserGroupInformation; @@ -44,20 +49,22 @@ public class BlockTokenIdentifier extends TokenIdentifier { private String blockPoolId; private long blockId; private final EnumSet modes; + private boolean useProto; private byte [] cache; public BlockTokenIdentifier() { - this(null, null, 0, EnumSet.noneOf(AccessMode.class)); + this(null, null, 0, EnumSet.noneOf(AccessMode.class), false); } public BlockTokenIdentifier(String userId, String bpid, long blockId, - EnumSet modes) { + EnumSet modes, boolean useProto) { this.cache = null; this.userId = userId; this.blockPoolId = bpid; this.blockId = blockId; this.modes = modes == null ? EnumSet.noneOf(AccessMode.class) : modes; + this.useProto = useProto; } @Override @@ -144,9 +151,45 @@ public class BlockTokenIdentifier extends TokenIdentifier { ^ (blockPoolId == null ? 0 : blockPoolId.hashCode()); } + /** + * readFields peeks at the first byte of the DataInput and determines if it + * was written using WritableUtils ("Legacy") or Protobuf. We can do this + * because we know the first field is the Expiry date. + * + * In the case of the legacy buffer, the expiry date is a VInt, so the size + * (which should always be >1) is encoded in the first byte - which is + * always negative due to this encoding. However, there are sometimes null + * BlockTokenIdentifier written so we also need to handle the case there + * the first byte is also 0. + * + * In the case of protobuf, the first byte is a type tag for the expiry date + * which is written as (field_number << 3 | wire_type. + * So as long as the field_number is less than 16, but also positive, then + * we know we have a Protobuf. + * + * @param in DataInput to deserialize this object from. + * @throws IOException + */ @Override public void readFields(DataInput in) throws IOException { this.cache = null; + + final DataInputStream dis = (DataInputStream)in; + if (!dis.markSupported()) { + throw new IOException("Could not peek first byte."); + } + dis.mark(1); + final byte firstByte = dis.readByte(); + dis.reset(); + if (firstByte <= 0) { + readFieldsLegacy(dis); + } else { + readFieldsProtobuf(dis); + } + } + + @VisibleForTesting + void readFieldsLegacy(DataInput in) throws IOException { expiryDate = WritableUtils.readVLong(in); keyId = WritableUtils.readVInt(in); userId = WritableUtils.readString(in); @@ -157,10 +200,44 @@ public class BlockTokenIdentifier extends TokenIdentifier { for (int i = 0; i < length; i++) { modes.add(WritableUtils.readEnum(in, AccessMode.class)); } + useProto = false; + } + + @VisibleForTesting + void readFieldsProtobuf(DataInput in) throws IOException { + BlockTokenSecretProto blockTokenSecretProto = + BlockTokenSecretProto.parseFrom((DataInputStream)in); + expiryDate = blockTokenSecretProto.getExpiryDate(); + keyId = blockTokenSecretProto.getKeyId(); + if (blockTokenSecretProto.hasUserId()) { + userId = blockTokenSecretProto.getUserId(); + } else { + userId = null; + } + if (blockTokenSecretProto.hasBlockPoolId()) { + blockPoolId = blockTokenSecretProto.getBlockPoolId(); + } else { + blockPoolId = null; + } + blockId = blockTokenSecretProto.getBlockId(); + for (int i = 0; i < blockTokenSecretProto.getModesCount(); i++) { + AccessModeProto accessModeProto = blockTokenSecretProto.getModes(i); + modes.add(PBHelperClient.convert(accessModeProto)); + } + useProto = true; } @Override public void write(DataOutput out) throws IOException { + if (useProto) { + writeProtobuf(out); + } else { + writeLegacy(out); + } + } + + @VisibleForTesting + void writeLegacy(DataOutput out) throws IOException { WritableUtils.writeVLong(out, expiryDate); WritableUtils.writeVInt(out, keyId); WritableUtils.writeString(out, userId); @@ -172,6 +249,12 @@ public class BlockTokenIdentifier extends TokenIdentifier { } } + @VisibleForTesting + void writeProtobuf(DataOutput out) throws IOException { + BlockTokenSecretProto secret = PBHelperClient.convert(this); + out.write(secret.toByteArray()); + } + @Override public byte[] getBytes() { if(cache == null) cache = super.getBytes(); @@ -186,4 +269,4 @@ public class BlockTokenIdentifier extends TokenIdentifier { return KIND_NAME; } } -} +} \ No newline at end of file diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/hdfs.proto b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/hdfs.proto index 1414120489..8a039d482f 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/hdfs.proto +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/hdfs.proto @@ -514,3 +514,36 @@ message RollingUpgradeStatusProto { message StorageUuidsProto { repeated string storageUuids = 1; } + +/** + * File access permissions mode. + */ +enum AccessModeProto { + READ = 1; + WRITE = 2; + COPY = 3; + REPLACE = 4; +} + +/** + * Secret information for the BlockKeyProto. This is not sent on the wire as + * such but is used to pack a byte array and encrypted and put in + * BlockKeyProto.bytes + * When adding further fields, make sure they are optional as they would + * otherwise not be backwards compatible. + * + * Note: As part of the migration from WritableUtils based tokens (aka "legacy") + * to Protocol Buffers, we use the first byte to determine the type. If the + * first byte is <=0 then it is a legacy token. This means that when using + * protobuf tokens, the the first field sent must have a `field_number` less + * than 16 to make sure that the first byte is positive. Otherwise it could be + * parsed as a legacy token. See HDFS-11026 for more discussion. + */ +message BlockTokenSecretProto { + optional uint64 expiryDate = 1; + optional uint32 keyId = 2; + optional string userId = 3; + optional string blockPoolId = 4; + optional uint64 blockId = 5; + repeated AccessModeProto modes = 6; +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java index 10a521bf39..cf1d21a380 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java @@ -641,6 +641,8 @@ public class DFSConfigKeys extends CommonConfigurationKeys { public static final long DFS_BLOCK_ACCESS_KEY_UPDATE_INTERVAL_DEFAULT = 600L; public static final String DFS_BLOCK_ACCESS_TOKEN_LIFETIME_KEY = "dfs.block.access.token.lifetime"; public static final long DFS_BLOCK_ACCESS_TOKEN_LIFETIME_DEFAULT = 600L; + public static final String DFS_BLOCK_ACCESS_TOKEN_PROTOBUF_ENABLE = "dfs.block.access.token.protobuf.enable"; + public static final boolean DFS_BLOCK_ACCESS_TOKEN_PROTOBUF_ENABLE_DEFAULT = false; public static final String DFS_BLOCK_REPLICATOR_CLASSNAME_KEY = "dfs.block.replicator.classname"; public static final Class DFS_BLOCK_REPLICATOR_CLASSNAME_DEFAULT = BlockPlacementPolicyDefault.class; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/security/token/block/BlockTokenSecretManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/security/token/block/BlockTokenSecretManager.java index ba08740602..a3100d002c 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/security/token/block/BlockTokenSecretManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/security/token/block/BlockTokenSecretManager.java @@ -75,6 +75,7 @@ public class BlockTokenSecretManager extends private final int intRange; private final int nnRangeStart; + private final boolean useProto; private final SecureRandom nonceGenerator = new SecureRandom(); @@ -83,11 +84,13 @@ public class BlockTokenSecretManager extends * * @param keyUpdateInterval how often a new key will be generated * @param tokenLifetime how long an individual token is valid + * @param useProto should we use new protobuf style tokens */ public BlockTokenSecretManager(long keyUpdateInterval, - long tokenLifetime, String blockPoolId, String encryptionAlgorithm) { + long tokenLifetime, String blockPoolId, String encryptionAlgorithm, + boolean useProto) { this(false, keyUpdateInterval, tokenLifetime, blockPoolId, - encryptionAlgorithm, 0, 1); + encryptionAlgorithm, 0, 1, useProto); } /** @@ -102,8 +105,9 @@ public class BlockTokenSecretManager extends */ public BlockTokenSecretManager(long keyUpdateInterval, long tokenLifetime, int nnIndex, int numNNs, String blockPoolId, - String encryptionAlgorithm) { - this(true, keyUpdateInterval, tokenLifetime, blockPoolId, encryptionAlgorithm, nnIndex, numNNs); + String encryptionAlgorithm, boolean useProto) { + this(true, keyUpdateInterval, tokenLifetime, blockPoolId, + encryptionAlgorithm, nnIndex, numNNs, useProto); Preconditions.checkArgument(nnIndex >= 0); Preconditions.checkArgument(numNNs > 0); setSerialNo(new SecureRandom().nextInt()); @@ -111,7 +115,8 @@ public class BlockTokenSecretManager extends } private BlockTokenSecretManager(boolean isMaster, long keyUpdateInterval, - long tokenLifetime, String blockPoolId, String encryptionAlgorithm, int nnIndex, int numNNs) { + long tokenLifetime, String blockPoolId, String encryptionAlgorithm, + int nnIndex, int numNNs, boolean useProto) { this.intRange = Integer.MAX_VALUE / numNNs; this.nnRangeStart = intRange * nnIndex; this.isMaster = isMaster; @@ -120,6 +125,7 @@ public class BlockTokenSecretManager extends this.allKeys = new HashMap(); this.blockPoolId = blockPoolId; this.encryptionAlgorithm = encryptionAlgorithm; + this.useProto = useProto; generateKeys(); } @@ -246,7 +252,7 @@ public class BlockTokenSecretManager extends public Token generateToken(String userId, ExtendedBlock block, EnumSet modes) throws IOException { BlockTokenIdentifier id = new BlockTokenIdentifier(userId, block - .getBlockPoolId(), block.getBlockId(), modes); + .getBlockPoolId(), block.getBlockId(), modes, useProto); return new Token(id, this); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/KeyManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/KeyManager.java index 1c6b352b7b..0aa6fb26f9 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/KeyManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/KeyManager.java @@ -69,8 +69,12 @@ public class KeyManager implements Closeable, DataEncryptionKeyFactory { + ", token lifetime=" + StringUtils.formatTime(tokenLifetime)); String encryptionAlgorithm = conf.get( DFSConfigKeys.DFS_DATA_ENCRYPTION_ALGORITHM_KEY); + final boolean enableProtobuf = conf.getBoolean( + DFSConfigKeys.DFS_BLOCK_ACCESS_TOKEN_PROTOBUF_ENABLE, + DFSConfigKeys.DFS_BLOCK_ACCESS_TOKEN_PROTOBUF_ENABLE_DEFAULT); this.blockTokenSecretManager = new BlockTokenSecretManager( - updateInterval, tokenLifetime, blockpoolID, encryptionAlgorithm); + updateInterval, tokenLifetime, blockpoolID, encryptionAlgorithm, + enableProtobuf); this.blockTokenSecretManager.addKeys(keys); // sync block keys with NN more frequently than NN updates its block keys diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java index 893b12d323..5125b334d5 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java @@ -542,6 +542,9 @@ public class BlockManager implements BlockStatsMXBean { String nsId = DFSUtil.getNamenodeNameServiceId(conf); boolean isHaEnabled = HAUtil.isHAEnabled(conf, nsId); + boolean shouldWriteProtobufToken = conf.getBoolean( + DFSConfigKeys.DFS_BLOCK_ACCESS_TOKEN_PROTOBUF_ENABLE, + DFSConfigKeys.DFS_BLOCK_ACCESS_TOKEN_PROTOBUF_ENABLE_DEFAULT); if (isHaEnabled) { // figure out which index we are of the nns @@ -555,10 +558,12 @@ public class BlockManager implements BlockStatsMXBean { nnIndex++; } return new BlockTokenSecretManager(updateMin * 60 * 1000L, - lifetimeMin * 60 * 1000L, nnIndex, nnIds.size(), null, encryptionAlgorithm); + lifetimeMin * 60 * 1000L, nnIndex, nnIds.size(), null, + encryptionAlgorithm, shouldWriteProtobufToken); } else { return new BlockTokenSecretManager(updateMin*60*1000L, - lifetimeMin*60*1000L, 0, 1, null, encryptionAlgorithm); + lifetimeMin*60*1000L, 0, 1, null, encryptionAlgorithm, + shouldWriteProtobufToken); } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java index a6dfa46c22..9ed80efbe3 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java @@ -1534,9 +1534,12 @@ public class DataNode extends ReconfigurableBase + blockKeyUpdateInterval / (60 * 1000) + " min(s), tokenLifetime=" + blockTokenLifetime / (60 * 1000) + " min(s)"); + final boolean enableProtobuf = getConf().getBoolean( + DFSConfigKeys.DFS_BLOCK_ACCESS_TOKEN_PROTOBUF_ENABLE, + DFSConfigKeys.DFS_BLOCK_ACCESS_TOKEN_PROTOBUF_ENABLE_DEFAULT); final BlockTokenSecretManager secretMgr = new BlockTokenSecretManager(0, blockTokenLifetime, blockPoolId, - dnConf.encryptionAlgorithm); + dnConf.encryptionAlgorithm, enableProtobuf); blockPoolTokenSecretManager.addBlockPool(blockPoolId, secretMgr); } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml index 2bbc78884f..03f1a082f1 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml @@ -502,6 +502,15 @@ The lifetime of access tokens in minutes. + + dfs.block.access.token.protobuf.enable + false + + If "true", block tokens are written using Protocol Buffers. + If "false", block tokens are written using Legacy format. + + + dfs.datanode.data.dir file://${hadoop.tmp.dir}/dfs/data diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/security/token/block/TestBlockToken.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/security/token/block/TestBlockToken.java index 55e9d307e4..ecb63ae376 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/security/token/block/TestBlockToken.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/security/token/block/TestBlockToken.java @@ -20,6 +20,7 @@ package org.apache.hadoop.hdfs.security.token.block; import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotEquals; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; import static org.mockito.Matchers.any; @@ -31,7 +32,10 @@ import java.io.DataInputStream; import java.io.File; import java.io.IOException; import java.net.InetSocketAddress; +import java.util.Arrays; +import java.util.Calendar; import java.util.EnumSet; +import java.util.GregorianCalendar; import java.util.Set; import org.apache.commons.logging.Log; @@ -57,6 +61,8 @@ import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.GetRep import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.GetReplicaVisibleLengthResponseProto; import org.apache.hadoop.hdfs.protocolPB.ClientDatanodeProtocolPB; import org.apache.hadoop.hdfs.protocolPB.PBHelperClient; +import org.apache.hadoop.io.DataInputBuffer; +import org.apache.hadoop.io.DataOutputBuffer; import org.apache.hadoop.io.TestWritable; import org.apache.hadoop.ipc.Client; import org.apache.hadoop.ipc.ProtobufRpcEngine; @@ -104,7 +110,7 @@ public class TestBlockToken { final ExtendedBlock block1 = new ExtendedBlock("0", 0L); final ExtendedBlock block2 = new ExtendedBlock("10", 10L); final ExtendedBlock block3 = new ExtendedBlock("-10", -108L); - + @Before public void disableKerberos() { Configuration conf = new Configuration(); @@ -128,7 +134,7 @@ public class TestBlockToken { InvocationOnMock invocation) throws IOException { Object args[] = invocation.getArguments(); assertEquals(2, args.length); - GetReplicaVisibleLengthRequestProto req = + GetReplicaVisibleLengthRequestProto req = (GetReplicaVisibleLengthRequestProto) args[1]; Set tokenIds = UserGroupInformation.getCurrentUser() .getTokenIdentifiers(); @@ -158,11 +164,11 @@ public class TestBlockToken { return id; } - @Test - public void testWritable() throws Exception { + private void testWritable(boolean enableProtobuf) throws Exception { TestWritable.testWritable(new BlockTokenIdentifier()); BlockTokenSecretManager sm = new BlockTokenSecretManager( - blockKeyUpdateInterval, blockTokenLifetime, 0, 1, "fake-pool", null); + blockKeyUpdateInterval, blockTokenLifetime, 0, 1, "fake-pool", null, + enableProtobuf); TestWritable.testWritable(generateTokenId(sm, block1, EnumSet.allOf(BlockTokenIdentifier.AccessMode.class))); TestWritable.testWritable(generateTokenId(sm, block2, @@ -171,6 +177,16 @@ public class TestBlockToken { EnumSet.noneOf(BlockTokenIdentifier.AccessMode.class))); } + @Test + public void testWritableLegacy() throws Exception { + testWritable(false); + } + + @Test + public void testWritableProtobuf() throws Exception { + testWritable(true); + } + private void tokenGenerationAndVerification(BlockTokenSecretManager master, BlockTokenSecretManager slave) throws Exception { // single-mode tokens @@ -198,12 +214,14 @@ public class TestBlockToken { } /** test block key and token handling */ - @Test - public void testBlockTokenSecretManager() throws Exception { + private void testBlockTokenSecretManager(boolean enableProtobuf) + throws Exception { BlockTokenSecretManager masterHandler = new BlockTokenSecretManager( - blockKeyUpdateInterval, blockTokenLifetime, 0, 1, "fake-pool", null); + blockKeyUpdateInterval, blockTokenLifetime, 0, 1, "fake-pool", null, + enableProtobuf); BlockTokenSecretManager slaveHandler = new BlockTokenSecretManager( - blockKeyUpdateInterval, blockTokenLifetime, "fake-pool", null); + blockKeyUpdateInterval, blockTokenLifetime, "fake-pool", null, + enableProtobuf); ExportedBlockKeys keys = masterHandler.exportKeys(); slaveHandler.addKeys(keys); tokenGenerationAndVerification(masterHandler, slaveHandler); @@ -215,6 +233,16 @@ public class TestBlockToken { tokenGenerationAndVerification(masterHandler, slaveHandler); } + @Test + public void testBlockTokenSecretManagerLegacy() throws Exception { + testBlockTokenSecretManager(false); + } + + @Test + public void testBlockTokenSecretManagerProtobuf() throws Exception { + testBlockTokenSecretManager(true); + } + private static Server createMockDatanode(BlockTokenSecretManager sm, Token token, Configuration conf) throws IOException, ServiceException { @@ -223,7 +251,7 @@ public class TestBlockToken { BlockTokenIdentifier id = sm.createIdentifier(); id.readFields(new DataInputStream(new ByteArrayInputStream(token .getIdentifier()))); - + doAnswer(new GetLengthAnswer(sm, id)).when(mockDN) .getReplicaVisibleLength(any(RpcController.class), any(GetReplicaVisibleLengthRequestProto.class)); @@ -237,14 +265,14 @@ public class TestBlockToken { .setNumHandlers(5).setVerbose(true).setSecretManager(sm).build(); } - @Test - public void testBlockTokenRpc() throws Exception { + private void testBlockTokenRpc(boolean enableProtobuf) throws Exception { Configuration conf = new Configuration(); conf.set(HADOOP_SECURITY_AUTHENTICATION, "kerberos"); UserGroupInformation.setConfiguration(conf); - + BlockTokenSecretManager sm = new BlockTokenSecretManager( - blockKeyUpdateInterval, blockTokenLifetime, 0, 1, "fake-pool", null); + blockKeyUpdateInterval, blockTokenLifetime, 0, 1, "fake-pool", null, + enableProtobuf); Token token = sm.generateToken(block3, EnumSet.allOf(BlockTokenIdentifier.AccessMode.class)); @@ -270,20 +298,30 @@ public class TestBlockToken { } } + @Test + public void testBlockTokenRpcLegacy() throws Exception { + testBlockTokenRpc(false); + } + + @Test + public void testBlockTokenRpcProtobuf() throws Exception { + testBlockTokenRpc(true); + } + /** * Test that fast repeated invocations of createClientDatanodeProtocolProxy * will not end up using up thousands of sockets. This is a regression test * for HDFS-1965. */ - @Test - public void testBlockTokenRpcLeak() throws Exception { + private void testBlockTokenRpcLeak(boolean enableProtobuf) throws Exception { Configuration conf = new Configuration(); conf.set(HADOOP_SECURITY_AUTHENTICATION, "kerberos"); UserGroupInformation.setConfiguration(conf); - + Assume.assumeTrue(FD_DIR.exists()); BlockTokenSecretManager sm = new BlockTokenSecretManager( - blockKeyUpdateInterval, blockTokenLifetime, 0, 1, "fake-pool", null); + blockKeyUpdateInterval, blockTokenLifetime, 0, 1, "fake-pool", null, + enableProtobuf); Token token = sm.generateToken(block3, EnumSet.allOf(BlockTokenIdentifier.AccessMode.class)); @@ -334,6 +372,16 @@ public class TestBlockToken { RPC.stopProxy(proxyToNoWhere); } + @Test + public void testBlockTokenRpcLeakLegacy() throws Exception { + testBlockTokenRpcLeak(false); + } + + @Test + public void testBlockTokenRpcLeakProtobuf() throws Exception { + testBlockTokenRpcLeak(true); + } + /** * @return the current number of file descriptors open by this process. */ @@ -344,17 +392,19 @@ public class TestBlockToken { /** * Test {@link BlockPoolTokenSecretManager} */ - @Test - public void testBlockPoolTokenSecretManager() throws Exception { + private void testBlockPoolTokenSecretManager(boolean enableProtobuf) + throws Exception { BlockPoolTokenSecretManager bpMgr = new BlockPoolTokenSecretManager(); // Test BlockPoolSecretManager with upto 10 block pools for (int i = 0; i < 10; i++) { String bpid = Integer.toString(i); BlockTokenSecretManager masterHandler = new BlockTokenSecretManager( - blockKeyUpdateInterval, blockTokenLifetime, 0, 1, "fake-pool", null); + blockKeyUpdateInterval, blockTokenLifetime, 0, 1, "fake-pool", null, + enableProtobuf); BlockTokenSecretManager slaveHandler = new BlockTokenSecretManager( - blockKeyUpdateInterval, blockTokenLifetime, "fake-pool", null); + blockKeyUpdateInterval, blockTokenLifetime, "fake-pool", null, + enableProtobuf); bpMgr.addBlockPool(bpid, slaveHandler); ExportedBlockKeys keys = masterHandler.exportKeys(); @@ -370,20 +420,31 @@ public class TestBlockToken { } } + @Test + public void testBlockPoolTokenSecretManagerLegacy() throws Exception { + testBlockPoolTokenSecretManager(false); + } + + @Test + public void testBlockPoolTokenSecretManagerProtobuf() throws Exception { + testBlockPoolTokenSecretManager(true); + } + /** * This test writes a file and gets the block locations without closing the * file, and tests the block token in the last block. Block token is verified * by ensuring it is of correct kind. - * + * * @throws IOException * @throws InterruptedException */ - @Test - public void testBlockTokenInLastLocatedBlock() throws IOException, - InterruptedException { + private void testBlockTokenInLastLocatedBlock(boolean enableProtobuf) + throws IOException, InterruptedException { Configuration conf = new HdfsConfiguration(); conf.setBoolean(DFSConfigKeys.DFS_BLOCK_ACCESS_TOKEN_ENABLE_KEY, true); conf.setInt(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, 512); + conf.setBoolean(DFSConfigKeys.DFS_BLOCK_ACCESS_TOKEN_PROTOBUF_ENABLE, + enableProtobuf); MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf) .numDataNodes(1).build(); cluster.waitActive(); @@ -411,4 +472,188 @@ public class TestBlockToken { cluster.shutdown(); } } + + @Test + public void testBlockTokenInLastLocatedBlockLegacy() throws IOException, + InterruptedException { + testBlockTokenInLastLocatedBlock(false); + } + + @Test + public void testBlockTokenInLastLocatedBlockProtobuf() throws IOException, + InterruptedException { + testBlockTokenInLastLocatedBlock(true); + } + + @Test + public void testLegacyBlockTokenBytesIsLegacy() throws IOException { + final boolean useProto = false; + BlockTokenSecretManager sm = new BlockTokenSecretManager( + blockKeyUpdateInterval, blockTokenLifetime, 0, 1, "fake-pool", null, + useProto); + Token token = sm.generateToken(block1, + EnumSet.noneOf(BlockTokenIdentifier.AccessMode.class)); + final byte[] tokenBytes = token.getIdentifier(); + BlockTokenIdentifier legacyToken = new BlockTokenIdentifier(); + BlockTokenIdentifier protobufToken = new BlockTokenIdentifier(); + BlockTokenIdentifier readToken = new BlockTokenIdentifier(); + + DataInputBuffer dib = new DataInputBuffer(); + + dib.reset(tokenBytes, tokenBytes.length); + legacyToken.readFieldsLegacy(dib); + + boolean invalidProtobufMessage = false; + try { + dib.reset(tokenBytes, tokenBytes.length); + protobufToken.readFieldsProtobuf(dib); + } catch (IOException e) { + invalidProtobufMessage = true; + } + assertTrue(invalidProtobufMessage); + + dib.reset(tokenBytes, tokenBytes.length); + readToken.readFields(dib); + + // Using legacy, the token parses as a legacy block token and not a protobuf + assertEquals(legacyToken, readToken); + assertNotEquals(protobufToken, readToken); + } + + @Test + public void testEmptyLegacyBlockTokenBytesIsLegacy() throws IOException { + BlockTokenIdentifier emptyIdent = new BlockTokenIdentifier(); + DataOutputBuffer dob = new DataOutputBuffer(4096); + DataInputBuffer dib = new DataInputBuffer(); + + emptyIdent.writeLegacy(dob); + byte[] emptyIdentBytes = Arrays.copyOf(dob.getData(), dob.getLength()); + + BlockTokenIdentifier legacyToken = new BlockTokenIdentifier(); + BlockTokenIdentifier protobufToken = new BlockTokenIdentifier(); + BlockTokenIdentifier readToken = new BlockTokenIdentifier(); + + dib.reset(emptyIdentBytes, emptyIdentBytes.length); + legacyToken.readFieldsLegacy(dib); + + boolean invalidProtobufMessage = false; + try { + dib.reset(emptyIdentBytes, emptyIdentBytes.length); + protobufToken.readFieldsProtobuf(dib); + } catch (IOException e) { + invalidProtobufMessage = true; + } + assertTrue(invalidProtobufMessage); + + dib.reset(emptyIdentBytes, emptyIdentBytes.length); + readToken.readFields(dib); + assertTrue(invalidProtobufMessage); + } + + @Test + public void testProtobufBlockTokenBytesIsProtobuf() throws IOException { + final boolean useProto = true; + BlockTokenSecretManager sm = new BlockTokenSecretManager( + blockKeyUpdateInterval, blockTokenLifetime, 0, 1, "fake-pool", null, + useProto); + Token token = sm.generateToken(block1, + EnumSet.noneOf(BlockTokenIdentifier.AccessMode.class)); + final byte[] tokenBytes = token.getIdentifier(); + BlockTokenIdentifier legacyToken = new BlockTokenIdentifier(); + BlockTokenIdentifier protobufToken = new BlockTokenIdentifier(); + BlockTokenIdentifier readToken = new BlockTokenIdentifier(); + + DataInputBuffer dib = new DataInputBuffer(); + + /* We receive NegativeArraySizeException because we didn't call + * readFields and instead try to parse this directly as a legacy + * BlockTokenIdentifier. + * + * Note: because the parsing depends on the expiryDate which is based on + * `Time.now()` it can sometimes fail with IOException and sometimes with + * NegativeArraySizeException. + */ + boolean invalidLegacyMessage = false; + try { + dib.reset(tokenBytes, tokenBytes.length); + legacyToken.readFieldsLegacy(dib); + } catch (IOException | NegativeArraySizeException e) { + invalidLegacyMessage = true; + } + assertTrue(invalidLegacyMessage); + + dib.reset(tokenBytes, tokenBytes.length); + protobufToken.readFieldsProtobuf(dib); + + dib.reset(tokenBytes, tokenBytes.length); + readToken.readFields(dib); + + // Using protobuf, the token parses as a protobuf and not a legacy block + // token + assertNotEquals(legacyToken, readToken); + assertEquals(protobufToken, readToken); + } + + public void testCraftedProtobufBlockTokenIdentifier( + BlockTokenIdentifier identifier, boolean expectIOE, + boolean expectRTE) throws IOException { + DataOutputBuffer dob = new DataOutputBuffer(4096); + DataInputBuffer dib = new DataInputBuffer(); + + identifier.writeProtobuf(dob); + byte[] identBytes = Arrays.copyOf(dob.getData(), dob.getLength()); + + BlockTokenIdentifier legacyToken = new BlockTokenIdentifier(); + BlockTokenIdentifier protobufToken = new BlockTokenIdentifier(); + BlockTokenIdentifier readToken = new BlockTokenIdentifier(); + + boolean invalidLegacyMessage = false; + try { + dib.reset(identBytes, identBytes.length); + legacyToken.readFieldsLegacy(dib); + } catch (IOException e) { + if (!expectIOE) { + fail("Received IOException but it was not expected."); + } + invalidLegacyMessage = true; + } catch (RuntimeException e) { + if (!expectRTE) { + fail("Received RuntimeException but it was not expected."); + } + invalidLegacyMessage = true; + } + + assertTrue(invalidLegacyMessage); + + dib.reset(identBytes, identBytes.length); + protobufToken.readFieldsProtobuf(dib); + + dib.reset(identBytes, identBytes.length); + readToken.readFieldsProtobuf(dib); + assertEquals(protobufToken, readToken); + } + + @Test + public void testCraftedProtobufBlockTokenBytesIsProtobuf() throws + IOException { + // Empty BlockTokenIdentifiers throw IOException + BlockTokenIdentifier identifier = new BlockTokenIdentifier(); + testCraftedProtobufBlockTokenIdentifier(identifier, true, false); + + /* Parsing BlockTokenIdentifier with expiryDate + * 2017-02-09 00:12:35,072+0100 will throw IOException. + * However, expiryDate of + * 2017-02-09 00:12:35,071+0100 will throw NegativeArraySizeException. + */ + Calendar cal = new GregorianCalendar(); + cal.set(2017, 1, 9, 0, 12, 35); + long datetime = cal.getTimeInMillis(); + datetime = ((datetime / 1000) * 1000); // strip milliseconds. + datetime = datetime + 71; // 2017-02-09 00:12:35,071+0100 + identifier.setExpiryDate(datetime); + testCraftedProtobufBlockTokenIdentifier(identifier, false, true); + datetime += 1; // 2017-02-09 00:12:35,072+0100 + identifier.setExpiryDate(datetime); + testCraftedProtobufBlockTokenIdentifier(identifier, true, false); + } }