HDFS-11154. Block Storage: store server state to persistent storage. Contributed by Chen Liang

Anu Engineer 2016-11-22 17:41:13 -08:00 committed by Owen O'Malley
parent 9997d36eb6
commit a7f82cba71
16 changed files with 415 additions and 18 deletions

View File

@@ -57,6 +57,12 @@ public final class CBlockConfigKeys {
"dfs.storage.service.handler.count";
public static final int DFS_CBLOCK_SERVICERPC_HANDLER_COUNT_DEFAULT = 10;
public static final String DFS_CBLOCK_SERVICE_LEVELDB_PATH_KEY =
"dfs.cblock.service.leveldb.path";
// TODO: find a better place
public static final String DFS_CBLOCK_SERVICE_LEVELDB_PATH_DEFAULT =
"/tmp/cblock_levelDB.dat";
private CBlockConfigKeys() {
}
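
For context, a minimal sketch (not part of this commit) of how a deployment might point the new key at a durable location, since the /tmp default flagged by the TODO above typically does not survive reboots. The path is hypothetical, and storageClient stands in for any IStorageClient implementation.

```java
// Hedged sketch: override the new LevelDB path key before starting the
// CBlock manager. The path below is an arbitrary example.
CBlockConfiguration conf = new CBlockConfiguration();
conf.set(CBlockConfigKeys.DFS_CBLOCK_SERVICE_LEVELDB_PATH_KEY,
    "/var/lib/cblock/cblock_levelDB.dat");
// storageClient: any IStorageClient implementation (see CBlockManager below)
CBlockManager cBlockManager = new CBlockManager(conf, storageClient);
```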

View File

@@ -17,6 +17,7 @@
*/
package org.apache.hadoop.cblock;
import com.google.common.annotations.VisibleForTesting;
import com.google.protobuf.BlockingService;
import org.apache.hadoop.cblock.meta.VolumeDescriptor;
import org.apache.hadoop.cblock.meta.VolumeInfo;
@@ -31,16 +32,22 @@
import org.apache.hadoop.cblock.protocolPB.CBlockServiceProtocolServerSideTranslatorPB;
import org.apache.hadoop.cblock.storage.IStorageClient;
import org.apache.hadoop.cblock.storage.StorageManager;
import org.apache.hadoop.cblock.util.KeyUtil;
import org.apache.hadoop.ipc.ProtobufRpcEngine;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.utils.LevelDBStore;
import org.iq80.leveldb.DBIterator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import static org.apache.hadoop.cblock.CBlockConfigKeys.DFS_CBLOCK_JSCSIRPC_ADDRESS_DEFAULT;
import static org.apache.hadoop.cblock.CBlockConfigKeys.DFS_CBLOCK_JSCSIRPC_ADDRESS_KEY;
@@ -50,6 +57,8 @@
import static org.apache.hadoop.cblock.CBlockConfigKeys.DFS_CBLOCK_SERVICERPC_BIND_HOST_KEY;
import static org.apache.hadoop.cblock.CBlockConfigKeys.DFS_CBLOCK_SERVICERPC_HANDLER_COUNT_DEFAULT;
import static org.apache.hadoop.cblock.CBlockConfigKeys.DFS_CBLOCK_SERVICERPC_HANDLER_COUNT_KEY;
import static org.apache.hadoop.cblock.CBlockConfigKeys.DFS_CBLOCK_SERVICE_LEVELDB_PATH_DEFAULT;
import static org.apache.hadoop.cblock.CBlockConfigKeys.DFS_CBLOCK_SERVICE_LEVELDB_PATH_KEY;
/**
* The main entry point of CBlock operations, ALL the CBlock operations
@@ -74,10 +83,21 @@ public class CBlockManager implements CBlockServiceProtocol,
private final StorageManager storageManager;
private final LevelDBStore levelDBStore;
private final String dbPath;
private Charset encoding = Charset.forName("UTF-8");
public CBlockManager(CBlockConfiguration conf, IStorageClient storageClient
) throws IOException {
storageManager = new StorageManager(storageClient);
dbPath = conf.getTrimmed(DFS_CBLOCK_SERVICE_LEVELDB_PATH_KEY,
DFS_CBLOCK_SERVICE_LEVELDB_PATH_DEFAULT);
levelDBStore = new LevelDBStore(new File(dbPath), true);
LOG.info("Try to load exising volume information");
readFromPersistentStore();
RPC.setProtocolEngine(conf, CBlockServiceProtocolPB.class,
ProtobufRpcEngine.class);
RPC.setProtocolEngine(conf, CBlockClientServerProtocolPB.class,
@@ -182,13 +202,13 @@ private static RPC.Server startRpcServer(CBlockConfiguration conf,
}
@Override
public MountVolumeResponse mountVolume(
public synchronized MountVolumeResponse mountVolume(
String userName, String volumeName) throws IOException {
return storageManager.isVolumeValid(userName, volumeName);
}
@Override
public void createVolume(String userName, String volumeName,
public synchronized void createVolume(String userName, String volumeName,
long volumeSize, int blockSize) throws IOException {
LOG.info("Create volume received: userName: {} volumeName: {} " +
"volumeSize: {} blockSize: {}", userName, volumeName,
@@ -205,25 +225,82 @@ public void createVolume(String userName, String volumeName,
if (volume == null) {
throw new IOException("Volume creation failed!");
}
String volumeKey = KeyUtil.getVolumeKey(userName, volumeName);
writeToPersistentStore(volumeKey.getBytes(encoding),
volume.toProtobuf().toByteArray());
}
@Override
public void deleteVolume(String userName,
public synchronized void deleteVolume(String userName,
String volumeName, boolean force) throws IOException {
LOG.info("Delete volume received: volume:" + volumeName
+ " force?:" + force);
LOG.info("Delete volume received: volume: {} {} ", volumeName, force);
storageManager.deleteVolume(userName, volumeName, force);
// Reaching this point means the volume has been deleted successfully.
String volumeKey = KeyUtil.getVolumeKey(userName, volumeName);
removeFromPersistentStore(volumeKey.getBytes(encoding));
}
// No need to synchronize the following three methods: write's and
// remove's callers are synchronized, and read's only caller is the
// constructor, when no other method calls can happen yet.
@VisibleForTesting
public void writeToPersistentStore(byte[] key, byte[] value) {
levelDBStore.put(key, value);
}
@VisibleForTesting
public void removeFromPersistentStore(byte[] key) {
levelDBStore.delete(key);
}
public void readFromPersistentStore() {
DBIterator iter = levelDBStore.getIterator();
iter.seekToFirst();
while (iter.hasNext()) {
Map.Entry<byte[], byte[]> entry = iter.next();
String volumeKey = new String(entry.getKey(), encoding);
try {
VolumeDescriptor volumeDescriptor =
VolumeDescriptor.fromProtobuf(entry.getValue());
storageManager.addVolume(volumeDescriptor);
} catch (IOException e) {
LOG.error("Loading volume " + volumeKey + " error " + e);
}
}
}
@Override
public VolumeInfo infoVolume(String userName, String volumeName
public synchronized VolumeInfo infoVolume(String userName, String volumeName
) throws IOException {
LOG.info("Info volume received: volume: {}", volumeName);
return storageManager.infoVolume(userName, volumeName);
}
@VisibleForTesting
public synchronized List<VolumeDescriptor> getAllVolumes() {
return storageManager.getAllVolume(null);
}
public synchronized void close() {
try {
levelDBStore.close();
} catch (IOException e) {
LOG.error("Error when closing levelDB " + e);
}
}
public synchronized void clean() {
try {
levelDBStore.close();
levelDBStore.destroy();
} catch (IOException e) {
LOG.error("Error when deleting levelDB " + e);
}
}
@Override
public List<VolumeInfo> listVolume(String userName) throws IOException {
public synchronized List<VolumeInfo> listVolume(String userName)
throws IOException {
ArrayList<VolumeInfo> response = new ArrayList<>();
List<VolumeDescriptor> allVolumes =
storageManager.getAllVolume(userName);
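
To make the persistence flow concrete, here is a hedged, self-contained sketch of the round trip CBlockManager performs: serialize a VolumeDescriptor to protobuf bytes under a KeyUtil-generated key, then recover it by iterating the store, mirroring writeToPersistentStore and readFromPersistentStore above. Class and method names come from this patch; the DB path and volume parameters are invented for illustration.

```java
import java.io.File;
import java.nio.charset.Charset;
import java.util.Map;
import org.apache.hadoop.cblock.meta.VolumeDescriptor;
import org.apache.hadoop.cblock.util.KeyUtil;
import org.apache.hadoop.utils.LevelDBStore;
import org.iq80.leveldb.DBIterator;

public class VolumePersistenceSketch {
  public static void main(String[] args) throws Exception {
    Charset utf8 = Charset.forName("UTF-8");
    LevelDBStore store =
        new LevelDBStore(new File("/tmp/sketch_levelDB.dat"), true);

    // Write path: key = "user:volume", value = protobuf bytes.
    VolumeDescriptor volume =
        new VolumeDescriptor("alice", "vol1", 1L << 30, 4096);
    byte[] key = KeyUtil.getVolumeKey("alice", "vol1").getBytes(utf8);
    store.put(key, volume.toProtobuf().toByteArray());

    // Read path: iterate every entry, as readFromPersistentStore does.
    DBIterator iter = store.getIterator();
    iter.seekToFirst();
    while (iter.hasNext()) {
      Map.Entry<byte[], byte[]> entry = iter.next();
      VolumeDescriptor restored =
          VolumeDescriptor.fromProtobuf(entry.getValue());
      assert volume.equals(restored); // equals() added by this patch
    }
    store.close();
  }
}
```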

View File

@@ -17,6 +17,9 @@
*/
package org.apache.hadoop.cblock.meta;
import com.google.protobuf.InvalidProtocolBufferException;
import org.apache.hadoop.cblock.protocol.proto.CBlockClientServerProtocolProtos;
/**
*
* The internal representation of a container maintained by CBlock server.
@@ -29,7 +32,7 @@
public class ContainerDescriptor {
private final String containerID;
// the index of this container with in a volume
// on creation, there is no way to know the index of the container
// on creation, there may be no way to know the index of the container
// as it is a volume specific information
private int containerIndex;
@@ -37,6 +40,11 @@ public ContainerDescriptor(String containerID) {
this.containerID = containerID;
}
public ContainerDescriptor(String containerID, int containerIndex) {
this.containerID = containerID;
this.containerIndex = containerIndex;
}
public void setContainerIndex(int idx) {
this.containerIndex = idx;
}
@@ -52,4 +60,35 @@ public int getContainerIndex() {
public long getUtilization() {
return 0;
}
public CBlockClientServerProtocolProtos.ContainerIDProto toProtobuf() {
CBlockClientServerProtocolProtos.ContainerIDProto.Builder builder =
CBlockClientServerProtocolProtos.ContainerIDProto.newBuilder();
builder.setContainerID(containerID);
builder.setIndex(containerIndex);
return builder.build();
}
public static ContainerDescriptor fromProtobuf(byte[] data)
throws InvalidProtocolBufferException {
CBlockClientServerProtocolProtos.ContainerIDProto id =
CBlockClientServerProtocolProtos.ContainerIDProto.parseFrom(data);
return new ContainerDescriptor(id.getContainerID(),
(int)id.getIndex());
}
@Override
public int hashCode() {
return containerID.hashCode()*37 + containerIndex;
}
@Override
public boolean equals(Object o) {
if (o != null && o instanceof ContainerDescriptor) {
ContainerDescriptor other = (ContainerDescriptor)o;
return containerID.equals(other.containerID) &&
containerIndex == other.containerIndex;
}
return false;
}
}
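
A brief example (not in the patch) of the value semantics this change introduces: toProtobuf/fromProtobuf round-trip a descriptor, and the new equals and hashCode make the copy compare equal. The ID and index are arbitrary.

```java
// Requires: org.apache.hadoop.cblock.meta.ContainerDescriptor
ContainerDescriptor original = new ContainerDescriptor("container-7", 3);
byte[] bytes = original.toProtobuf().toByteArray();
ContainerDescriptor copy = ContainerDescriptor.fromProtobuf(bytes);
assert original.equals(copy);                    // value equality
assert original.hashCode() == copy.hashCode();   // consistent hashing
```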

View File

@@ -17,10 +17,13 @@
*/
package org.apache.hadoop.cblock.meta;
import com.google.protobuf.InvalidProtocolBufferException;
import org.apache.hadoop.cblock.protocol.proto.CBlockClientServerProtocolProtos;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
@@ -162,4 +165,90 @@ public String toString() {
string += "containerIdsWithObject:" + containerMap.keySet();
return string;
}
public CBlockClientServerProtocolProtos.MountVolumeResponseProto
toProtobuf() {
CBlockClientServerProtocolProtos.MountVolumeResponseProto.Builder volume =
CBlockClientServerProtocolProtos.MountVolumeResponseProto.newBuilder();
volume.setIsValid(true);
volume.setVolumeName(volumeName);
volume.setUserName(userName);
volume.setVolumeSize(volumeSize);
volume.setBlockSize(blockSize);
for (String containerIDString : containerIdOrdered) {
ContainerDescriptor containerDescriptor = containerMap.get(
containerIDString);
volume.addAllContainerIDs(containerDescriptor.toProtobuf());
}
return volume.build();
}
public static VolumeDescriptor fromProtobuf(byte[] data)
throws InvalidProtocolBufferException {
CBlockClientServerProtocolProtos.MountVolumeResponseProto volume =
CBlockClientServerProtocolProtos.MountVolumeResponseProto
.parseFrom(data);
String userName = volume.getUserName();
String volumeName = volume.getVolumeName();
long volumeSize = volume.getVolumeSize();
int blockSize = volume.getBlockSize();
VolumeDescriptor volumeDescriptor = new VolumeDescriptor(userName,
volumeName, volumeSize, blockSize);
List<CBlockClientServerProtocolProtos.ContainerIDProto> containers
= volume.getAllContainerIDsList();
String[] containerOrdering = new String[containers.size()];
for (CBlockClientServerProtocolProtos.ContainerIDProto containerProto :
containers) {
ContainerDescriptor containerDescriptor = new ContainerDescriptor(
containerProto.getContainerID(),
(int)containerProto.getIndex());
volumeDescriptor.addContainer(containerDescriptor);
containerOrdering[containerDescriptor.getContainerIndex()] =
containerDescriptor.getContainerID();
}
volumeDescriptor.setContainerIDs(
new ArrayList<>(Arrays.asList(containerOrdering)));
return volumeDescriptor;
}
@Override
public int hashCode() {
return userName.hashCode()*37 + volumeName.hashCode();
}
@Override
public boolean equals(Object o) {
if (o != null && o instanceof VolumeDescriptor) {
VolumeDescriptor other = (VolumeDescriptor)o;
if (!userName.equals(other.getUserName()) ||
!volumeName.equals(other.getVolumeName()) ||
volumeSize != other.getVolumeSize() ||
blockSize != other.getBlockSize()) {
return false;
}
if (containerIdOrdered.size() != other.containerIdOrdered.size() ||
containerMap.size() != other.containerMap.size()) {
return false;
}
for (int i = 0; i<containerIdOrdered.size(); i++) {
if (!containerIdOrdered.get(i).equals(
other.containerIdOrdered.get(i))) {
return false;
}
}
for (String containerKey : containerMap.keySet()) {
if (!other.containerMap.containsKey(containerKey)) {
return false;
}
if (!containerMap.get(containerKey).equals(
other.containerMap.get(containerKey))) {
return false;
}
}
return true;
}
return false;
}
}
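
One subtlety in fromProtobuf above: the repeated container field may arrive in any order, so the volume's ordering is rebuilt by slotting each ID into an array at its containerIndex. This assumes the indices form a permutation of 0..n-1, which toProtobuf produces since every container in a volume carries a distinct index. A toy sketch of the reconstruction:

```java
// Containers arrive out of order; each lands in the slot named by its index.
String[] containerOrdering = new String[2];
containerOrdering[1] = "c-one";   // arrives first, carries index 1
containerOrdering[0] = "c-zero";  // arrives second, carries index 0
// containerOrdering is now ["c-zero", "c-one"], independent of arrival order.
```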

View File

@@ -0,0 +1,32 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.cblock.util;
/**
* A simple utility class that generates key mappings (e.g., from a
* (userName, volumeName) pair to a single string volumeKey).
*/
public final class KeyUtil {
public static String getVolumeKey(String userName, String volumeName) {
return userName + ":" + volumeName;
}
private KeyUtil() {
}
}
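
Usage is a one-liner; note, as an observation rather than something this patch enforces, that the ':' delimiter makes keys ambiguous if a user name itself contains ':'.

```java
// Requires: java.nio.charset.Charset, org.apache.hadoop.cblock.util.KeyUtil
String volumeKey = KeyUtil.getVolumeKey("alice", "vol1");   // "alice:vol1"
byte[] dbKey = volumeKey.getBytes(Charset.forName("UTF-8"));
```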

View File

@@ -0,0 +1,18 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.cblock.util;

View File

@@ -24,7 +24,7 @@
import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos;
import org.apache.hadoop.ozone.OzoneConsts;
import org.apache.hadoop.ozone.container.common.impl.ContainerManagerImpl;
import org.apache.hadoop.ozone.container.common.utils.LevelDBStore;
import org.apache.hadoop.utils.LevelDBStore;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

View File

@@ -21,7 +21,7 @@
import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos;
import org.apache.hadoop.ozone.container.common.impl.KeyManagerImpl;
import org.apache.hadoop.ozone.container.common.utils.ContainerCache;
import org.apache.hadoop.ozone.container.common.utils.LevelDBStore;
import org.apache.hadoop.utils.LevelDBStore;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

View File

@@ -29,7 +29,7 @@
import org.apache.hadoop.ozone.container.common.interfaces.ContainerManager;
import org.apache.hadoop.ozone.container.common.interfaces.KeyManager;
import org.apache.hadoop.ozone.container.common.utils.ContainerCache;
import org.apache.hadoop.ozone.container.common.utils.LevelDBStore;
import org.apache.hadoop.utils.LevelDBStore;
import java.io.IOException;
import java.util.List;

View File

@@ -22,6 +22,7 @@
import org.apache.commons.collections.map.LRUMap;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.utils.LevelDBStore;
import java.io.IOException;
import java.util.concurrent.locks.Lock;

View File

@@ -22,9 +22,9 @@
import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.ozone.OzoneConfigKeys;
import org.apache.hadoop.ozone.container.common.utils.LevelDBStore;
import org.apache.hadoop.ozone.scm.node.NodeManager;
import org.apache.hadoop.scm.container.common.helpers.Pipeline;
import org.apache.hadoop.utils.LevelDBStore;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

View File

@@ -23,7 +23,7 @@
import org.apache.hadoop.hdfs.server.datanode.fsdataset.LengthInputStream;
import org.apache.hadoop.ozone.OzoneConfigKeys;
import org.apache.hadoop.ozone.OzoneConsts;
import org.apache.hadoop.ozone.container.common.utils.LevelDBStore;
import org.apache.hadoop.utils.LevelDBStore;
import org.apache.hadoop.ozone.web.exceptions.ErrorTable;
import org.apache.hadoop.ozone.web.exceptions.OzoneException;
import org.apache.hadoop.ozone.web.handlers.BucketArgs;

View File

@@ -16,7 +16,7 @@
* limitations under the License.
*/
package org.apache.hadoop.ozone.container.common.utils;
package org.apache.hadoop.utils;
import org.fusesource.leveldbjni.JniDBFactory;
import org.iq80.leveldb.DB;
@@ -33,6 +33,7 @@
public class LevelDBStore {
private DB db;
private final File dbFile;
private final Options dbOptions;
/**
* Opens a DB file.
@@ -43,9 +44,9 @@ public class LevelDBStore {
*/
public LevelDBStore(File dbPath, boolean createIfMissing) throws
IOException {
Options options = new Options();
options.createIfMissing(createIfMissing);
db = JniDBFactory.factory.open(dbPath, options);
dbOptions = new Options();
dbOptions.createIfMissing(createIfMissing);
db = JniDBFactory.factory.open(dbPath, dbOptions);
if (db == null) {
throw new IOException("Db is null");
}
@@ -60,6 +61,7 @@ public LevelDBStore(File dbPath, boolean createIfMissing) throws
*/
public LevelDBStore(File dbPath, Options options)
throws IOException {
dbOptions = options;
db = JniDBFactory.factory.open(dbPath, options);
if (db == null) {
throw new IOException("Db is null");
@@ -140,4 +142,16 @@ public DB getDB() {
return db;
}
/**
* Returns an iterator on all the key-value pairs in the DB.
* @return an iterator on DB entries.
*/
public DBIterator getIterator() {
return db.iterator();
}
public void destroy() throws IOException {
JniDBFactory.factory.destroy(dbFile, dbOptions);
}
}
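
A hedged usage sketch of the two methods this patch adds, getIterator and destroy; put and close are pre-existing methods already exercised elsewhere in this patch, and the path is illustrative.

```java
// Requires: java.io.File, java.util.Map, java.nio.charset.StandardCharsets,
//           org.apache.hadoop.utils.LevelDBStore, org.iq80.leveldb.DBIterator
LevelDBStore store =
    new LevelDBStore(new File("/tmp/demo_levelDB.dat"), true);
store.put("key".getBytes(StandardCharsets.UTF_8),
    "value".getBytes(StandardCharsets.UTF_8));

DBIterator iter = store.getIterator();   // added by this patch
iter.seekToFirst();
while (iter.hasNext()) {
  Map.Entry<byte[], byte[]> entry = iter.next();
  // entry.getKey() / entry.getValue() are raw byte arrays
}

store.close();
store.destroy();   // added by this patch; removes the on-disk DB files
```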

View File

@@ -0,0 +1,18 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.utils;

View File

@@ -0,0 +1,103 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.cblock;
import org.apache.hadoop.cblock.meta.VolumeDescriptor;
import org.apache.hadoop.cblock.storage.IStorageClient;
import org.apache.hadoop.cblock.util.MockStorageClient;
import org.junit.Test;
import java.util.List;
import static org.apache.hadoop.cblock.CBlockConfigKeys.DFS_CBLOCK_SERVICE_LEVELDB_PATH_KEY;
import static org.junit.Assert.assertEquals;
/**
* Test the CBlock server state is maintained in persistent storage and can be
* recovered on CBlock server restart.
*/
public class TestCBlockServerPersistence {
/**
* Test when cblock server fails with volume meta data, the meta data can be
* restored correctly.
* @throws Exception
*/
@Test
public void testWriteToPersistentStore() throws Exception {
String userName = "testWriteToPersistentStore";
String volumeName1 = "testVolume1";
String volumeName2 = "testVolume2";
long volumeSize1 = 30L*1024*1024*1024;
long volumeSize2 = 15L*1024*1024*1024;
int blockSize = 4096;
CBlockManager cBlockManager = null;
CBlockManager cBlockManager1 = null;
String dbPath = "/tmp/testCblockPersistence.dat";
try {
IStorageClient storageClient = new MockStorageClient();
CBlockConfiguration conf = new CBlockConfiguration();
conf.set(DFS_CBLOCK_SERVICE_LEVELDB_PATH_KEY, dbPath);
cBlockManager = new CBlockManager(conf, storageClient);
cBlockManager.createVolume(userName, volumeName1, volumeSize1, blockSize);
cBlockManager.createVolume(userName, volumeName2, volumeSize2, blockSize);
List<VolumeDescriptor> allVolumes = cBlockManager.getAllVolumes();
// Close the CBlock server. Since metadata is written to disk on volume
// creation, closing the server here is equivalent to a CBlock server crash.
cBlockManager.close();
cBlockManager.stop();
cBlockManager.join();
cBlockManager = null;
assertEquals(2, allVolumes.size());
VolumeDescriptor volumeDescriptor1 = allVolumes.get(0);
VolumeDescriptor volumeDescriptor2 = allVolumes.get(1);
// Create a new CBlock server instance, which is the same as
// restarting the CBlock server.
IStorageClient storageClient1 = new MockStorageClient();
CBlockConfiguration conf1 = new CBlockConfiguration();
conf1.set(DFS_CBLOCK_SERVICE_LEVELDB_PATH_KEY, dbPath);
cBlockManager1 = new CBlockManager(conf1, storageClient1);
List<VolumeDescriptor> allVolumes1 = cBlockManager1.getAllVolumes();
assertEquals(2, allVolumes1.size());
VolumeDescriptor newvolumeDescriptor1 = allVolumes1.get(0);
VolumeDescriptor newvolumeDescriptor2 = allVolumes1.get(1);
// The levelDB iterator appears to return keys in insertion order, in
// which case the else branch should never be taken. The second branch
// is kept in case levelDB ever returns a different key ordering; the
// test does not rely on key order.
if (volumeDescriptor1.getVolumeName().equals(
newvolumeDescriptor1.getVolumeName())) {
assertEquals(volumeDescriptor1, newvolumeDescriptor1);
assertEquals(volumeDescriptor2, newvolumeDescriptor2);
} else {
assertEquals(volumeDescriptor1, newvolumeDescriptor2);
assertEquals(volumeDescriptor2, newvolumeDescriptor1);
}
} finally {
if (cBlockManager != null) {
cBlockManager.clean();
}
if (cBlockManager1 != null) {
cBlockManager1.clean();
}
}
}
}

View File

@@ -29,7 +29,7 @@
import org.apache.hadoop.ozone.container.common.helpers.ContainerData;
import org.apache.hadoop.ozone.container.common.helpers.ContainerUtils;
import org.apache.hadoop.ozone.container.common.helpers.KeyData;
import org.apache.hadoop.ozone.container.common.utils.LevelDBStore;
import org.apache.hadoop.utils.LevelDBStore;
import org.apache.hadoop.ozone.web.utils.OzoneUtils;
import org.apache.hadoop.scm.container.common.helpers.Pipeline;
import org.junit.After;