HDFS-10251. Ozone: Shutdown cleanly. Contributed by Anu Engineer
This commit is contained in:
parent
e11e824c9b
commit
7bb2ab7d7c
@ -19,8 +19,11 @@
|
||||
|
||||
import com.google.common.base.Preconditions;
|
||||
import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos;
|
||||
import org.apache.hadoop.ozone.container.common.impl.KeyManagerImpl;
|
||||
import org.apache.hadoop.ozone.container.common.utils.ContainerCache;
|
||||
import org.apache.hadoop.ozone.container.common.utils.LevelDBStore;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
@ -71,6 +74,27 @@ public static LevelDBStore getDB(ContainerData container,
|
||||
return db;
|
||||
}
|
||||
|
||||
/**
|
||||
* Shutdown all DB Handles.
|
||||
*
|
||||
* @param cache - Cache for DB Handles.
|
||||
* @throws IOException
|
||||
*/
|
||||
@SuppressWarnings("unchecked")
|
||||
public static void shutdownCache(ContainerCache cache) {
|
||||
Logger log = LoggerFactory.getLogger(KeyManagerImpl.class);
|
||||
LevelDBStore[] handles = new LevelDBStore[cache.values().size()];
|
||||
cache.values().toArray(handles);
|
||||
Preconditions.checkState(handles.length == cache.values().size());
|
||||
for (LevelDBStore db : handles) {
|
||||
try {
|
||||
db.close();
|
||||
} catch (IOException ex) {
|
||||
log.error("error closing db. error {}", ex);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns successful keyResponse.
|
||||
* @param msg - Request.
|
||||
|
@ -146,4 +146,15 @@ public void deleteChunk(Pipeline pipeline, String keyName, ChunkInfo info)
|
||||
containerManager.readUnlock();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Shutdown the chunkManager.
|
||||
*
|
||||
* In the chunkManager we haven't acquired any resources, so nothing to do
|
||||
* here. This call is made with containerManager Writelock held.
|
||||
*/
|
||||
@Override
|
||||
public void shutdown() {
|
||||
Preconditions.checkState(this.containerManager.hasWriteLock());
|
||||
}
|
||||
}
|
||||
|
@ -385,8 +385,9 @@ public ContainerData readContainer(String containerName) throws IOException {
|
||||
* @throws IOException
|
||||
*/
|
||||
@Override
|
||||
public void shutdown() throws IOException {
|
||||
|
||||
public void shutdown() {
|
||||
Preconditions.checkState(this.hasWriteLock());
|
||||
this.containerMap.clear();
|
||||
}
|
||||
|
||||
|
||||
|
@ -142,4 +142,13 @@ public List<KeyData> listKey(Pipeline pipeline, String prefix, String
|
||||
// TODO :
|
||||
return null;
|
||||
}
|
||||
|
||||
/**
|
||||
* Shutdown keyManager.
|
||||
*/
|
||||
@Override
|
||||
public void shutdown() {
|
||||
Preconditions.checkState(this.containerManager.hasWriteLock());
|
||||
KeyUtils.shutdownCache(containerCache);
|
||||
}
|
||||
}
|
||||
|
@ -65,4 +65,9 @@ void deleteChunk(Pipeline pipeline, String keyName, ChunkInfo info) throws
|
||||
|
||||
// TODO : Support list operations.
|
||||
|
||||
/**
|
||||
* Shutdown the chunkManager.
|
||||
*/
|
||||
void shutdown();
|
||||
|
||||
}
|
||||
|
@ -94,7 +94,7 @@ void listContainer(String prevKey, long count, List<ContainerData> data)
|
||||
*
|
||||
* @throws IOException
|
||||
*/
|
||||
void shutdown() throws IOException;
|
||||
void shutdown();
|
||||
|
||||
/**
|
||||
* Sets the Chunk Manager.
|
||||
|
@ -29,13 +29,15 @@
|
||||
public interface KeyManager {
|
||||
/**
|
||||
* Puts or overwrites a key.
|
||||
*
|
||||
* @param pipeline - Pipeline.
|
||||
* @param data - Key Data.
|
||||
* @param data - Key Data.
|
||||
*/
|
||||
void putKey(Pipeline pipeline, KeyData data) throws IOException;
|
||||
|
||||
/**
|
||||
* Gets an existing key.
|
||||
*
|
||||
* @param data - Key Data.
|
||||
* @return Key Data.
|
||||
*/
|
||||
@ -43,21 +45,26 @@ public interface KeyManager {
|
||||
|
||||
/**
|
||||
* Deletes an existing Key.
|
||||
* @param pipeline - Pipeline.
|
||||
* @param keyName Key Data.
|
||||
*
|
||||
* @param pipeline - Pipeline.
|
||||
* @param keyName Key Data.
|
||||
*/
|
||||
void deleteKey(Pipeline pipeline, String keyName) throws IOException;
|
||||
|
||||
/**
|
||||
* List keys in a container.
|
||||
*
|
||||
* @param pipeline - pipeline.
|
||||
* @param prefix - Prefix in needed.
|
||||
* @param prevKey - Key to Start from, EMPTY_STRING to begin.
|
||||
* @param count - Number of keys to return.
|
||||
* @param prefix - Prefix in needed.
|
||||
* @param prevKey - Key to Start from, EMPTY_STRING to begin.
|
||||
* @param count - Number of keys to return.
|
||||
* @return List of Keys that match the criteria.
|
||||
*/
|
||||
List<KeyData> listKey(Pipeline pipeline, String prefix, String prevKey, int
|
||||
count);
|
||||
|
||||
|
||||
/**
|
||||
* Shutdown keyManager.
|
||||
*/
|
||||
void shutdown();
|
||||
}
|
||||
|
@ -78,7 +78,7 @@ public void start() throws Exception {
|
||||
*
|
||||
* @throws Exception
|
||||
*/
|
||||
public void stop() throws Exception {
|
||||
public void stop() {
|
||||
if (bossGroup != null) {
|
||||
bossGroup.shutdownGracefully();
|
||||
}
|
||||
|
@ -92,20 +92,4 @@ public void putDB(String containerName, LevelDBStore db) {
|
||||
lock.unlock();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Remove an entry from the cache.
|
||||
*
|
||||
* @param containerName - Name of the container.
|
||||
*/
|
||||
public void removeDB(String containerName) {
|
||||
Preconditions.checkNotNull(containerName);
|
||||
Preconditions.checkState(!containerName.isEmpty());
|
||||
lock.lock();
|
||||
try {
|
||||
this.remove(containerName);
|
||||
} finally {
|
||||
lock.unlock();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -100,10 +100,48 @@ public void start() throws Exception {
|
||||
|
||||
/**
|
||||
* Stops the ozone container.
|
||||
* @throws Exception
|
||||
*
|
||||
* Shutdown logic is not very obvious from the following code.
|
||||
* if you need to modify the logic, please keep these comments in mind.
|
||||
* Here is the shutdown sequence.
|
||||
*
|
||||
* 1. We shutdown the network ports.
|
||||
*
|
||||
* 2. Now we need to wait for all requests in-flight to finish.
|
||||
*
|
||||
* 3. The container manager lock is a read-write lock with "Fairness" enabled.
|
||||
*
|
||||
* 4. This means that the waiting threads are served in a "first-come-first
|
||||
* -served" manner. Please note that this applies to waiting threads only.
|
||||
*
|
||||
* 5. Since write locks are exclusive, if we are waiting to get a lock it
|
||||
* implies that we are waiting for in-flight operations to complete.
|
||||
*
|
||||
* 6. if there are other write operations waiting on the reader-writer lock,
|
||||
* fairness guarantees that they will proceed before the shutdown lock
|
||||
* request.
|
||||
*
|
||||
* 7. Since all operations either take a reader or writer lock of container
|
||||
* manager, we are guaranteed that we are the last operation since we have
|
||||
* closed the network port, and we wait until close is successful.
|
||||
*
|
||||
* 8. We take the writer lock and call shutdown on each of the managers in
|
||||
* reverse order. That is chunkManager, keyManager and containerManager is
|
||||
* shutdown.
|
||||
*
|
||||
*/
|
||||
public void stop() throws Exception {
|
||||
public void stop() {
|
||||
LOG.info("Attempting to stop container services.");
|
||||
server.stop();
|
||||
try {
|
||||
this.manager.writeLock();
|
||||
this.chunkManager.shutdown();
|
||||
this.keyManager.shutdown();
|
||||
this.manager.shutdown();
|
||||
LOG.info("container services shutdown complete.");
|
||||
} finally {
|
||||
this.manager.writeUnlock();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
|
Loading…
Reference in New Issue
Block a user