HDFS-10756. Expose getTrashRoot to HTTPFS and WebHDFS. Contributed by Yuanbo Liu.

This commit is contained in:
Xiao Chen 2016-11-04 18:06:55 -07:00
parent 95665a6eea
commit d8bab3dcb6
15 changed files with 366 additions and 88 deletions

View File

@ -113,6 +113,7 @@ public abstract class FileSystem extends Configured implements Closeable {
public static final int SHUTDOWN_HOOK_PRIORITY = 10;
public static final String TRASH_PREFIX = ".Trash";
public static final String USER_HOME_PREFIX = "/user";
/** FileSystem cache */
static final Cache CACHE = new Cache();
@ -1965,7 +1966,7 @@ public LocatedFileStatus next() throws IOException {
*/
public Path getHomeDirectory() {
return this.makeQualified(
new Path("/user/"+System.getProperty("user.name")));
new Path(USER_HOME_PREFIX + "/" + System.getProperty("user.name")));
}

View File

@ -64,6 +64,7 @@ public enum OpType {
GET_STATUS(CommonStatisticNames.OP_GET_STATUS),
GET_STORAGE_POLICIES("op_get_storage_policies"),
GET_STORAGE_POLICY("op_get_storage_policy"),
GET_TRASH_ROOT("op_get_trash_root"),
GET_XATTR("op_get_xattr"),
LIST_LOCATED_STATUS(CommonStatisticNames.OP_LIST_LOCATED_STATUS),
LIST_STATUS(CommonStatisticNames.OP_LIST_STATUS),

View File

@ -1621,6 +1621,27 @@ BlockLocation[] decodeResponse(Map<?,?> json) throws IOException {
}.run();
}
@Override
public Path getTrashRoot(Path path) {
statistics.incrementReadOps(1);
storageStatistics.incrementOpCounter(OpType.GET_TRASH_ROOT);
final HttpOpParam.Op op = GetOpParam.Op.GETTRASHROOT;
try {
String strTrashPath = new FsPathResponseRunner<String>(op, path) {
@Override
String decodeResponse(Map<?, ?> json) throws IOException {
return JsonUtilClient.getPath(json);
}
}.run();
return new Path(strTrashPath).makeQualified(getUri(), null);
} catch(IOException e) {
LOG.warn("Cannot find trash root of " + path, e);
// keep the same behavior with dfs
return super.getTrashRoot(path).makeQualified(getUri(), null);
}
}
@Override
public void access(final Path path, final FsAction mode) throws IOException {
final HttpOpParam.Op op = GetOpParam.Op.CHECKACCESS;

View File

@ -37,6 +37,7 @@ public enum Op implements HttpOpParam.Op {
GET_BLOCK_LOCATIONS(false, HttpURLConnection.HTTP_OK),
GETACLSTATUS(false, HttpURLConnection.HTTP_OK),
GETXATTRS(false, HttpURLConnection.HTTP_OK),
GETTRASHROOT(false, HttpURLConnection.HTTP_OK),
LISTXATTRS(false, HttpURLConnection.HTTP_OK),
NULL(false, HttpURLConnection.HTTP_NOT_IMPLEMENTED),

View File

@ -128,6 +128,8 @@ public class HttpFSFileSystem extends FileSystem
public static final String HOME_DIR_JSON = "Path";
public static final String TRASH_DIR_JSON = "Path";
public static final String SET_REPLICATION_JSON = "boolean";
public static final String UPLOAD_CONTENT_TYPE= "application/octet-stream";
@ -203,7 +205,7 @@ public static enum Operation {
OPEN(HTTP_GET), GETFILESTATUS(HTTP_GET), LISTSTATUS(HTTP_GET),
GETHOMEDIRECTORY(HTTP_GET), GETCONTENTSUMMARY(HTTP_GET),
GETFILECHECKSUM(HTTP_GET), GETFILEBLOCKLOCATIONS(HTTP_GET),
INSTRUMENTATION(HTTP_GET), GETACLSTATUS(HTTP_GET),
INSTRUMENTATION(HTTP_GET), GETACLSTATUS(HTTP_GET), GETTRASHROOT(HTTP_GET),
APPEND(HTTP_POST), CONCAT(HTTP_POST), TRUNCATE(HTTP_POST),
CREATE(HTTP_PUT), MKDIRS(HTTP_PUT), RENAME(HTTP_PUT), SETOWNER(HTTP_PUT),
SETPERMISSION(HTTP_PUT), SETREPLICATION(HTTP_PUT), SETTIMES(HTTP_PUT),
@ -818,6 +820,33 @@ public Path getHomeDirectory() {
}
}
/**
* Get the root directory of Trash for a path in HDFS.
* 1. File in encryption zone returns /ez1/.Trash/username.
* 2. File not in encryption zone, or encountered exception when checking
* the encryption zone of the path, returns /users/username/.Trash.
* Caller appends either Current or checkpoint timestamp
* for trash destination.
* The default implementation returns "/user/username/.Trash".
* @param fullPath the trash root of the path to be determined.
* @return trash root
*/
@Override
public Path getTrashRoot(Path fullPath) {
Map<String, String> params = new HashMap<>();
params.put(OP_PARAM, Operation.GETTRASHROOT.toString());
try {
HttpURLConnection conn = getConnection(
Operation.GETTRASHROOT.getMethod(), params, fullPath, true);
HttpExceptionUtils.validateResponse(conn, HttpURLConnection.HTTP_OK);
JSONObject json = (JSONObject) HttpFSUtils.jsonParse(conn);
return new Path((String) json.get(TRASH_DIR_JSON));
} catch (IOException ex) {
LOG.warn("Cannot find trash root of " + fullPath, ex);
return super.getTrashRoot(fullPath);
}
}
/**
* Set owner of a path (i.e. a file or a directory).
* The parameters username and groupname cannot both be null.

View File

@ -1056,6 +1056,29 @@ public Void execute(FileSystem fs) throws IOException {
}
/**
* Executor that performs getting trash root FileSystemAccess
* files system operation.
*/
@InterfaceAudience.Private
public static class FSTrashRoot
implements FileSystemAccess.FileSystemExecutor<JSONObject> {
private Path path;
public FSTrashRoot(String path) {
this.path = new Path(path);
}
@Override
@SuppressWarnings("unchecked")
public JSONObject execute(FileSystem fs) throws IOException {
Path trashRoot = fs.getTrashRoot(this.path);
JSONObject json = new JSONObject();
json.put(HttpFSFileSystem.TRASH_DIR_JSON, trashRoot.toUri().getPath());
return json;
}
}
/**
* Executor that gets the ACL information for a given file.
*/

View File

@ -60,6 +60,7 @@ public class HttpFSParametersProvider extends ParametersProvider {
PARAMS_DEF.put(Operation.GETFILECHECKSUM, new Class[]{});
PARAMS_DEF.put(Operation.GETFILEBLOCKLOCATIONS, new Class[]{});
PARAMS_DEF.put(Operation.GETACLSTATUS, new Class[]{});
PARAMS_DEF.put(Operation.GETTRASHROOT, new Class[]{});
PARAMS_DEF.put(Operation.INSTRUMENTATION, new Class[]{});
PARAMS_DEF.put(Operation.APPEND, new Class[]{DataParam.class});
PARAMS_DEF.put(Operation.CONCAT, new Class[]{SourcesParam.class});

View File

@ -339,6 +339,13 @@ public InputStream run() throws Exception {
response = Response.ok(json).type(MediaType.APPLICATION_JSON).build();
break;
}
case GETTRASHROOT: {
FSOperations.FSTrashRoot command = new FSOperations.FSTrashRoot(path);
JSONObject json = fsExecute(user, command);
AUDIT_LOG.info("[{}]", path);
response = Response.ok(json).type(MediaType.APPLICATION_JSON).build();
break;
}
default: {
throw new IOException(
MessageFormat.format("Invalid HTTP GET operation [{0}]", op.value()));

View File

@ -40,6 +40,8 @@ HttpFS HTTP web-service API calls are HTTP REST calls that map to a HDFS file sy
* `$ curl http://httpfs-host:14000/webhdfs/v1/user/foo?op=list` returns the contents of the HDFS `/user/foo` directory in JSON format.
* `$ curl http://httpfs-host:14000/webhdfs/v1/user/foo?op=GETTRASHROOT` returns the path `/user/foo/.Trash`, if `/` is an encrypted zone, returns the path `/.Trash/foo`. See [more details](../hadoop-project-dist/hadoop-hdfs/TransparentEncryption.html#Rename_and_Trash_considerations) about trash path in an encrypted zone.
* `$ curl -X POST http://httpfs-host:14000/webhdfs/v1/user/foo/bar?op=mkdirs` creates the HDFS `/user/foo.bar` directory.
User and Developer Documentation

View File

@ -430,6 +430,35 @@ private void testWorkingdirectory() throws Exception {
new Path("/tmp").toUri().getPath());
}
private void testTrashRoot() throws Exception {
if (!isLocalFS()) {
FileSystem fs = FileSystem.get(getProxiedFSConf());
final Path rootDir = new Path("/");
final Path fooPath = new Path(getProxiedFSTestDir(), "foo.txt");
OutputStream os = fs.create(fooPath);
os.write(1);
os.close();
Path trashPath = fs.getTrashRoot(rootDir);
Path fooTrashPath = fs.getTrashRoot(fooPath);
fs.close();
fs = getHttpFSFileSystem();
Path httpFSTrashPath = fs.getTrashRoot(rootDir);
Path httpFSFooTrashPath = fs.getTrashRoot(fooPath);
fs.close();
assertEquals(trashPath.toUri().getPath(),
httpFSTrashPath.toUri().getPath());
assertEquals(fooTrashPath.toUri().getPath(),
httpFSFooTrashPath.toUri().getPath());
// trash path is related to USER, not path
assertEquals(trashPath.toUri().getPath(),
fooTrashPath.toUri().getPath());
}
}
private void testMkdirs() throws Exception {
Path path = new Path(getProxiedFSTestDir(), "foo");
FileSystem fs = getHttpFSFileSystem();
@ -916,7 +945,8 @@ protected enum Operation {
GET, OPEN, CREATE, APPEND, TRUNCATE, CONCAT, RENAME, DELETE, LIST_STATUS,
WORKING_DIRECTORY, MKDIRS, SET_TIMES, SET_PERMISSION, SET_OWNER,
SET_REPLICATION, CHECKSUM, CONTENT_SUMMARY, FILEACLS, DIRACLS, SET_XATTR,
GET_XATTRS, REMOVE_XATTR, LIST_XATTRS, ENCRYPTION, LIST_STATUS_BATCH
GET_XATTRS, REMOVE_XATTR, LIST_XATTRS, ENCRYPTION, LIST_STATUS_BATCH,
GETTRASHROOT
}
private void operation(Operation op) throws Exception {
@ -996,6 +1026,9 @@ private void operation(Operation op) throws Exception {
case LIST_STATUS_BATCH:
testListStatusBatch();
break;
case GETTRASHROOT:
testTrashRoot();
break;
}
}

View File

@ -43,7 +43,6 @@
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.XAttrCodec;
@ -137,9 +136,9 @@ private void createHttpFSServer(boolean addDelegationTokenAuthHandler)
//HDFS configuration
File hadoopConfDir = new File(new File(homeDir, "conf"), "hadoop-conf");
hadoopConfDir.mkdirs();
String fsDefaultName = TestHdfsHelper.getHdfsConf().get(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY);
Configuration conf = new Configuration(false);
conf.set(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY, fsDefaultName);
Configuration hdfsConf = TestHdfsHelper.getHdfsConf();
// Http Server's conf should be based on HDFS's conf
Configuration conf = new Configuration(hdfsConf);
conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_ACLS_ENABLED_KEY, true);
conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_XATTRS_ENABLED_KEY, true);
File hdfsSite = new File(hadoopConfDir, "hdfs-site.xml");
@ -346,6 +345,20 @@ private String getPerms ( String statusJson ) throws Exception {
return (String) details.get("permission");
}
/**
* Given the JSON output from the GETTRASHPATH call, return the
* 'path' value.
*
* @param statusJson JSON from GETTRASHPATH
* @return The value of 'path' in statusJson
* @throws Exception
*/
private String getPath(String statusJson) throws Exception {
JSONParser parser = new JSONParser();
JSONObject details = (JSONObject) parser.parse(statusJson);
return (String) details.get("Path");
}
/**
* Given the JSON output from the GETACLSTATUS call, return the
* 'entries' value as a List<String>.
@ -671,6 +684,40 @@ public void testPutNoOperation() throws Exception {
Assert.assertEquals(conn.getResponseCode(), HttpURLConnection.HTTP_BAD_REQUEST);
}
@Test
@TestDir
@TestJetty
@TestHdfs
public void testGetTrashRoot() throws Exception {
String user = HadoopUsersConfTestHelper.getHadoopUsers()[0];
createHttpFSServer(false);
String trashJson = getStatus("/", "GETTRASHROOT");
String trashPath = getPath(trashJson);
Path expectedPath = new Path(FileSystem.USER_HOME_PREFIX,
new Path(user, FileSystem.TRASH_PREFIX));
Assert.assertEquals(expectedPath.toUri().getPath(), trashPath);
byte[] array = new byte[]{0, 1, 2, 3};
FileSystem fs = FileSystem.get(TestHdfsHelper.getHdfsConf());
fs.mkdirs(new Path("/tmp"));
OutputStream os = fs.create(new Path("/tmp/foo"));
os.write(array);
os.close();
trashJson = getStatus("/tmp/foo", "GETTRASHROOT");
trashPath = getPath(trashJson);
Assert.assertEquals(expectedPath.toUri().getPath(), trashPath);
//TestHdfsHelp has already set up EZ environment
final Path ezFile = TestHdfsHelper.ENCRYPTED_FILE;
final Path ezPath = TestHdfsHelper.ENCRYPTION_ZONE;
trashJson = getStatus(ezFile.toUri().getPath(), "GETTRASHROOT");
trashPath = getPath(trashJson);
expectedPath = new Path(ezPath, new Path(FileSystem.TRASH_PREFIX, user));
Assert.assertEquals(expectedPath.toUri().getPath(), trashPath);
}
@Test
@TestDir
@TestJetty
@ -754,6 +801,21 @@ public void testDelegationTokenOperations() throws Exception {
conn = (HttpURLConnection) url.openConnection();
Assert.assertEquals(HttpURLConnection.HTTP_FORBIDDEN,
conn.getResponseCode());
// getTrash test with delegation
url = new URL(TestJettyHelper.getJettyURL(),
"/webhdfs/v1/?op=GETTRASHROOT&delegation=" + tokenStr);
conn = (HttpURLConnection) url.openConnection();
Assert.assertEquals(HttpURLConnection.HTTP_FORBIDDEN,
conn.getResponseCode());
url = new URL(TestJettyHelper.getJettyURL(),
"/webhdfs/v1/?op=GETTRASHROOT");
conn = (HttpURLConnection) url.openConnection();
conn.setRequestProperty("Cookie",
AuthenticatedURL.AUTH_COOKIE + "=" + tokenSigned);
Assert.assertEquals(HttpURLConnection.HTTP_OK,
conn.getResponseCode());
}
}

View File

@ -1057,6 +1057,11 @@ private Response get(
np.checkAccess(fullpath, FsAction.getFsAction(fsAction.getValue()));
return Response.ok().build();
}
case GETTRASHROOT: {
final String trashPath = getTrashRoot(fullpath, conf);
final String jsonStr = JsonUtil.toJsonString("Path", trashPath);
return Response.ok(jsonStr).type(MediaType.APPLICATION_JSON).build();
}
case LISTSTATUS_BATCH:
{
byte[] start = HdfsFileStatus.EMPTY_NAME;
@ -1072,6 +1077,13 @@ private Response get(
}
}
private static String getTrashRoot(String fullPath,
Configuration conf) throws IOException {
FileSystem fs = FileSystem.get(conf != null ? conf : new Configuration());
return fs.getTrashRoot(
new org.apache.hadoop.fs.Path(fullPath)).toUri().getPath();
}
private static DirectoryListing getDirectoryListing(final NamenodeProtocols np,
final String p, byte[] startAfter) throws IOException {
final DirectoryListing listing = np.getListing(p, startAfter, false);

View File

@ -42,6 +42,7 @@ WebHDFS REST API
* [Get Content Summary of a Directory](#Get_Content_Summary_of_a_Directory)
* [Get File Checksum](#Get_File_Checksum)
* [Get Home Directory](#Get_Home_Directory)
* [Get Trash Root](#Get_Trash_Root)
* [Set Permission](#Set_Permission)
* [Set Owner](#Set_Owner)
* [Set Replication Factor](#Set_Replication_Factor)
@ -149,6 +150,7 @@ The HTTP REST API supports the complete [FileSystem](../../api/org/apache/hadoop
* [`GETFILECHECKSUM`](#Get_File_Checksum) (see [FileSystem](../../api/org/apache/hadoop/fs/FileSystem.html).getFileChecksum)
* [`GETHOMEDIRECTORY`](#Get_Home_Directory) (see [FileSystem](../../api/org/apache/hadoop/fs/FileSystem.html).getHomeDirectory)
* [`GETDELEGATIONTOKEN`](#Get_Delegation_Token) (see [FileSystem](../../api/org/apache/hadoop/fs/FileSystem.html).getDelegationToken)
* [`GETTRASHROOT`](#Get_Trash_Root) (see [FileSystem](../../api/org/apache/hadoop/fs/FileSystem.html).getTrashRoot)
* [`GETXATTRS`](#Get_an_XAttr) (see [FileSystem](../../api/org/apache/hadoop/fs/FileSystem.html).getXAttr)
* [`GETXATTRS`](#Get_multiple_XAttrs) (see [FileSystem](../../api/org/apache/hadoop/fs/FileSystem.html).getXAttrs)
* [`GETXATTRS`](#Get_all_XAttrs) (see [FileSystem](../../api/org/apache/hadoop/fs/FileSystem.html).getXAttrs)
@ -578,7 +580,7 @@ See also: [FileSystem](../../api/org/apache/hadoop/fs/FileSystem.html).getFileSt
"group" : "supergroup",
"length" : 0,
"modificationTime": 1320895981256,
"owner" : "szetszwo",
"owner" : "username",
"pathSuffix" : "bar",
"permission" : "711",
"replication" : 0,
@ -815,10 +817,36 @@ See also: [FileSystem](../../api/org/apache/hadoop/fs/FileSystem.html).getFileCh
Content-Type: application/json
Transfer-Encoding: chunked
{"Path": "/user/szetszwo"}
{"Path": "/user/username"}
See also: [FileSystem](../../api/org/apache/hadoop/fs/FileSystem.html).getHomeDirectory
### Get Trash Root
* Submit a HTTP GET request.
curl -i "http://<HOST>:<PORT>/webhdfs/v1/<PATH>?op=GETTRASHROOT"
The client receives a response with a [`Path` JSON object](#Path_JSON_Schema):
HTTP/1.1 200 OK
Content-Type: application/json
Transfer-Encoding: chunked
{"Path": "/user/username/.Trash"}
if the path is an encrypted zone path and user has permission of the path, the client receives a response like this:
HTTP/1.1 200 OK
Content-Type: application/json
Transfer-Encoding: chunked
{"Path": "/PATH/.Trash/username"}
See also: [FileSystem](../../api/org/apache/hadoop/fs/FileSystem.html).getTrashRoot
For more details about trash root in an encrypted zone, please refer to [Transparent Encryption Guide](./TransparentEncryption.html#Rename_and_Trash_considerations).
### Set Permission
* Submit a HTTP PUT request.
@ -1137,7 +1165,7 @@ Snapshot Operations
Content-Type: application/json
Transfer-Encoding: chunked
{"Path": "/user/szetszwo/.snapshot/s1"}
{"Path": "/user/username/.snapshot/s1"}
See also: [FileSystem](../../api/org/apache/hadoop/fs/FileSystem.html).createSnapshot

View File

@ -78,6 +78,7 @@
import org.apache.hadoop.hdfs.tools.DFSck;
import org.apache.hadoop.hdfs.tools.offlineImageViewer.PBImageXmlWriter;
import org.apache.hadoop.hdfs.web.WebHdfsConstants;
import org.apache.hadoop.hdfs.web.WebHdfsFileSystem;
import org.apache.hadoop.hdfs.web.WebHdfsTestUtil;
import org.apache.hadoop.io.EnumSetWritable;
import org.apache.hadoop.security.AccessControlException;
@ -1558,6 +1559,25 @@ public void testEncryptionZoneWithTrash() throws Exception {
DFSTestUtil.createFile(fs, nestedEZFile, len, (short) 1, 0xFEED);
verifyShellDeleteWithTrash(shell, topEZFile);
verifyShellDeleteWithTrash(shell, nestedEZFile);
//Test nested EZ with webHDFS
final WebHdfsFileSystem webFS = WebHdfsTestUtil.getWebHdfsFileSystem(
conf, WebHdfsConstants.WEBHDFS_SCHEME);
final String currentUser =
UserGroupInformation.getCurrentUser().getShortUserName();
final Path expectedTopTrash = new Path(topEZ,
new Path(FileSystem.TRASH_PREFIX, currentUser));
final Path expectedNestedTrash = new Path(nestedEZ,
new Path(FileSystem.TRASH_PREFIX, currentUser));
final Path topTrash = webFS.getTrashRoot(topEZFile);
final Path nestedTrash = webFS.getTrashRoot(nestedEZFile);
assertEquals(expectedTopTrash.toUri().getPath(),
topTrash.toUri().getPath());
assertEquals(expectedNestedTrash.toUri().getPath(),
nestedTrash.toUri().getPath());
verifyShellDeleteWithTrash(shell, nestedEZ);
verifyShellDeleteWithTrash(shell, topEZ);
}
@ -1566,6 +1586,8 @@ public void testEncryptionZoneWithTrash() throws Exception {
public void testRootDirEZTrash() throws Exception {
final HdfsAdmin dfsAdmin =
new HdfsAdmin(FileSystem.getDefaultUri(conf), conf);
final String currentUser =
UserGroupInformation.getCurrentUser().getShortUserName();
final Path rootDir = new Path("/");
dfsAdmin.createEncryptionZone(rootDir, TEST_KEY, NO_TRASH);
final Path encFile = new Path("/encFile");
@ -1578,10 +1600,23 @@ public void testRootDirEZTrash() throws Exception {
// Trash path should be consistent
// if root path is an encryption zone
Path encFileTrash = shell.getCurrentTrashDir(encFile);
Path rootDirTrash = shell.getCurrentTrashDir(rootDir);
Path encFileCurrentTrash = shell.getCurrentTrashDir(encFile);
Path rootDirCurrentTrash = shell.getCurrentTrashDir(rootDir);
assertEquals("Root trash should be equal with ezFile trash",
encFileTrash, rootDirTrash);
encFileCurrentTrash, rootDirCurrentTrash);
// Use webHDFS client to test trash root path
final WebHdfsFileSystem webFS = WebHdfsTestUtil.getWebHdfsFileSystem(
conf, WebHdfsConstants.WEBHDFS_SCHEME);
final Path expectedTrash = new Path(rootDir,
new Path(FileSystem.TRASH_PREFIX, currentUser));
Path webHDFSTrash = webFS.getTrashRoot(encFile);
assertEquals(expectedTrash.toUri().getPath(),
webHDFSTrash.toUri().getPath());
assertEquals(encFileCurrentTrash.getParent().toUri().getPath(),
webHDFSTrash.toUri().getPath());
}
@Test(timeout = 120000)

View File

@ -1072,4 +1072,26 @@ public void testWebHdfsNoRedirect() throws Exception {
}
}
}
@Test
public void testGetTrashRoot() throws Exception {
MiniDFSCluster cluster = null;
final Configuration conf = WebHdfsTestUtil.createConf();
final String currentUser =
UserGroupInformation.getCurrentUser().getShortUserName();
try {
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(0).build();
final WebHdfsFileSystem webFS = WebHdfsTestUtil.getWebHdfsFileSystem(
conf, WebHdfsConstants.WEBHDFS_SCHEME);
Path trashPath = webFS.getTrashRoot(new Path("/"));
Path expectedPath = new Path(FileSystem.USER_HOME_PREFIX,
new Path(currentUser, FileSystem.TRASH_PREFIX));
assertEquals(expectedPath.toUri().getPath(), trashPath.toUri().getPath());
} finally {
if (cluster != null) {
cluster.shutdown();
}
}
}
}