HDFS-12741. Ozone: KSM: ADD support for KSM --createObjectStore command. Contributed by Shashikant Banerjee.

This commit is contained in:
Nanda kumar 2017-12-20 01:25:10 +05:30 committed by Owen O'Malley
parent caeeb78ca4
commit 4dae68eebf
13 changed files with 341 additions and 29 deletions

View File

@ -110,7 +110,7 @@ public long getCreationTime() {
public void setClusterId(String clusterId) throws IOException {
if (state == StorageState.INITIALIZED) {
throw new IOException(
"Storage directory " + storageDir + "already initialized.");
"Storage directory " + storageDir + " already initialized.");
} else {
storageInfo.setClusterId(clusterId);
}

View File

@ -76,7 +76,7 @@ public class KSMMetadataManagerImpl implements KSMMetadataManager {
private final long openKeyExpireThresholdMS;
public KSMMetadataManagerImpl(OzoneConfiguration conf) throws IOException {
File metaDir = OzoneUtils.getScmMetadirPath(conf);
File metaDir = OzoneUtils.getOzoneMetaDirPath(conf);
final int cacheSize = conf.getInt(OZONE_KSM_DB_CACHE_SIZE_MB,
OZONE_KSM_DB_CACHE_SIZE_DEFAULT);
File ksmDBFile = new File(metaDir.getPath(), KSM_DB_NAME);

View File

@ -0,0 +1,89 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.ozone.ksm;
import org.apache.hadoop.conf.OzoneConfiguration;
import org.apache.hadoop.ozone.common.Storage;
import org.apache.hadoop.ozone.protocol.proto.OzoneProtos.NodeType;
import org.apache.hadoop.ozone.scm.SCMStorage;
import org.apache.hadoop.ozone.web.utils.OzoneUtils;
import java.io.IOException;
import java.util.Properties;
import java.util.UUID;
/**
* KSMStorage is responsible for management of the StorageDirectories used by
* the KSM.
*/
public class KSMStorage extends Storage {
public static final String STORAGE_DIR = "ksm";
public static final String KSM_ID = "ksmUuid";
/**
* Construct KSMStorage.
* @throws IOException if any directories are inaccessible.
*/
public KSMStorage(OzoneConfiguration conf) throws IOException {
super(NodeType.KSM, OzoneUtils.getOzoneMetaDirPath(conf), STORAGE_DIR);
}
public void setScmId(String scmId) throws IOException {
if (getState() == StorageState.INITIALIZED) {
throw new IOException("KSM is already initialized.");
} else {
getStorageInfo().setProperty(SCMStorage.SCM_ID, scmId);
}
}
public void setKsmId(String ksmId) throws IOException {
if (getState() == StorageState.INITIALIZED) {
throw new IOException("KSM is already initialized.");
} else {
getStorageInfo().setProperty(KSM_ID, ksmId);
}
}
/**
* Retrieves the SCM ID from the version file.
* @return SCM_ID
*/
public String getScmId() {
return getStorageInfo().getProperty(SCMStorage.SCM_ID);
}
/**
* Retrieves the KSM ID from the version file.
* @return KSM_ID
*/
public String getKsmId() {
return getStorageInfo().getProperty(KSM_ID);
}
@Override
protected Properties getNodeProperties() {
String ksmId = getKsmId();
if (ksmId == null) {
ksmId = UUID.randomUUID().toString();
}
Properties ksmProperties = new Properties();
ksmProperties.setProperty(KSM_ID, ksmId);
return ksmProperties;
}
}

View File

@ -25,6 +25,8 @@
import org.apache.hadoop.ipc.ProtobufRpcEngine;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.jmx.ServiceRuntimeInfoImpl;
import org.apache.hadoop.ozone.common.Storage.StorageState;
import org.apache.hadoop.ozone.ksm.exceptions.KSMException;
import org.apache.hadoop.ozone.ksm.helpers.KsmBucketArgs;
import org.apache.hadoop.ozone.ksm.helpers.KsmBucketInfo;
import org.apache.hadoop.ozone.ksm.helpers.KsmKeyArgs;
@ -34,6 +36,7 @@
import org.apache.hadoop.ozone.ksm.helpers.OpenKeySession;
import org.apache.hadoop.ozone.ksm.protocol.KeySpaceManagerProtocol;
import org.apache.hadoop.ozone.ksm.protocolPB.KeySpaceManagerProtocolPB;
import org.apache.hadoop.ozone.ksm.exceptions.KSMException.ResultCodes;
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
import org.apache.hadoop.metrics2.util.MBeans;
import org.apache.hadoop.net.NetUtils;
@ -55,6 +58,7 @@
import javax.management.ObjectName;
import java.io.IOException;
import java.io.PrintStream;
import java.net.InetSocketAddress;
import java.util.HashMap;
import java.util.List;
@ -81,6 +85,37 @@ public class KeySpaceManager extends ServiceRuntimeInfoImpl
private static final Logger LOG =
LoggerFactory.getLogger(KeySpaceManager.class);
private static final String USAGE =
"Usage: \n hdfs ksm [genericOptions] " + "[ "
+ StartupOption.CREATEOBJECTSTORE.getName() + " ]\n " + "hdfs ksm [ "
+ StartupOption.HELP.getName() + " ]\n";
/** Startup options. */
public enum StartupOption {
CREATEOBJECTSTORE("-createObjectStore"),
HELP("-help"),
REGULAR("-regular");
private final String name;
StartupOption(String arg) {
this.name = arg;
}
public String getName() {
return name;
}
public static StartupOption parse(String value) {
for (StartupOption option : StartupOption.values()) {
if (option.name.equalsIgnoreCase(value)) {
return option;
}
}
return null;
}
}
private final RPC.Server ksmRpcServer;
private final InetSocketAddress ksmRpcAddress;
private final KSMMetadataManager metadataManager;
@ -89,10 +124,24 @@ public class KeySpaceManager extends ServiceRuntimeInfoImpl
private final KeyManager keyManager;
private final KSMMetrics metrics;
private final KeySpaceManagerHttpServer httpServer;
private final KSMStorage ksmStorage;
private ObjectName ksmInfoBeanName;
private static final String USAGE = "hdfs ksm [genericOptions]";
public KeySpaceManager(OzoneConfiguration conf) throws IOException {
private KeySpaceManager(OzoneConfiguration conf) throws IOException {
ksmStorage = new KSMStorage(conf);
ScmBlockLocationProtocol scmBlockClient = getScmBlockClient(conf);
if (ksmStorage.getState() != StorageState.INITIALIZED) {
throw new KSMException("KSM not initialized.",
ResultCodes.KSM_NOT_INITIALIZED);
}
// verifies that the SCM info in the KSM Version file is correct.
ScmInfo scmInfo = scmBlockClient.getScmInfo();
if (!(scmInfo.getClusterId().equals(ksmStorage.getClusterID()) && scmInfo
.getScmId().equals(ksmStorage.getScmId()))) {
throw new KSMException("SCM version info mismatch.",
ResultCodes.SCM_VERSION_MISMATCH_ERROR);
}
final int handlerCount = conf.getInt(OZONE_KSM_HANDLER_COUNT_KEY,
OZONE_KSM_HANDLER_COUNT_DEFAULT);
@ -124,8 +173,8 @@ public KeySpaceManager(OzoneConfiguration conf) throws IOException {
* @return
* @throws IOException
*/
private ScmBlockLocationProtocol getScmBlockClient(OzoneConfiguration conf)
throws IOException {
private static ScmBlockLocationProtocol getScmBlockClient(
OzoneConfiguration conf) throws IOException {
RPC.setProtocolEngine(conf, ScmBlockLocationProtocolPB.class,
ProtobufRpcEngine.class);
long scmVersion =
@ -145,6 +194,11 @@ private ScmBlockLocationProtocol getScmBlockClient(OzoneConfiguration conf)
public ScmInfo getScmInfo(OzoneConfiguration conf) throws IOException {
return getScmBlockClient(conf).getScmInfo();
}
@VisibleForTesting
public KSMStorage getKsmStorage() {
return ksmStorage;
}
/**
* Starts an RPC server, if configured.
*
@ -193,34 +247,124 @@ public KSMMetrics getMetrics() {
* @throws IOException if startup fails due to I/O error
*/
public static void main(String[] argv) throws IOException {
if (DFSUtil.parseHelpArgument(argv, USAGE,
System.out, true)) {
if (DFSUtil.parseHelpArgument(argv, USAGE, System.out, true)) {
System.exit(0);
}
try {
OzoneConfiguration conf = new OzoneConfiguration();
GenericOptionsParser hParser = new GenericOptionsParser(conf, argv);
if (!hParser.isParseSuccessful()
|| hParser.getRemainingArgs().length > 0) {
if (!hParser.isParseSuccessful()) {
System.err.println("USAGE: " + USAGE + " \n");
hParser.printGenericCommandUsage(System.err);
System.exit(1);
}
if (!DFSUtil.isOzoneEnabled(conf)) {
System.out.println("KSM cannot be started in secure mode or when " +
OZONE_ENABLED + " is set to false");
System.exit(1);
}
StringUtils.startupShutdownMessage(KeySpaceManager.class, argv, LOG);
KeySpaceManager ksm = new KeySpaceManager(conf);
ksm.start();
ksm.join();
KeySpaceManager ksm = createKSM(hParser.getRemainingArgs(), conf);
if (ksm != null) {
ksm.start();
ksm.join();
}
} catch (Throwable t) {
LOG.error("Failed to start the KeyspaceManager.", t);
terminate(1, t);
}
}
private static void printUsage(PrintStream out) {
out.println(USAGE + "\n");
}
/**
* Constructs KSM instance based on command line arguments.
* @param argv Command line arguments
* @param conf OzoneConfiguration
* @return KSM instance
* @throws IOException in case KSM instance creation fails.
*/
public static KeySpaceManager createKSM(String[] argv,
OzoneConfiguration conf) throws IOException {
if (!DFSUtil.isOzoneEnabled(conf)) {
System.err.println("KSM cannot be started in secure mode or when " +
OZONE_ENABLED + " is set to false");
System.exit(1);
}
StartupOption startOpt = parseArguments(argv);
if (startOpt == null) {
printUsage(System.err);
terminate(1);
return null;
}
switch (startOpt) {
case CREATEOBJECTSTORE:
terminate(ksmInit(conf) ? 0 : 1);
return null;
case HELP:
printUsage(System.err);
terminate(0);
return null;
default:
return new KeySpaceManager(conf);
}
}
/**
* Initializes the KSM instance.
* @param conf OzoneConfiguration
* @return true if KSM initialization succeeds , false otherwise
* @throws IOException in case ozone metadata directory path is not accessible
*/
private static boolean ksmInit(OzoneConfiguration conf) throws IOException {
KSMStorage ksmStorage = new KSMStorage(conf);
StorageState state = ksmStorage.getState();
if (state != StorageState.INITIALIZED) {
try {
ScmBlockLocationProtocol scmBlockClient = getScmBlockClient(conf);
ScmInfo scmInfo = scmBlockClient.getScmInfo();
String clusterId = scmInfo.getClusterId();
String scmId = scmInfo.getScmId();
if (clusterId == null || clusterId.isEmpty()) {
throw new IOException("Invalid Cluster ID");
}
if (scmId == null || scmId.isEmpty()) {
throw new IOException("Invalid SCM ID");
}
ksmStorage.setClusterId(clusterId);
ksmStorage.setScmId(scmId);
ksmStorage.initialize();
System.out.println(
"KSM initialization succeeded.Current cluster id for sd="
+ ksmStorage.getStorageDir() + ";cid=" + ksmStorage
.getClusterID());
return true;
} catch (IOException ioe) {
LOG.error("Could not initialize KSM version file", ioe);
return false;
}
} else {
System.out.println(
"KSM already initialized.Reusing existing cluster id for sd="
+ ksmStorage.getStorageDir() + ";cid=" + ksmStorage
.getClusterID());
return true;
}
}
/**
* Parses the command line options for KSM initialization.
* @param args command line arguments
* @return StartupOption if options are valid, null otherwise
*/
private static StartupOption parseArguments(String[] args) {
if (args == null || args.length == 0) {
return StartupOption.REGULAR;
} else if (args.length == 1) {
return StartupOption.parse(args[0]);
}
return null;
}
/**
* Builds a message for logging startup information about an RPC server.
*

View File

@ -109,6 +109,8 @@ public enum ResultCodes {
FAILED_KEY_ALLOCATION,
FAILED_KEY_DELETION,
FAILED_METADATA_ERROR,
FAILED_INTERNAL_ERROR
FAILED_INTERNAL_ERROR,
KSM_NOT_INITIALIZED,
SCM_VERSION_MISMATCH_ERROR
}
}

View File

@ -41,7 +41,7 @@ public class SCMStorage extends Storage {
* @throws IOException if any directories are inaccessible.
*/
public SCMStorage(OzoneConfiguration conf) throws IOException {
super(NodeType.SCM, OzoneUtils.getScmMetadirPath(conf), STORAGE_DIR);
super(NodeType.SCM, OzoneUtils.getOzoneMetaDirPath(conf), STORAGE_DIR);
}
public void setScmId(String scmId) throws IOException {

View File

@ -112,7 +112,7 @@ public BlockManagerImpl(final Configuration conf,
this.containerSize = OzoneConsts.GB * conf.getInt(
ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE_GB,
ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE_DEFAULT);
File metaDir = OzoneUtils.getScmMetadirPath(conf);
File metaDir = OzoneUtils.getOzoneMetaDirPath(conf);
String scmMetaDataDir = metaDir.getPath();
// Write the block key to container name mapping.

View File

@ -77,7 +77,7 @@ public DeletedBlockLogImpl(Configuration conf) throws IOException {
maxRetry = conf.getInt(OZONE_SCM_BLOCK_DELETION_MAX_RETRY,
OZONE_SCM_BLOCK_DELETION_MAX_RETRY_DEFAULT);
File metaDir = OzoneUtils.getScmMetadirPath(conf);
File metaDir = OzoneUtils.getOzoneMetaDirPath(conf);
String scmMetaDataDir = metaDir.getPath();
File deletedLogDbPath = new File(scmMetaDataDir, DELETED_BLOCK_DB);
int cacheSize = conf.getInt(OZONE_SCM_DB_CACHE_SIZE_MB,

View File

@ -99,7 +99,7 @@ public ContainerMapping(
this.nodeManager = nodeManager;
this.cacheSize = cacheSizeMB;
File metaDir = OzoneUtils.getScmMetadirPath(conf);
File metaDir = OzoneUtils.getOzoneMetaDirPath(conf);
// Write the container name to pipeline mapping.
File containerDBPath = new File(metaDir, CONTAINER_DB);

View File

@ -81,7 +81,7 @@ public SCMNodePoolManager(final OzoneConfiguration conf)
throws IOException {
final int cacheSize = conf.getInt(OZONE_SCM_DB_CACHE_SIZE_MB,
OZONE_SCM_DB_CACHE_SIZE_DEFAULT);
File metaDir = OzoneUtils.getScmMetadirPath(conf);
File metaDir = OzoneUtils.getOzoneMetaDirPath(conf);
String scmMetaDataDir = metaDir.getPath();
File nodePoolDBPath = new File(scmMetaDataDir, NODEPOOL_DB);
nodePoolStore = MetadataStoreBuilder.newBuilder()

View File

@ -250,7 +250,7 @@ public static Response getResponse(UserArgs args, int statusCode,
*
* @return File MetaDir
*/
public static File getScmMetadirPath(Configuration conf) {
public static File getOzoneMetaDirPath(Configuration conf) {
String metaDirPath = conf.getTrimmed(OzoneConfigKeys
.OZONE_METADATA_DIRS);
Preconditions.checkNotNull(metaDirPath);

View File

@ -35,6 +35,7 @@
import org.apache.hadoop.ozone.ksm.KSMConfigKeys;
import org.apache.hadoop.ozone.ksm.KeySpaceManager;
import org.apache.hadoop.ozone.scm.SCMStorage;
import org.apache.hadoop.ozone.ksm.KSMStorage;
import org.apache.hadoop.ozone.web.client.OzoneRestClient;
import org.apache.hadoop.scm.ScmConfigKeys;
import org.apache.hadoop.scm.protocolPB
@ -86,7 +87,7 @@ public final class MiniOzoneClassicCluster extends MiniDFSCluster
private final OzoneConfiguration conf;
private final StorageContainerManager scm;
private final KeySpaceManager ksm;
private KeySpaceManager ksm;
private final Path tempPath;
/**
@ -338,6 +339,7 @@ public static class Builder
private Optional<String> scmMetadataDir = Optional.empty();
private Optional<String> clusterId = Optional.empty();
private Optional<String> scmId = Optional.empty();
private Optional<String> ksmId = Optional.empty();
private Boolean ozoneEnabled = true;
private Boolean waitForChillModeFinish = true;
private Boolean randomContainerPort = true;
@ -436,6 +438,11 @@ public Builder setScmId(String sId) {
return this;
}
public Builder setKsmId(String kId) {
ksmId = Optional.of(kId);
return this;
}
public String getPath() {
return path;
}
@ -453,6 +460,7 @@ public MiniOzoneClassicCluster build() throws IOException {
configureSCMheartbeat();
configScmMetadata();
initializeScm();
initializeKSM();
conf.set(ScmConfigKeys.OZONE_SCM_CLIENT_ADDRESS_KEY, "127.0.0.1:0");
conf.set(ScmConfigKeys.OZONE_SCM_BLOCK_CLIENT_ADDRESS_KEY, "127.0.0.1:0");
@ -474,7 +482,7 @@ public MiniOzoneClassicCluster build() throws IOException {
null, conf);
scm.start();
KeySpaceManager ksm = new KeySpaceManager(conf);
KeySpaceManager ksm = KeySpaceManager.createKSM(null, conf);
ksm.start();
String addressString = scm.getDatanodeRpcAddress().getHostString() +
@ -527,11 +535,25 @@ private void configScmMetadata() throws IOException {
private void initializeScm() throws IOException {
SCMStorage scmStore = new SCMStorage(conf);
scmStore.setClusterId(clusterId.orElse(runID.toString()));
scmStore.setScmId(scmId.orElse(UUID.randomUUID().toString()));
if (!clusterId.isPresent()) {
clusterId = Optional.of(runID.toString());
}
scmStore.setClusterId(clusterId.get());
if (!scmId.isPresent()) {
scmId = Optional.of(UUID.randomUUID().toString());
}
scmStore.setScmId(scmId.get());
scmStore.initialize();
}
private void initializeKSM() throws IOException {
KSMStorage ksmStore = new KSMStorage(conf);
ksmStore.setClusterId(clusterId.get());
ksmStore.setScmId(scmId.get());
ksmStore.setKsmId(ksmId.orElse(UUID.randomUUID().toString()));
ksmStore.initialize();
}
private void configureHandler() {
conf.setBoolean(OzoneConfigKeys.OZONE_ENABLED, this.ozoneEnabled);
if (!ozoneHandlerType.isPresent()) {

View File

@ -28,6 +28,8 @@
import org.apache.hadoop.ozone.OzoneConsts;
import org.apache.hadoop.ozone.common.BlockGroup;
import org.apache.hadoop.ozone.client.rest.OzoneException;
import org.apache.hadoop.ozone.ksm.exceptions.KSMException;
import org.apache.hadoop.ozone.scm.SCMStorage;
import org.apache.hadoop.ozone.web.handlers.BucketArgs;
import org.apache.hadoop.ozone.web.handlers.KeyArgs;
import org.apache.hadoop.ozone.web.handlers.UserArgs;
@ -39,6 +41,7 @@
import org.apache.hadoop.ozone.web.response.KeyInfo;
import org.apache.hadoop.ozone.web.response.VolumeInfo;
import org.apache.hadoop.ozone.web.utils.OzoneUtils;
import org.apache.hadoop.scm.ScmConfigKeys;
import org.apache.hadoop.scm.ScmInfo;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.ozone.protocol.proto
@ -61,6 +64,8 @@
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.text.ParseException;
import java.util.HashSet;
import java.util.LinkedList;
@ -86,6 +91,7 @@ public class TestKeySpaceManager {
private static OzoneConfiguration conf;
private static String clusterId;
private static String scmId;
private static String ksmId;
@Rule
public ExpectedException exception = ExpectedException.none();
@ -103,6 +109,7 @@ public static void init() throws Exception {
conf = new OzoneConfiguration();
clusterId = UUID.randomUUID().toString();
scmId = UUID.randomUUID().toString();
ksmId = UUID.randomUUID().toString();
conf.set(OzoneConfigKeys.OZONE_HANDLER_TYPE_KEY,
OzoneConsts.OZONE_HANDLER_DISTRIBUTED);
conf.setInt(OZONE_OPEN_KEY_CLEANUP_SERVICE_INTERVAL_SECONDS, 2);
@ -111,6 +118,7 @@ public static void init() throws Exception {
.setHandlerType(OzoneConsts.OZONE_HANDLER_DISTRIBUTED)
.setClusterId(clusterId)
.setScmId(scmId)
.setKsmId(ksmId)
.build();
storageHandler = new ObjectStoreHandler(conf).getStorageHandler();
userArgs = new UserArgs(null, OzoneUtils.getRequestID(),
@ -1130,4 +1138,51 @@ public void testExpiredOpenKey() throws Exception {
String keyName = subs[subs.length - 1];
Assert.assertEquals("testKey5", keyName);
}
/**
* Tests the KSM Initialization.
* @throws IOException
*/
@Test
public void testKSMInitialization() throws IOException {
// Read the version file info from KSM version file
KSMStorage ksmStorage = cluster.getKeySpaceManager().getKsmStorage();
SCMStorage scmStorage = new SCMStorage(conf);
// asserts whether cluster Id and SCM ID are properly set in SCM Version
// file.
Assert.assertEquals(clusterId, scmStorage.getClusterID());
Assert.assertEquals(scmId, scmStorage.getScmId());
// asserts whether KSM Id is properly set in KSM Version file.
Assert.assertEquals(ksmId, ksmStorage.getKsmId());
// asserts whether the SCM info is correct in KSM Version file.
Assert.assertEquals(clusterId, ksmStorage.getClusterID());
Assert.assertEquals(scmId, ksmStorage.getScmId());
}
/**
* Tests the KSM Initialization Failure.
* @throws IOException
*/
@Test
public void testKSMInitializationFailure() throws Exception {
OzoneConfiguration config = new OzoneConfiguration();
final String path =
GenericTestUtils.getTempPath(UUID.randomUUID().toString());
Path metaDirPath = Paths.get(path, "ksm-meta");
config.set(OzoneConfigKeys.OZONE_METADATA_DIRS, metaDirPath.toString());
config.setBoolean(OzoneConfigKeys.OZONE_ENABLED, true);
config.set(ScmConfigKeys.OZONE_SCM_BLOCK_CLIENT_ADDRESS_KEY,
conf.get(ScmConfigKeys.OZONE_SCM_BLOCK_CLIENT_ADDRESS_KEY));
exception.expect(KSMException.class);
exception.expectMessage("KSM not initialized.");
KeySpaceManager.createKSM(null, config);
KSMStorage ksmStore = new KSMStorage(config);
ksmStore.setClusterId("testClusterId");
ksmStore.setScmId("testScmId");
// writes the version file properties
ksmStore.initialize();
exception.expect(KSMException.class);
exception.expectMessage("SCM version info mismatch.");
KeySpaceManager.createKSM(null, conf);
}
}