diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/pom.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/pom.xml
index 4f11ad6b51..832e98c6e2 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/pom.xml
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/pom.xml
@@ -124,6 +124,8 @@
server/application_history_server.proto
client_SCM_protocol.proto
server/SCM_Admin_protocol.proto
+ yarn_csi_adaptor.proto
+ YarnCsiAdaptor.proto
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/volume/csi/CsiAdaptorClient.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/CsiAdaptorPB.java
similarity index 62%
rename from hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/volume/csi/CsiAdaptorClient.java
rename to hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/CsiAdaptorPB.java
index 043e7ae8de..2a7a114693 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/volume/csi/CsiAdaptorClient.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/CsiAdaptorPB.java
@@ -15,22 +15,17 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.hadoop.yarn.server.resourcemanager.volume.csi;
+package org.apache.hadoop.yarn.api;
-import org.apache.hadoop.yarn.server.volume.csi.exception.VolumeException;
-import org.apache.hadoop.yarn.server.volume.csi.CsiAdaptorClientProtocol;
+import org.apache.hadoop.ipc.ProtocolInfo;
+import org.apache.hadoop.yarn.proto.CsiAdaptorProtocol;
/**
- * Client talks to CSI adaptor.
+ * Interface for the CSI adaptor protocol.
*/
-public class CsiAdaptorClient implements CsiAdaptorClientProtocol {
-
- @Override
- public void validateVolume() throws VolumeException {
- // TODO
- }
-
- @Override public void controllerPublishVolume() throws VolumeException {
- // TODO
- }
+@ProtocolInfo(
+ protocolName = "CsiAdaptorPB",
+ protocolVersion = 1)
+public interface CsiAdaptorPB extends
+ CsiAdaptorProtocol.CsiAdaptorProtocolService.BlockingInterface {
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/CsiAdaptorProtocol.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/CsiAdaptorProtocol.java
new file mode 100644
index 0000000000..0822163dc8
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/CsiAdaptorProtocol.java
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.api;
+
+import org.apache.hadoop.yarn.api.protocolrecords.GetPluginInfoRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetPluginInfoResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.ValidateVolumeCapabilitiesRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.ValidateVolumeCapabilitiesResponse;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+
+import java.io.IOException;
+
+/**
+ * CSI adaptor delegates all the calls from YARN to a CSI driver.
+ */
+public interface CsiAdaptorProtocol {
+
+ GetPluginInfoResponse getPluginInfo(GetPluginInfoRequest request)
+ throws YarnException, IOException;
+
+ ValidateVolumeCapabilitiesResponse validateVolumeCapacity(
+ ValidateVolumeCapabilitiesRequest request) throws YarnException,
+ IOException;
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/volume/csi/CsiAdaptorClientProtocol.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/GetPluginInfoRequest.java
similarity index 63%
rename from hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/volume/csi/CsiAdaptorClientProtocol.java
rename to hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/GetPluginInfoRequest.java
index b894d4e046..a32c93c3bb 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/volume/csi/CsiAdaptorClientProtocol.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/GetPluginInfoRequest.java
@@ -15,20 +15,16 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.hadoop.yarn.server.volume.csi;
+package org.apache.hadoop.yarn.api.protocolrecords;
-import org.apache.hadoop.classification.InterfaceAudience.Private;
-import org.apache.hadoop.classification.InterfaceStability.Unstable;
-import org.apache.hadoop.yarn.server.volume.csi.exception.VolumeException;
+import org.apache.hadoop.yarn.util.Records;
/**
- * Protocol for the CSI adaptor.
+ * Get plugin info request.
*/
-@Private
-@Unstable
-public interface CsiAdaptorClientProtocol {
+public abstract class GetPluginInfoRequest {
- void validateVolume() throws VolumeException;
-
- void controllerPublishVolume() throws VolumeException;
+ public static GetPluginInfoRequest newInstance() {
+ return Records.newRecord(GetPluginInfoRequest.class);
+ }
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/GetPluginInfoResponse.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/GetPluginInfoResponse.java
new file mode 100644
index 0000000000..95772009e1
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/GetPluginInfoResponse.java
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.api.protocolrecords;
+
+import org.apache.hadoop.yarn.util.Records;
+
+/**
+ * Get plugin info response.
+ */
+public abstract class GetPluginInfoResponse {
+
+ public static GetPluginInfoResponse newInstance(
+ String driverName, String version) {
+ GetPluginInfoResponse response =
+ Records.newRecord(GetPluginInfoResponse.class);
+ response.setDriverName(driverName);
+ response.setVersion(version);
+ return response;
+ }
+
+ public abstract void setDriverName(String driverName);
+
+ public abstract String getDriverName();
+
+ public abstract void setVersion(String version);
+
+ public abstract String getVersion();
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/ValidateVolumeCapabilitiesRequest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/ValidateVolumeCapabilitiesRequest.java
new file mode 100644
index 0000000000..0ab1b1f222
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/ValidateVolumeCapabilitiesRequest.java
@@ -0,0 +1,117 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.api.protocolrecords;
+
+import org.apache.hadoop.yarn.util.Records;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * YARN internal message used to validate volume capabilities
+ * with a CSI driver controller plugin.
+ */
+public abstract class ValidateVolumeCapabilitiesRequest {
+
+ /**
+ * Volume access mode.
+ */
+ public enum AccessMode {
+ UNKNOWN,
+ SINGLE_NODE_WRITER,
+ SINGLE_NODE_READER_ONLY,
+ MULTI_NODE_READER_ONLY,
+ MULTI_NODE_SINGLE_WRITER,
+ MULTI_NODE_MULTI_WRITER,
+ }
+
+ /**
+ * Volume type.
+ */
+ public enum VolumeType {
+ BLOCK,
+ FILE_SYSTEM
+ }
+
+ /**
+ * Volume capability.
+ */
+ public static class VolumeCapability {
+
+ private AccessMode mode;
+ private VolumeType type;
+ private List flags;
+
+ public VolumeCapability(AccessMode accessMode, VolumeType volumeType,
+ List mountFlags) {
+ this.mode = accessMode;
+ this.type = volumeType;
+ this.flags = mountFlags;
+ }
+
+ public AccessMode getAccessMode() {
+ return mode;
+ }
+
+ public VolumeType getVolumeType() {
+ return type;
+ }
+
+ public List getMountFlags() {
+ return flags;
+ }
+ }
+
+ public static ValidateVolumeCapabilitiesRequest newInstance(
+ String volumeId, List volumeCapabilities,
+ Map volumeAttributes) {
+ ValidateVolumeCapabilitiesRequest
+ request =
+ Records.newRecord(
+ ValidateVolumeCapabilitiesRequest.class);
+ request.setVolumeId(volumeId);
+ request.setVolumeAttributes(volumeAttributes);
+ for (VolumeCapability capability : volumeCapabilities) {
+ request.addVolumeCapability(capability);
+ }
+ return request;
+ }
+
+ public static ValidateVolumeCapabilitiesRequest newInstance(
+ String volumeId, Map volumeAttributes) {
+ ValidateVolumeCapabilitiesRequest
+ request =
+ Records.newRecord(
+ ValidateVolumeCapabilitiesRequest.class);
+ request.setVolumeId(volumeId);
+ request.setVolumeAttributes(volumeAttributes);
+ return request;
+ }
+
+ public abstract void setVolumeId(String volumeId);
+
+ public abstract String getVolumeId();
+
+ public abstract void setVolumeAttributes(Map attributes);
+
+ public abstract Map getVolumeAttributes();
+
+ public abstract void addVolumeCapability(VolumeCapability volumeCapability);
+
+ public abstract List getVolumeCapabilities();
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/ValidateVolumeCapabilitiesResponse.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/ValidateVolumeCapabilitiesResponse.java
new file mode 100644
index 0000000000..a72ba0ae46
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/ValidateVolumeCapabilitiesResponse.java
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.api.protocolrecords;
+
+import org.apache.hadoop.yarn.util.Records;
+
+/**
+ * YARN internal message used to represent the response of
+ * volume capabilities validation with a CSI driver controller plugin.
+ */
+public abstract class ValidateVolumeCapabilitiesResponse {
+
+ public static ValidateVolumeCapabilitiesResponse newInstance(
+ boolean supported, String responseMessage) {
+ ValidateVolumeCapabilitiesResponse
+ record =
+ Records.newRecord(
+ ValidateVolumeCapabilitiesResponse.class);
+ record.setResponseMessage(responseMessage);
+ record.setSupported(supported);
+ return record;
+ }
+
+ public abstract void setSupported(boolean supported);
+
+ public abstract boolean isSupported();
+
+ public abstract void setResponseMessage(String responseMessage);
+
+ public abstract String getResponseMessage();
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
index e88d5940a6..2f2528445d 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
@@ -3427,6 +3427,18 @@ public class YarnConfiguration extends Configuration {
public static final boolean DEFAULT_ROUTER_WEBAPP_PARTIAL_RESULTS_ENABLED =
false;
+ ////////////////////////////////
+ // CSI Volume configs
+ ////////////////////////////////
+ /**
+ * One or more socket addresses for csi-adaptor.
+ * Multiple addresses are delimited by ",".
+ */
+ public static final String NM_CSI_ADAPTOR_PREFIX =
+ NM_PREFIX + "csi-driver-adaptor.";
+ public static final String NM_CSI_ADAPTOR_ADDRESSES =
+ NM_CSI_ADAPTOR_PREFIX + "addresses";
+
////////////////////////////////
// Other Configs
////////////////////////////////
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/YarnCsiAdaptor.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/YarnCsiAdaptor.proto
new file mode 100644
index 0000000000..9dcb8a7355
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/YarnCsiAdaptor.proto
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+option java_package = "org.apache.hadoop.yarn.proto";
+option java_outer_classname = "CsiAdaptorProtocol";
+option java_generic_services = true;
+option java_generate_equals_and_hash = true;
+package hadoop.yarn;
+
+import "yarn_csi_adaptor.proto";
+
+service CsiAdaptorProtocolService {
+
+ rpc getPluginInfo (GetPluginInfoRequest)
+ returns (GetPluginInfoResponse);
+
+ rpc validateVolumeCapacity (ValidateVolumeCapabilitiesRequest)
+ returns (ValidateVolumeCapabilitiesResponse);
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_csi_adaptor.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_csi_adaptor.proto
new file mode 100644
index 0000000000..c9adbea783
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_csi_adaptor.proto
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+option java_package = "org.apache.hadoop.yarn.proto";
+option java_outer_classname = "CsiAdaptorProtos";
+option java_generate_equals_and_hash = true;
+package hadoop.yarn;
+
+import "yarn_protos.proto";
+
+message ValidateVolumeCapabilitiesRequest {
+ required string volume_id = 1;
+ repeated VolumeCapability volume_capabilities = 2;
+ repeated StringStringMapProto volume_attributes = 3;
+}
+
+message ValidateVolumeCapabilitiesResponse {
+ // True if the Plugin supports the specified capabilities for the
+ // given volume. This field is REQUIRED.
+ required bool supported = 1;
+
+ // Message to the CO if `supported` above is false. This field is
+ // OPTIONAL.
+ // An empty string is equal to an unspecified field value.
+ optional string message = 2;
+}
+
+message VolumeCapability {
+ enum VolumeType {
+ BLOCK = 0;
+ FILE_SYSTEM = 1;
+ }
+
+ enum AccessMode {
+ UNKNOWN = 0;
+ SINGLE_NODE_WRITER = 1;
+ SINGLE_NODE_READER_ONLY = 2;
+ MULTI_NODE_READER_ONLY = 3;
+ MULTI_NODE_SINGLE_WRITER = 4;
+ MULTI_NODE_MULTI_WRITER = 5;
+ }
+
+ required VolumeType volume_type = 1;
+ required AccessMode access_mode = 2;
+ repeated string mount_flags = 3;
+}
+
+message GetPluginInfoRequest {
+ // Intentionally empty.
+}
+
+message GetPluginInfoResponse {
+ required string name = 1;
+ required string vendor_version = 2;
+}
\ No newline at end of file
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/impl/pb/client/CsiAdaptorProtocolPBClientImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/impl/pb/client/CsiAdaptorProtocolPBClientImpl.java
new file mode 100644
index 0000000000..2e10f72046
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/impl/pb/client/CsiAdaptorProtocolPBClientImpl.java
@@ -0,0 +1,91 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.api.impl.pb.client;
+
+import com.google.protobuf.ServiceException;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.ipc.ProtobufRpcEngine;
+import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.yarn.api.CsiAdaptorPB;
+import org.apache.hadoop.yarn.api.CsiAdaptorProtocol;
+import org.apache.hadoop.yarn.api.protocolrecords.GetPluginInfoRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetPluginInfoResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.ValidateVolumeCapabilitiesRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.ValidateVolumeCapabilitiesResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.GetPluginInfoRequestPBImpl;
+import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.GetPluginInfoResponsePBImpl;
+import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.ValidateVolumeCapabilitiesRequestPBImpl;
+import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.ValidateVolumeCapabilitiesResponsePBImpl;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.ipc.RPCUtil;
+import org.apache.hadoop.yarn.proto.CsiAdaptorProtos;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+
+/**
+ * CSI adaptor client implementation.
+ */
+public class CsiAdaptorProtocolPBClientImpl
+ implements CsiAdaptorProtocol, Closeable {
+
+ private final CsiAdaptorPB proxy;
+
+ public CsiAdaptorProtocolPBClientImpl(long clientVersion,
+ InetSocketAddress addr, Configuration conf) throws IOException {
+ RPC.setProtocolEngine(conf, CsiAdaptorPB.class, ProtobufRpcEngine.class);
+ this.proxy = RPC.getProxy(CsiAdaptorPB.class, clientVersion, addr, conf);
+ }
+
+ @Override
+ public GetPluginInfoResponse getPluginInfo(
+ GetPluginInfoRequest request) throws YarnException, IOException {
+ CsiAdaptorProtos.GetPluginInfoRequest requestProto =
+ ((GetPluginInfoRequestPBImpl) request).getProto();
+ try {
+ return new GetPluginInfoResponsePBImpl(
+ proxy.getPluginInfo(null, requestProto));
+ } catch (ServiceException e) {
+ RPCUtil.unwrapAndThrowException(e);
+ return null;
+ }
+ }
+
+ @Override
+ public ValidateVolumeCapabilitiesResponse validateVolumeCapacity(
+ ValidateVolumeCapabilitiesRequest request)
+ throws YarnException, IOException {
+ CsiAdaptorProtos.ValidateVolumeCapabilitiesRequest requestProto =
+ ((ValidateVolumeCapabilitiesRequestPBImpl) request).getProto();
+ try {
+ return new ValidateVolumeCapabilitiesResponsePBImpl(
+ proxy.validateVolumeCapacity(null, requestProto));
+ } catch (ServiceException e) {
+ RPCUtil.unwrapAndThrowException(e);
+ return null;
+ }
+ }
+
+ @Override
+ public void close() throws IOException {
+ if(this.proxy != null) {
+ RPC.stopProxy(this.proxy);
+ }
+ }
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/impl/pb/service/CsiAdaptorProtocolPBServiceImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/impl/pb/service/CsiAdaptorProtocolPBServiceImpl.java
new file mode 100644
index 0000000000..9a194351e5
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/impl/pb/service/CsiAdaptorProtocolPBServiceImpl.java
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.api.impl.pb.service;
+
+import com.google.protobuf.RpcController;
+import com.google.protobuf.ServiceException;
+import org.apache.hadoop.yarn.api.CsiAdaptorPB;
+import org.apache.hadoop.yarn.api.CsiAdaptorProtocol;
+import org.apache.hadoop.yarn.api.protocolrecords.GetPluginInfoRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetPluginInfoResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.ValidateVolumeCapabilitiesResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.GetPluginInfoRequestPBImpl;
+import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.GetPluginInfoResponsePBImpl;
+import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.ValidateVolumeCapabilitiesRequestPBImpl;
+import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.ValidateVolumeCapabilitiesResponsePBImpl;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.proto.CsiAdaptorProtos;
+
+import java.io.IOException;
+
+/**
+ * CSI adaptor server side implementation, this is hosted on a node manager.
+ */
+public class CsiAdaptorProtocolPBServiceImpl implements CsiAdaptorPB {
+
+ private final CsiAdaptorProtocol real;
+ public CsiAdaptorProtocolPBServiceImpl(CsiAdaptorProtocol impl) {
+ this.real = impl;
+ }
+
+ @Override
+ public CsiAdaptorProtos.GetPluginInfoResponse getPluginInfo(
+ RpcController controller, CsiAdaptorProtos.GetPluginInfoRequest request)
+ throws ServiceException {
+ try {
+ GetPluginInfoRequest req =
+ new GetPluginInfoRequestPBImpl(request);
+ GetPluginInfoResponse response = real.getPluginInfo(req);
+ return ((GetPluginInfoResponsePBImpl) response).getProto();
+ } catch (YarnException | IOException e) {
+ throw new ServiceException(e);
+ }
+ }
+
+ @Override
+ public CsiAdaptorProtos.ValidateVolumeCapabilitiesResponse
+ validateVolumeCapacity(RpcController controller,
+ CsiAdaptorProtos.ValidateVolumeCapabilitiesRequest request)
+ throws ServiceException {
+ try {
+ ValidateVolumeCapabilitiesRequestPBImpl req =
+ new ValidateVolumeCapabilitiesRequestPBImpl(request);
+ ValidateVolumeCapabilitiesResponse response =
+ real.validateVolumeCapacity(req);
+ return ((ValidateVolumeCapabilitiesResponsePBImpl) response).getProto();
+ } catch (YarnException | IOException e) {
+ throw new ServiceException(e);
+ }
+ }
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/GetPluginInfoRequestPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/GetPluginInfoRequestPBImpl.java
new file mode 100644
index 0000000000..0d1c2e52f0
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/GetPluginInfoRequestPBImpl.java
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.api.protocolrecords.impl.pb;
+
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.yarn.api.protocolrecords.GetPluginInfoRequest;
+import org.apache.hadoop.yarn.proto.CsiAdaptorProtos;
+
+/**
+ * Get plugin info request protobuf impl.
+ */
+public class GetPluginInfoRequestPBImpl extends GetPluginInfoRequest {
+
+ private CsiAdaptorProtos.GetPluginInfoRequest.Builder builder;
+
+ public GetPluginInfoRequestPBImpl(
+ CsiAdaptorProtos.GetPluginInfoRequest requestProto) {
+ this.builder = requestProto.toBuilder();
+ }
+
+ public GetPluginInfoRequestPBImpl() {
+ this.builder = CsiAdaptorProtos.GetPluginInfoRequest.newBuilder();
+ }
+
+ public CsiAdaptorProtos.GetPluginInfoRequest getProto() {
+ Preconditions.checkNotNull(builder);
+ return builder.build();
+ }
+
+ @Override
+ public int hashCode() {
+ return getProto().hashCode();
+ }
+
+ @Override
+ public boolean equals(Object other) {
+ if (other == null) {
+ return false;
+ }
+ if (other.getClass().isAssignableFrom(this.getClass())) {
+ return this.getProto().equals(this.getClass().cast(other).getProto());
+ }
+ return false;
+ }
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/GetPluginInfoResponsePBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/GetPluginInfoResponsePBImpl.java
new file mode 100644
index 0000000000..141fd6d5c8
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/GetPluginInfoResponsePBImpl.java
@@ -0,0 +1,84 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.api.protocolrecords.impl.pb;
+
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.yarn.api.protocolrecords.GetPluginInfoResponse;
+import org.apache.hadoop.yarn.proto.CsiAdaptorProtos;
+
+/**
+ * Get plugin info response protobuf impl.
+ */
+public class GetPluginInfoResponsePBImpl extends GetPluginInfoResponse {
+
+ private CsiAdaptorProtos.GetPluginInfoResponse.Builder builder;
+
+ public GetPluginInfoResponsePBImpl(
+ CsiAdaptorProtos.GetPluginInfoResponse responseProto) {
+ this.builder = responseProto.toBuilder();
+ }
+
+ public GetPluginInfoResponsePBImpl() {
+ this.builder = CsiAdaptorProtos.GetPluginInfoResponse.newBuilder();
+ }
+
+ @Override
+ public void setDriverName(String driverName) {
+ Preconditions.checkNotNull(builder);
+ builder.setName(driverName);
+ }
+
+ @Override
+ public String getDriverName() {
+ Preconditions.checkNotNull(builder);
+ return builder.getName();
+ }
+
+ @Override
+ public void setVersion(String version) {
+ Preconditions.checkNotNull(builder);
+ builder.setVendorVersion(version);
+ }
+
+ @Override
+ public String getVersion() {
+ Preconditions.checkNotNull(builder);
+ return builder.getVendorVersion();
+ }
+
+ public CsiAdaptorProtos.GetPluginInfoResponse getProto() {
+ Preconditions.checkNotNull(builder);
+ return builder.build();
+ }
+
+ @Override
+ public int hashCode() {
+ return getProto().hashCode();
+ }
+
+ @Override
+ public boolean equals(Object other) {
+ if (other == null) {
+ return false;
+ }
+ if (other.getClass().isAssignableFrom(this.getClass())) {
+ return this.getProto().equals(this.getClass().cast(other).getProto());
+ }
+ return false;
+ }
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/ValidateVolumeCapabilitiesRequestPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/ValidateVolumeCapabilitiesRequestPBImpl.java
new file mode 100644
index 0000000000..14bd89dc46
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/ValidateVolumeCapabilitiesRequestPBImpl.java
@@ -0,0 +1,121 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.api.protocolrecords.impl.pb;
+
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.yarn.api.protocolrecords.ValidateVolumeCapabilitiesRequest;
+import org.apache.hadoop.yarn.api.records.impl.pb.ProtoUtils;
+import org.apache.hadoop.yarn.proto.CsiAdaptorProtos;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * PB wrapper for CsiAdaptorProtos.ValidateVolumeCapabilitiesRequest.
+ */
+public class ValidateVolumeCapabilitiesRequestPBImpl extends
+ ValidateVolumeCapabilitiesRequest {
+
+ private CsiAdaptorProtos.ValidateVolumeCapabilitiesRequest.Builder builder;
+
+ public ValidateVolumeCapabilitiesRequestPBImpl(
+ CsiAdaptorProtos.ValidateVolumeCapabilitiesRequest proto) {
+ this.builder = proto.toBuilder();
+ }
+
+ public ValidateVolumeCapabilitiesRequestPBImpl() {
+ this.builder = CsiAdaptorProtos.ValidateVolumeCapabilitiesRequest
+ .newBuilder();
+ }
+
+ @Override
+ public String getVolumeId() {
+ Preconditions.checkNotNull(builder);
+ return builder.getVolumeId();
+ }
+
+ @Override
+ public void setVolumeAttributes(Map attributes) {
+ Preconditions.checkNotNull(builder);
+ builder.addAllVolumeAttributes(ProtoUtils.convertToProtoFormat(attributes));
+ }
+
+ @Override
+ public void setVolumeId(String volumeId) {
+ Preconditions.checkNotNull(builder);
+ builder.setVolumeId(volumeId);
+ }
+
+ @Override
+ public void addVolumeCapability(VolumeCapability volumeCapability) {
+ Preconditions.checkNotNull(builder);
+ CsiAdaptorProtos.VolumeCapability vc =
+ CsiAdaptorProtos.VolumeCapability.newBuilder()
+ .setAccessMode(CsiAdaptorProtos.VolumeCapability.AccessMode
+ .valueOf(volumeCapability.getAccessMode().ordinal()))
+ .setVolumeType(CsiAdaptorProtos.VolumeCapability.VolumeType
+ .valueOf(volumeCapability.getVolumeType().ordinal()))
+ .addAllMountFlags(volumeCapability.getMountFlags())
+ .build();
+ builder.addVolumeCapabilities(vc);
+ }
+
+ @Override
+ public List getVolumeCapabilities() {
+ Preconditions.checkNotNull(builder);
+ List caps = new ArrayList<>(
+ builder.getVolumeCapabilitiesCount());
+ builder.getVolumeCapabilitiesList().forEach(capability -> {
+ VolumeCapability vc = new VolumeCapability(
+ AccessMode.valueOf(capability.getAccessMode().name()),
+ VolumeType.valueOf(capability.getVolumeType().name()),
+ capability.getMountFlagsList());
+ caps.add(vc);
+ });
+ return caps;
+ }
+
+ @Override
+ public Map getVolumeAttributes() {
+ Preconditions.checkNotNull(builder);
+ return ProtoUtils.convertStringStringMapProtoListToMap(
+ builder.getVolumeAttributesList());
+ }
+
+ public CsiAdaptorProtos.ValidateVolumeCapabilitiesRequest getProto() {
+ Preconditions.checkNotNull(builder);
+ return builder.build();
+ }
+
+ @Override
+ public int hashCode() {
+ return getProto().hashCode();
+ }
+
+ @Override
+ public boolean equals(Object other) {
+ if (other == null) {
+ return false;
+ }
+ if (other.getClass().isAssignableFrom(this.getClass())) {
+ return this.getProto().equals(this.getClass().cast(other).getProto());
+ }
+ return false;
+ }
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/ValidateVolumeCapabilitiesResponsePBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/ValidateVolumeCapabilitiesResponsePBImpl.java
new file mode 100644
index 0000000000..aa33ab7529
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/ValidateVolumeCapabilitiesResponsePBImpl.java
@@ -0,0 +1,87 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.api.protocolrecords.impl.pb;
+
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.yarn.api.protocolrecords.ValidateVolumeCapabilitiesResponse;
+import org.apache.hadoop.yarn.proto.CsiAdaptorProtos;
+
+/**
+ * PB wrapper for CsiAdaptorProtos.ValidateVolumeCapabilitiesResponse.
+ */
+public class ValidateVolumeCapabilitiesResponsePBImpl
+ extends ValidateVolumeCapabilitiesResponse {
+
+ private CsiAdaptorProtos.ValidateVolumeCapabilitiesResponse.Builder builder;
+
+ public ValidateVolumeCapabilitiesResponsePBImpl() {
+ this.builder = CsiAdaptorProtos.ValidateVolumeCapabilitiesResponse
+ .newBuilder();
+ }
+
+ public ValidateVolumeCapabilitiesResponsePBImpl(
+ CsiAdaptorProtos.ValidateVolumeCapabilitiesResponse response) {
+ this.builder = response.toBuilder();
+ }
+
+ @Override
+ public void setSupported(boolean supported) {
+ Preconditions.checkNotNull(builder);
+ this.builder.setSupported(supported);
+ }
+
+ @Override
+ public boolean isSupported() {
+ Preconditions.checkNotNull(builder);
+ return builder.getSupported();
+ }
+
+ @Override
+ public void setResponseMessage(String message) {
+ Preconditions.checkNotNull(builder);
+ this.builder.setMessage(message);
+ }
+
+ @Override
+ public String getResponseMessage() {
+ Preconditions.checkNotNull(builder);
+ return this.builder.getMessage();
+ }
+
+ @Override
+ public int hashCode() {
+ return getProto().hashCode();
+ }
+
+ @Override
+ public boolean equals(Object other) {
+ if (other == null) {
+ return false;
+ }
+ if (other.getClass().isAssignableFrom(this.getClass())) {
+ return this.getProto().equals(this.getClass().cast(other).getProto());
+ }
+ return false;
+ }
+
+ public CsiAdaptorProtos.ValidateVolumeCapabilitiesResponse getProto() {
+ Preconditions.checkNotNull(builder);
+ return builder.build();
+ }
+
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
index f5493bc540..e7a0e1406c 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
@@ -4037,4 +4037,17 @@
yarn.node-attribute.fs-store.impl.class
org.apache.hadoop.yarn.server.resourcemanager.nodelabels.FileSystemNodeAttributeStore
+
+
+
+
+ CSI driver adaptor addresses on a node manager.
+ This configuration will be loaded by the resource manager to initiate
+ a client for each adaptor in order to communicate with CSI drivers.
+ Note, these addresses should be mapped to the adaptor addresses which
+ runs the controller plugin.
+
+ yarn.nodemanager.csi-driver-adaptor.addresses
+
+
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-csi/pom.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-csi/pom.xml
index 41f5098596..27d8452d90 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-csi/pom.xml
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-csi/pom.xml
@@ -83,6 +83,18 @@
test-jar
test
+
+ org.apache.hadoop
+ hadoop-common
+
+
+ org.apache.hadoop
+ hadoop-yarn-common
+
+
+ org.apache.hadoop
+ hadoop-yarn-api
+
javax.annotation
javax.annotation-api
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-csi/src/main/java/org/apache/hadoop/yarn/csi/adaptor/CsiAdaptorProtocolService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-csi/src/main/java/org/apache/hadoop/yarn/csi/adaptor/CsiAdaptorProtocolService.java
new file mode 100644
index 0000000000..f94275f40e
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-csi/src/main/java/org/apache/hadoop/yarn/csi/adaptor/CsiAdaptorProtocolService.java
@@ -0,0 +1,122 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.csi.adaptor;
+
+import com.google.common.annotations.VisibleForTesting;
+import csi.v0.Csi;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.ipc.Server;
+import org.apache.hadoop.service.AbstractService;
+import org.apache.hadoop.yarn.api.CsiAdaptorProtocol;
+import org.apache.hadoop.yarn.api.protocolrecords.GetPluginInfoRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetPluginInfoResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.ValidateVolumeCapabilitiesRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.ValidateVolumeCapabilitiesResponse;
+import org.apache.hadoop.yarn.csi.client.CsiClient;
+import org.apache.hadoop.yarn.csi.client.CsiClientImpl;
+import org.apache.hadoop.yarn.csi.translator.ProtoTranslatorFactory;
+import org.apache.hadoop.yarn.csi.utils.ConfigUtils;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.ipc.YarnRPC;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+
+/**
+ * This is a Hadoop RPC server, we uses the Hadoop RPC framework here
+ * because we need to stick to the security model current Hadoop supports.
+ */
+public class CsiAdaptorProtocolService extends AbstractService
+ implements CsiAdaptorProtocol {
+
+ private static final Logger LOG =
+ LoggerFactory.getLogger(CsiAdaptorProtocolService.class);
+
+ private Server server;
+ private InetSocketAddress adaptorServiceAddress;
+ private CsiClient csiClient;
+ private String csiDriverName;
+
+ public CsiAdaptorProtocolService(String driverName,
+ String domainSocketPath) {
+ super(CsiAdaptorProtocolService.class.getName());
+ this.csiClient = new CsiClientImpl(domainSocketPath);
+ this.csiDriverName = driverName;
+ }
+
+ @VisibleForTesting
+ public void setCsiClient(CsiClient client) {
+ this.csiClient = client;
+ }
+
+ @Override
+ protected void serviceInit(Configuration conf) throws Exception {
+ adaptorServiceAddress = ConfigUtils
+ .getCsiAdaptorAddressForDriver(csiDriverName, conf);
+ super.serviceInit(conf);
+ }
+
+ @Override
+ protected void serviceStart() throws Exception {
+ Configuration conf = getConfig();
+ YarnRPC rpc = YarnRPC.create(conf);
+ this.server = rpc.getServer(
+ CsiAdaptorProtocol.class,
+ this, adaptorServiceAddress, conf, null, 1);
+ this.server.start();
+ LOG.info("{} started, listening on address: {}",
+ CsiAdaptorProtocolService.class.getName(),
+ adaptorServiceAddress.toString());
+ super.serviceStart();
+ }
+
+ @Override
+ protected void serviceStop() throws Exception {
+ if (this.server != null) {
+ this.server.stop();
+ }
+ super.serviceStop();
+ }
+
+ @Override
+ public GetPluginInfoResponse getPluginInfo(
+ GetPluginInfoRequest request) throws YarnException, IOException {
+ Csi.GetPluginInfoResponse response = csiClient.getPluginInfo();
+ return ProtoTranslatorFactory.getTranslator(
+ GetPluginInfoResponse.class, Csi.GetPluginInfoResponse.class)
+ .convertFrom(response);
+ }
+
+ @Override
+ public ValidateVolumeCapabilitiesResponse validateVolumeCapacity(
+ ValidateVolumeCapabilitiesRequest request) throws YarnException,
+ IOException {
+ Csi.ValidateVolumeCapabilitiesRequest req = ProtoTranslatorFactory
+ .getTranslator(ValidateVolumeCapabilitiesRequest.class,
+ Csi.ValidateVolumeCapabilitiesRequest.class)
+ .convertTo(request);
+ Csi.ValidateVolumeCapabilitiesResponse response =
+ csiClient.validateVolumeCapabilities(req);
+ return ProtoTranslatorFactory.getTranslator(
+ ValidateVolumeCapabilitiesResponse.class,
+ Csi.ValidateVolumeCapabilitiesResponse.class)
+ .convertFrom(response);
+ }
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-csi/src/main/java/org/apache/hadoop/yarn/csi/adaptor/package-info.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-csi/src/main/java/org/apache/hadoop/yarn/csi/adaptor/package-info.java
new file mode 100644
index 0000000000..919aab587a
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-csi/src/main/java/org/apache/hadoop/yarn/csi/adaptor/package-info.java
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * This package contains CSI adaptor classes.
+ */
+package org.apache.hadoop.yarn.csi.adaptor;
\ No newline at end of file
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-csi/src/main/java/org/apache/hadoop/yarn/csi/client/CsiClient.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-csi/src/main/java/org/apache/hadoop/yarn/csi/client/CsiClient.java
index 5bb9ce9441..d31c0c9b75 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-csi/src/main/java/org/apache/hadoop/yarn/csi/client/CsiClient.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-csi/src/main/java/org/apache/hadoop/yarn/csi/client/CsiClient.java
@@ -18,6 +18,7 @@
package org.apache.hadoop.yarn.csi.client;
+import csi.v0.Csi;
import csi.v0.Csi.GetPluginInfoResponse;
import java.io.IOException;
@@ -36,4 +37,7 @@ public interface CsiClient {
* @throws IOException when unable to get plugin info from the driver.
*/
GetPluginInfoResponse getPluginInfo() throws IOException;
+
+ Csi.ValidateVolumeCapabilitiesResponse validateVolumeCapabilities(
+ Csi.ValidateVolumeCapabilitiesRequest request) throws IOException;
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-csi/src/main/java/org/apache/hadoop/yarn/csi/client/CsiClientImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-csi/src/main/java/org/apache/hadoop/yarn/csi/client/CsiClientImpl.java
index 58dd292d94..5b3d2e23c3 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-csi/src/main/java/org/apache/hadoop/yarn/csi/client/CsiClientImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-csi/src/main/java/org/apache/hadoop/yarn/csi/client/CsiClientImpl.java
@@ -18,6 +18,7 @@
package org.apache.hadoop.yarn.csi.client;
+import csi.v0.Csi;
import csi.v0.Csi.GetPluginInfoRequest;
import csi.v0.Csi.GetPluginInfoResponse;
import org.apache.hadoop.yarn.csi.utils.GrpcHelper;
@@ -48,4 +49,14 @@ public class CsiClientImpl implements CsiClient {
return client.createIdentityBlockingStub().getPluginInfo(request);
}
}
+
+ @Override
+ public Csi.ValidateVolumeCapabilitiesResponse validateVolumeCapabilities(
+ Csi.ValidateVolumeCapabilitiesRequest request) throws IOException {
+ try (CsiGrpcClient client = CsiGrpcClient.newBuilder()
+ .setDomainSocketAddress(address).build()) {
+ return client.createControllerBlockingStub()
+ .validateVolumeCapabilities(request);
+ }
+ }
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-csi/src/main/java/org/apache/hadoop/yarn/csi/translator/GetPluginInfoResponseProtoTranslator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-csi/src/main/java/org/apache/hadoop/yarn/csi/translator/GetPluginInfoResponseProtoTranslator.java
new file mode 100644
index 0000000000..c4f042ebc1
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-csi/src/main/java/org/apache/hadoop/yarn/csi/translator/GetPluginInfoResponseProtoTranslator.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.csi.translator;
+
+import csi.v0.Csi;
+import org.apache.hadoop.yarn.api.protocolrecords.GetPluginInfoResponse;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+
+/**
+ * Protobuf message translator for GetPluginInfoResponse and
+ * Csi.GetPluginInfoResponse.
+ */
+public class GetPluginInfoResponseProtoTranslator implements
+ ProtoTranslator {
+
+ @Override public Csi.GetPluginInfoResponse convertTo(
+ GetPluginInfoResponse messageA) throws YarnException {
+ return Csi.GetPluginInfoResponse.newBuilder()
+ .setName(messageA.getDriverName())
+ .setVendorVersion(messageA.getVersion())
+ .build();
+ }
+
+ @Override public GetPluginInfoResponse convertFrom(
+ Csi.GetPluginInfoResponse messageB) throws YarnException {
+ return GetPluginInfoResponse.newInstance(messageB.getName(),
+ messageB.getVendorVersion());
+ }
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-csi/src/main/java/org/apache/hadoop/yarn/csi/translator/ProtoTranslator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-csi/src/main/java/org/apache/hadoop/yarn/csi/translator/ProtoTranslator.java
new file mode 100644
index 0000000000..93cb44128d
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-csi/src/main/java/org/apache/hadoop/yarn/csi/translator/ProtoTranslator.java
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.csi.translator;
+
+import org.apache.hadoop.yarn.exceptions.YarnException;
+
+/**
+ * ProtoTranslator converts a YARN side message to CSI proto message
+ * and vice versa. Each CSI proto message should have a corresponding
+ * YARN side message implementation, and a transformer to convert them
+ * one to the other. This layer helps we to hide CSI spec messages
+ * from YARN components.
+ *
+ * @param YARN side internal messages
+ * @param CSI proto messages
+ */
+public interface ProtoTranslator {
+
+ /**
+ * Convert message from type A to type B.
+ * @param messageA
+ * @return messageB
+ * @throws YarnException
+ */
+ B convertTo(A messageA) throws YarnException;
+
+ /**
+ * Convert message from type B to type A.
+ * @param messageB
+ * @return messageA
+ * @throws YarnException
+ */
+ A convertFrom(B messageB) throws YarnException;
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-csi/src/main/java/org/apache/hadoop/yarn/csi/translator/ProtoTranslatorFactory.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-csi/src/main/java/org/apache/hadoop/yarn/csi/translator/ProtoTranslatorFactory.java
new file mode 100644
index 0000000000..5eb76ffab5
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-csi/src/main/java/org/apache/hadoop/yarn/csi/translator/ProtoTranslatorFactory.java
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.csi.translator;
+
+import csi.v0.Csi;
+import org.apache.hadoop.yarn.api.protocolrecords.ValidateVolumeCapabilitiesRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.ValidateVolumeCapabilitiesResponse;
+
+/**
+ * Factory class to get desired proto transformer instance.
+ */
+public final class ProtoTranslatorFactory {
+
+ private ProtoTranslatorFactory() {
+ // hide constructor for the factory class
+ }
+
+ /**
+ * Get a {@link ProtoTranslator} based on the given input message
+ * types. If the type is not supported, a IllegalArgumentException
+ * will be thrown. When adding more transformers to this factory class,
+ * note each transformer works exactly for one message to another
+ * (and vice versa). For each type of the message, make sure there is
+ * a corresponding unit test added, such as
+ * TestValidateVolumeCapabilitiesRequest.
+ *
+ * @param yarnProto yarn proto message
+ * @param csiProto CSI proto message
+ * @param yarn proto message
+ * @param CSI proto message
+ * @throws IllegalArgumentException
+ * when given types are not supported
+ * @return
+ * a proto message transformer that transforms
+ * YARN internal proto message to CSI
+ */
+ public static ProtoTranslator getTranslator(
+ Class yarnProto, Class csiProto) {
+ if (yarnProto == ValidateVolumeCapabilitiesRequest.class
+ && csiProto == Csi.ValidateVolumeCapabilitiesRequest.class) {
+ return new ValidateVolumeCapabilitiesRequestProtoTranslator();
+ } else if (yarnProto == ValidateVolumeCapabilitiesResponse.class
+ && csiProto == Csi.ValidateVolumeCapabilitiesResponse.class) {
+ return new ValidationVolumeCapabilitiesResponseProtoTranslator();
+ }
+ throw new IllegalArgumentException("A problem is found while processing"
+ + " proto message translating. Unexpected message types,"
+ + " no transformer is found can handle the transformation from type "
+ + yarnProto.getName() + " <-> " + csiProto.getName());
+ }
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-csi/src/main/java/org/apache/hadoop/yarn/csi/translator/ValidateVolumeCapabilitiesRequestProtoTranslator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-csi/src/main/java/org/apache/hadoop/yarn/csi/translator/ValidateVolumeCapabilitiesRequestProtoTranslator.java
new file mode 100644
index 0000000000..a74c47a9f6
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-csi/src/main/java/org/apache/hadoop/yarn/csi/translator/ValidateVolumeCapabilitiesRequestProtoTranslator.java
@@ -0,0 +1,93 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.csi.translator;
+
+import csi.v0.Csi;
+import org.apache.hadoop.yarn.api.protocolrecords.ValidateVolumeCapabilitiesRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.ValidateVolumeCapabilitiesRequest.VolumeCapability;
+import org.apache.hadoop.yarn.api.protocolrecords.ValidateVolumeCapabilitiesRequest.VolumeType;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Proto message translator for ValidateVolumeCapabilitiesRequest.
+ * @param ValidateVolumeCapabilitiesRequest
+ * @param Csi.ValidateVolumeCapabilitiesRequest
+ */
+public class ValidateVolumeCapabilitiesRequestProtoTranslator
+ implements ProtoTranslator {
+
+ @Override
+ public Csi.ValidateVolumeCapabilitiesRequest convertTo(
+ ValidateVolumeCapabilitiesRequest request) throws YarnException {
+ Csi.ValidateVolumeCapabilitiesRequest.Builder buidler =
+ Csi.ValidateVolumeCapabilitiesRequest.newBuilder();
+ buidler.setVolumeId(request.getVolumeId());
+ if (request.getVolumeCapabilities() != null
+ && request.getVolumeCapabilities().size() > 0) {
+ buidler.putAllVolumeAttributes(request.getVolumeAttributes());
+ }
+ for (VolumeCapability cap :
+ request.getVolumeCapabilities()) {
+ Csi.VolumeCapability.AccessMode accessMode =
+ Csi.VolumeCapability.AccessMode.newBuilder()
+ .setModeValue(cap.getAccessMode().ordinal())
+ .build();
+ Csi.VolumeCapability.MountVolume mountVolume =
+ Csi.VolumeCapability.MountVolume.newBuilder()
+ .addAllMountFlags(cap.getMountFlags())
+ .build();
+ Csi.VolumeCapability capability =
+ Csi.VolumeCapability.newBuilder()
+ .setAccessMode(accessMode)
+ .setMount(mountVolume)
+ .build();
+ buidler.addVolumeCapabilities(capability);
+ }
+ return buidler.build();
+ }
+
+ @Override
+ public ValidateVolumeCapabilitiesRequest convertFrom(
+ Csi.ValidateVolumeCapabilitiesRequest request) throws YarnException {
+ ValidateVolumeCapabilitiesRequest result = ValidateVolumeCapabilitiesRequest
+ .newInstance(request.getVolumeId(), request.getVolumeAttributesMap());
+ for (Csi.VolumeCapability csiCap :
+ request.getVolumeCapabilitiesList()) {
+ ValidateVolumeCapabilitiesRequest.AccessMode mode =
+ ValidateVolumeCapabilitiesRequest.AccessMode
+ .valueOf(csiCap.getAccessMode().getMode().name());
+ if (!csiCap.hasMount()) {
+ throw new YarnException("Invalid request,"
+ + " mount is not found in the request.");
+ }
+ List mountFlags = new ArrayList<>();
+ for (int i=0; i ValidateVolumeCapabilitiesResponse
+ * @param Csi.ValidateVolumeCapabilitiesResponse
+ */
+public class ValidationVolumeCapabilitiesResponseProtoTranslator
+ implements ProtoTranslator {
+
+ @Override
+ public Csi.ValidateVolumeCapabilitiesResponse convertTo(
+ ValidateVolumeCapabilitiesResponse response) throws YarnException {
+ return Csi.ValidateVolumeCapabilitiesResponse.newBuilder()
+ .setSupported(response.isSupported())
+ .setMessage(response.getResponseMessage())
+ .build();
+ }
+
+ @Override
+ public ValidateVolumeCapabilitiesResponse convertFrom(
+ Csi.ValidateVolumeCapabilitiesResponse response) throws YarnException {
+ return ValidateVolumeCapabilitiesResponse.newInstance(
+ response.getSupported(), response.getMessage());
+ }
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-csi/src/main/java/org/apache/hadoop/yarn/csi/translator/package-info.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-csi/src/main/java/org/apache/hadoop/yarn/csi/translator/package-info.java
new file mode 100644
index 0000000000..c0964cdf08
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-csi/src/main/java/org/apache/hadoop/yarn/csi/translator/package-info.java
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * This package contains classes for protocol translation between YARN and CSI.
+ */
+package org.apache.hadoop.yarn.csi.translator;
\ No newline at end of file
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-csi/src/main/java/org/apache/hadoop/yarn/csi/utils/ConfigUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-csi/src/main/java/org/apache/hadoop/yarn/csi/utils/ConfigUtils.java
new file mode 100644
index 0000000000..77e6955180
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-csi/src/main/java/org/apache/hadoop/yarn/csi/utils/ConfigUtils.java
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.csi.utils;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+
+import java.net.InetSocketAddress;
+
+/**
+ * Utility class to load configurations.
+ */
+public final class ConfigUtils {
+
+ private ConfigUtils() {
+ // Hide constructor for utility class.
+ }
+ /**
+ * Resolve the CSI adaptor address for a CSI driver from configuration.
+ * Expected configuration property name is
+ * yarn.nodemanager.csi-driver-adaptor.${driverName}.address.
+ * @param driverName
+ * @param conf
+ * @return adaptor service address
+ * @throws YarnException
+ */
+ public static InetSocketAddress getCsiAdaptorAddressForDriver(
+ String driverName, Configuration conf) throws YarnException {
+ String configName = YarnConfiguration.NM_CSI_ADAPTOR_PREFIX
+ + driverName + ".address";
+ String errorMessage = "Failed to load CSI adaptor address for driver "
+ + driverName + ", configuration property " + configName
+ + " is not defined or invalid.";
+ try {
+ InetSocketAddress address = conf
+ .getSocketAddr(configName, null, -1);
+ if (address == null) {
+ throw new YarnException(errorMessage);
+ }
+ return address;
+ } catch (IllegalArgumentException e) {
+ throw new YarnException(errorMessage);
+ }
+ }
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-csi/src/test/java/org/apache/hadoop/yarn/csi/adaptor/TestCsiAdaptorService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-csi/src/test/java/org/apache/hadoop/yarn/csi/adaptor/TestCsiAdaptorService.java
new file mode 100644
index 0000000000..128240d86d
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-csi/src/test/java/org/apache/hadoop/yarn/csi/adaptor/TestCsiAdaptorService.java
@@ -0,0 +1,256 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.csi.adaptor;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import csi.v0.Csi;
+import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.service.ServiceStateException;
+import org.apache.hadoop.test.GenericTestUtils;
+import org.apache.hadoop.yarn.api.CsiAdaptorProtocol;
+import org.apache.hadoop.yarn.api.impl.pb.client.CsiAdaptorProtocolPBClientImpl;
+import org.apache.hadoop.yarn.api.protocolrecords.ValidateVolumeCapabilitiesRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.ValidateVolumeCapabilitiesResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.ValidateVolumeCapabilitiesRequestPBImpl;
+import org.apache.hadoop.yarn.client.NMProxy;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.csi.client.CsiClient;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.ipc.YarnRPC;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.ServerSocket;
+
+import static org.apache.hadoop.yarn.api.protocolrecords.ValidateVolumeCapabilitiesRequest.AccessMode.MULTI_NODE_MULTI_WRITER;
+import static org.apache.hadoop.yarn.api.protocolrecords.ValidateVolumeCapabilitiesRequest.VolumeType.FILE_SYSTEM;
+
+/**
+ * UT for {@link CsiAdaptorProtocolService}.
+ */
+public class TestCsiAdaptorService {
+
+ private static File testRoot = null;
+ private static String domainSocket = null;
+
+ @BeforeClass
+ public static void setUp() throws IOException {
+ testRoot = GenericTestUtils.getTestDir("csi-test");
+ File socketPath = new File(testRoot, "csi.sock");
+ FileUtils.forceMkdirParent(socketPath);
+ domainSocket = "unix://" + socketPath.getAbsolutePath();
+ }
+
+ @AfterClass
+ public static void tearDown() throws IOException {
+ if (testRoot != null) {
+ FileUtils.deleteDirectory(testRoot);
+ }
+ }
+
+ @Test
+ public void testValidateVolume() throws IOException, YarnException {
+ ServerSocket ss = new ServerSocket(0);
+ ss.close();
+ InetSocketAddress address = new InetSocketAddress(ss.getLocalPort());
+ Configuration conf = new Configuration();
+ conf.setSocketAddr(
+ YarnConfiguration.NM_CSI_ADAPTOR_PREFIX + "test-driver.address",
+ address);
+ CsiAdaptorProtocolService service =
+ new CsiAdaptorProtocolService("test-driver", domainSocket);
+
+ // inject a fake CSI client
+ // this client validates if the ValidateVolumeCapabilitiesRequest
+ // is integrity, and then reply a fake response
+ service.setCsiClient(new CsiClient() {
+ @Override
+ public Csi.GetPluginInfoResponse getPluginInfo() {
+ return Csi.GetPluginInfoResponse.newBuilder()
+ .setName("test-plugin")
+ .setVendorVersion("0.1")
+ .build();
+ }
+
+ @Override
+ public Csi.ValidateVolumeCapabilitiesResponse validateVolumeCapabilities(
+ Csi.ValidateVolumeCapabilitiesRequest request) {
+ // validate we get all info from the request
+ Assert.assertEquals("volume-id-0000123", request.getVolumeId());
+ Assert.assertEquals(1, request.getVolumeCapabilitiesCount());
+ Assert.assertEquals(Csi.VolumeCapability.AccessMode
+ .newBuilder().setModeValue(5).build(),
+ request.getVolumeCapabilities(0).getAccessMode());
+ Assert.assertTrue(request.getVolumeCapabilities(0).hasMount());
+ Assert.assertEquals(2, request.getVolumeCapabilities(0)
+ .getMount().getMountFlagsCount());
+ Assert.assertTrue(request.getVolumeCapabilities(0)
+ .getMount().getMountFlagsList().contains("mountFlag1"));
+ Assert.assertTrue(request.getVolumeCapabilities(0)
+ .getMount().getMountFlagsList().contains("mountFlag2"));
+ Assert.assertEquals(2, request.getVolumeAttributesCount());
+ Assert.assertEquals("v1", request.getVolumeAttributesMap().get("k1"));
+ Assert.assertEquals("v2", request.getVolumeAttributesMap().get("k2"));
+ // return a fake result
+ return Csi.ValidateVolumeCapabilitiesResponse.newBuilder()
+ .setSupported(false)
+ .setMessage("this is a test")
+ .build();
+ }
+ });
+
+ service.init(conf);
+ service.start();
+
+ try (CsiAdaptorProtocolPBClientImpl client =
+ new CsiAdaptorProtocolPBClientImpl(1L, address, new Configuration())) {
+ ValidateVolumeCapabilitiesRequest request =
+ ValidateVolumeCapabilitiesRequestPBImpl
+ .newInstance("volume-id-0000123",
+ ImmutableList.of(
+ new ValidateVolumeCapabilitiesRequest
+ .VolumeCapability(
+ MULTI_NODE_MULTI_WRITER, FILE_SYSTEM,
+ ImmutableList.of("mountFlag1", "mountFlag2"))),
+ ImmutableMap.of("k1", "v1", "k2", "v2"));
+
+ ValidateVolumeCapabilitiesResponse response = client
+ .validateVolumeCapacity(request);
+
+ Assert.assertEquals(false, response.isSupported());
+ Assert.assertEquals("this is a test", response.getResponseMessage());
+ } finally {
+ service.stop();
+ }
+ }
+
+ @Test
+ public void testValidateVolumeWithNMProxy() throws Exception {
+ ServerSocket ss = new ServerSocket(0);
+ ss.close();
+ InetSocketAddress address = new InetSocketAddress(ss.getLocalPort());
+ Configuration conf = new Configuration();
+ conf.setSocketAddr(
+ YarnConfiguration.NM_CSI_ADAPTOR_PREFIX + "test-driver.address",
+ address);
+ CsiAdaptorProtocolService service =
+ new CsiAdaptorProtocolService("test-driver", domainSocket);
+
+ // inject a fake CSI client
+ // this client validates if the ValidateVolumeCapabilitiesRequest
+ // is integrity, and then reply a fake response
+ service.setCsiClient(new CsiClient() {
+ @Override
+ public Csi.GetPluginInfoResponse getPluginInfo() {
+ return Csi.GetPluginInfoResponse.newBuilder()
+ .setName("test-plugin")
+ .setVendorVersion("0.1")
+ .build();
+ }
+
+ @Override
+ public Csi.ValidateVolumeCapabilitiesResponse validateVolumeCapabilities(
+ Csi.ValidateVolumeCapabilitiesRequest request) {
+ // validate we get all info from the request
+ Assert.assertEquals("volume-id-0000123", request.getVolumeId());
+ Assert.assertEquals(1, request.getVolumeCapabilitiesCount());
+ Assert.assertEquals(Csi.VolumeCapability.AccessMode
+ .newBuilder().setModeValue(5).build(),
+ request.getVolumeCapabilities(0).getAccessMode());
+ Assert.assertTrue(request.getVolumeCapabilities(0).hasMount());
+ Assert.assertEquals(2, request.getVolumeCapabilities(0)
+ .getMount().getMountFlagsCount());
+ Assert.assertTrue(request.getVolumeCapabilities(0)
+ .getMount().getMountFlagsList().contains("mountFlag1"));
+ Assert.assertTrue(request.getVolumeCapabilities(0)
+ .getMount().getMountFlagsList().contains("mountFlag2"));
+ Assert.assertEquals(2, request.getVolumeAttributesCount());
+ Assert.assertEquals("v1", request.getVolumeAttributesMap().get("k1"));
+ Assert.assertEquals("v2", request.getVolumeAttributesMap().get("k2"));
+ // return a fake result
+ return Csi.ValidateVolumeCapabilitiesResponse.newBuilder()
+ .setSupported(false)
+ .setMessage("this is a test")
+ .build();
+ }
+ });
+
+ service.init(conf);
+ service.start();
+
+ YarnRPC rpc = YarnRPC.create(conf);
+ UserGroupInformation currentUser = UserGroupInformation.getCurrentUser();
+ CsiAdaptorProtocol adaptorClient = NMProxy
+ .createNMProxy(conf, CsiAdaptorProtocol.class, currentUser, rpc,
+ NetUtils.createSocketAddrForHost("localhost", ss.getLocalPort()));
+ ValidateVolumeCapabilitiesRequest request =
+ ValidateVolumeCapabilitiesRequestPBImpl
+ .newInstance("volume-id-0000123",
+ ImmutableList.of(new ValidateVolumeCapabilitiesRequest
+ .VolumeCapability(
+ MULTI_NODE_MULTI_WRITER, FILE_SYSTEM,
+ ImmutableList.of("mountFlag1", "mountFlag2"))),
+ ImmutableMap.of("k1", "v1", "k2", "v2"));
+
+ ValidateVolumeCapabilitiesResponse response = adaptorClient
+ .validateVolumeCapacity(request);
+ Assert.assertEquals(false, response.isSupported());
+ Assert.assertEquals("this is a test", response.getResponseMessage());
+
+ service.stop();
+ }
+
+ @Test (expected = ServiceStateException.class)
+ public void testMissingConfiguration() {
+ Configuration conf = new Configuration();
+ CsiAdaptorProtocolService service =
+ new CsiAdaptorProtocolService("test-driver", domainSocket);
+ service.init(conf);
+ }
+
+ @Test (expected = ServiceStateException.class)
+ public void testInvalidServicePort() {
+ Configuration conf = new Configuration();
+ conf.set(YarnConfiguration.NM_CSI_ADAPTOR_PREFIX
+ + "test-driver-0001.address",
+ "0.0.0.0:-100"); // this is an invalid address
+ CsiAdaptorProtocolService service =
+ new CsiAdaptorProtocolService("test-driver-0001", domainSocket);
+ service.init(conf);
+ }
+
+ @Test (expected = ServiceStateException.class)
+ public void testInvalidHost() {
+ Configuration conf = new Configuration();
+ conf.set(YarnConfiguration.NM_CSI_ADAPTOR_PREFIX
+ + "test-driver-0001.address",
+ "192.0.1:8999"); // this is an invalid ip address
+ CsiAdaptorProtocolService service =
+ new CsiAdaptorProtocolService("test-driver-0001", domainSocket);
+ service.init(conf);
+ }
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-csi/src/test/java/org/apache/hadoop/yarn/csi/adaptor/TestGetPluginInfoRequestResponse.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-csi/src/test/java/org/apache/hadoop/yarn/csi/adaptor/TestGetPluginInfoRequestResponse.java
new file mode 100644
index 0000000000..f1734c84b3
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-csi/src/test/java/org/apache/hadoop/yarn/csi/adaptor/TestGetPluginInfoRequestResponse.java
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.csi.adaptor;
+
+import org.apache.hadoop.yarn.api.protocolrecords.GetPluginInfoResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.GetPluginInfoRequestPBImpl;
+import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.GetPluginInfoResponsePBImpl;
+import org.apache.hadoop.yarn.proto.CsiAdaptorProtos;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Verify the integrity of GetPluginInfoRequest and GetPluginInfoResponse.
+ */
+public class TestGetPluginInfoRequestResponse {
+
+ @Test
+ public void testGetPluginInfoRequestPBRecord() {
+ CsiAdaptorProtos.GetPluginInfoRequest requestProto =
+ CsiAdaptorProtos.GetPluginInfoRequest.newBuilder().build();
+ GetPluginInfoRequestPBImpl pbImpl =
+ new GetPluginInfoRequestPBImpl(requestProto);
+ Assert.assertNotNull(pbImpl);
+ Assert.assertEquals(requestProto, pbImpl.getProto());
+ }
+
+ @Test
+ public void testGetPluginInfoResponsePBRecord() {
+ CsiAdaptorProtos.GetPluginInfoResponse responseProto =
+ CsiAdaptorProtos.GetPluginInfoResponse.newBuilder()
+ .setName("test-driver")
+ .setVendorVersion("1.0.1")
+ .build();
+
+ GetPluginInfoResponsePBImpl pbImpl =
+ new GetPluginInfoResponsePBImpl(responseProto);
+ Assert.assertEquals("test-driver", pbImpl.getDriverName());
+ Assert.assertEquals("1.0.1", pbImpl.getVersion());
+ Assert.assertEquals(responseProto, pbImpl.getProto());
+
+ GetPluginInfoResponse pbImpl2 = GetPluginInfoResponsePBImpl
+ .newInstance("test-driver", "1.0.1");
+ Assert.assertEquals("test-driver", pbImpl2.getDriverName());
+ Assert.assertEquals("1.0.1", pbImpl2.getVersion());
+
+ CsiAdaptorProtos.GetPluginInfoResponse proto =
+ ((GetPluginInfoResponsePBImpl) pbImpl2).getProto();
+ Assert.assertEquals("test-driver", proto.getName());
+ Assert.assertEquals("1.0.1", proto.getVendorVersion());
+ }
+}
\ No newline at end of file
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-csi/src/test/java/org/apache/hadoop/yarn/csi/adaptor/TestValidateVolumeCapabilityRequest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-csi/src/test/java/org/apache/hadoop/yarn/csi/adaptor/TestValidateVolumeCapabilityRequest.java
new file mode 100644
index 0000000000..303cfc4493
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-csi/src/test/java/org/apache/hadoop/yarn/csi/adaptor/TestValidateVolumeCapabilityRequest.java
@@ -0,0 +1,113 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.csi.adaptor;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import org.apache.hadoop.yarn.api.protocolrecords.ValidateVolumeCapabilitiesRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.ValidateVolumeCapabilitiesRequest.VolumeCapability;
+import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.ValidateVolumeCapabilitiesRequestPBImpl;
+import org.apache.hadoop.yarn.proto.CsiAdaptorProtos;
+import org.apache.hadoop.yarn.proto.CsiAdaptorProtos.VolumeCapability.AccessMode;
+import org.apache.hadoop.yarn.proto.CsiAdaptorProtos.VolumeCapability.VolumeType;
+import org.apache.hadoop.yarn.proto.YarnProtos;
+import org.junit.Assert;
+import org.junit.Test;
+
+import static org.apache.hadoop.yarn.api.protocolrecords.ValidateVolumeCapabilitiesRequest.AccessMode.MULTI_NODE_MULTI_WRITER;
+import static org.apache.hadoop.yarn.api.protocolrecords.ValidateVolumeCapabilitiesRequest.VolumeType.FILE_SYSTEM;
+
+/**
+ * UT for message exchanges.
+ */
+public class TestValidateVolumeCapabilityRequest {
+
+ @Test
+ public void testPBRecord() {
+ CsiAdaptorProtos.VolumeCapability vcProto =
+ CsiAdaptorProtos.VolumeCapability.newBuilder()
+ .setAccessMode(AccessMode.MULTI_NODE_MULTI_WRITER)
+ .setVolumeType(VolumeType.FILE_SYSTEM)
+ .addMountFlags("flag0")
+ .addMountFlags("flag1")
+ .build();
+
+ CsiAdaptorProtos.ValidateVolumeCapabilitiesRequest requestProto =
+ CsiAdaptorProtos.ValidateVolumeCapabilitiesRequest.newBuilder()
+ .setVolumeId("volume-id-0000001")
+ .addVolumeCapabilities(vcProto)
+ .addVolumeAttributes(YarnProtos.StringStringMapProto
+ .newBuilder().setKey("attr0")
+ .setValue("value0")
+ .build())
+ .addVolumeAttributes(YarnProtos.StringStringMapProto
+ .newBuilder().setKey("attr1")
+ .setValue("value1")
+ .build())
+ .build();
+
+ ValidateVolumeCapabilitiesRequestPBImpl request =
+ new ValidateVolumeCapabilitiesRequestPBImpl(requestProto);
+
+ Assert.assertEquals("volume-id-0000001", request.getVolumeId());
+ Assert.assertEquals(2, request.getVolumeAttributes().size());
+ Assert.assertEquals("value0", request.getVolumeAttributes().get("attr0"));
+ Assert.assertEquals("value1", request.getVolumeAttributes().get("attr1"));
+ Assert.assertEquals(1, request.getVolumeCapabilities().size());
+ VolumeCapability vc =
+ request.getVolumeCapabilities().get(0);
+ Assert.assertEquals(MULTI_NODE_MULTI_WRITER, vc.getAccessMode());
+ Assert.assertEquals(FILE_SYSTEM, vc.getVolumeType());
+ Assert.assertEquals(2, vc.getMountFlags().size());
+
+ Assert.assertEquals(requestProto, request.getProto());
+ }
+
+ @Test
+ public void testNewInstance() {
+ ValidateVolumeCapabilitiesRequest pbImpl =
+ ValidateVolumeCapabilitiesRequestPBImpl
+ .newInstance("volume-id-0000123",
+ ImmutableList.of(
+ new VolumeCapability(
+ MULTI_NODE_MULTI_WRITER, FILE_SYSTEM,
+ ImmutableList.of("mountFlag1", "mountFlag2"))),
+ ImmutableMap.of("k1", "v1", "k2", "v2"));
+
+ Assert.assertEquals("volume-id-0000123", pbImpl.getVolumeId());
+ Assert.assertEquals(1, pbImpl.getVolumeCapabilities().size());
+ Assert.assertEquals(FILE_SYSTEM,
+ pbImpl.getVolumeCapabilities().get(0).getVolumeType());
+ Assert.assertEquals(MULTI_NODE_MULTI_WRITER,
+ pbImpl.getVolumeCapabilities().get(0).getAccessMode());
+ Assert.assertEquals(2, pbImpl.getVolumeAttributes().size());
+
+ CsiAdaptorProtos.ValidateVolumeCapabilitiesRequest proto =
+ ((ValidateVolumeCapabilitiesRequestPBImpl) pbImpl).getProto();
+ Assert.assertEquals("volume-id-0000123", proto.getVolumeId());
+ Assert.assertEquals(1, proto.getVolumeCapabilitiesCount());
+ Assert.assertEquals(AccessMode.MULTI_NODE_MULTI_WRITER,
+ proto.getVolumeCapabilities(0).getAccessMode());
+ Assert.assertEquals(VolumeType.FILE_SYSTEM,
+ proto.getVolumeCapabilities(0).getVolumeType());
+ Assert.assertEquals(2, proto.getVolumeCapabilities(0)
+ .getMountFlagsCount());
+ Assert.assertEquals(2, proto.getVolumeCapabilities(0)
+ .getMountFlagsList().size());
+ }
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-csi/src/test/java/org/apache/hadoop/yarn/csi/adaptor/TestValidateVolumeCapabilityResponse.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-csi/src/test/java/org/apache/hadoop/yarn/csi/adaptor/TestValidateVolumeCapabilityResponse.java
new file mode 100644
index 0000000000..97f116af0d
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-csi/src/test/java/org/apache/hadoop/yarn/csi/adaptor/TestValidateVolumeCapabilityResponse.java
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.csi.adaptor;
+
+import org.apache.hadoop.yarn.api.protocolrecords.ValidateVolumeCapabilitiesResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.ValidateVolumeCapabilitiesResponsePBImpl;
+import org.apache.hadoop.yarn.proto.CsiAdaptorProtos;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * UT for message exchanges.
+ */
+public class TestValidateVolumeCapabilityResponse {
+
+ @Test
+ public void testPBRecord() {
+ CsiAdaptorProtos.ValidateVolumeCapabilitiesResponse proto =
+ CsiAdaptorProtos.ValidateVolumeCapabilitiesResponse.newBuilder()
+ .setSupported(true)
+ .setMessage("capability is supported")
+ .build();
+
+ ValidateVolumeCapabilitiesResponsePBImpl pbImpl =
+ new ValidateVolumeCapabilitiesResponsePBImpl(proto);
+
+ Assert.assertEquals(true, pbImpl.isSupported());
+ Assert.assertEquals("capability is supported", pbImpl.getResponseMessage());
+ Assert.assertEquals(proto, pbImpl.getProto());
+ }
+
+ @Test
+ public void testNewInstance() {
+ ValidateVolumeCapabilitiesResponse pbImpl =
+ ValidateVolumeCapabilitiesResponsePBImpl
+ .newInstance(false, "capability not supported");
+ Assert.assertEquals(false, pbImpl.isSupported());
+ Assert.assertEquals("capability not supported",
+ pbImpl.getResponseMessage());
+
+ CsiAdaptorProtos.ValidateVolumeCapabilitiesResponse proto =
+ ((ValidateVolumeCapabilitiesResponsePBImpl) pbImpl).getProto();
+ Assert.assertEquals(false, proto.getSupported());
+ Assert.assertEquals("capability not supported", proto.getMessage());
+ }
+}
\ No newline at end of file
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-csi/src/test/java/org/apache/hadoop/yarn/csi/adaptor/package-info.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-csi/src/test/java/org/apache/hadoop/yarn/csi/adaptor/package-info.java
new file mode 100644
index 0000000000..ecc7fc2c7e
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-csi/src/test/java/org/apache/hadoop/yarn/csi/adaptor/package-info.java
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * This package contains UT classes for CSI adaptor.
+ */
+package org.apache.hadoop.yarn.csi.adaptor;
\ No newline at end of file
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/volume/csi/VolumeManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/volume/csi/VolumeManager.java
index 5f2669de49..32563cb367 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/volume/csi/VolumeManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/volume/csi/VolumeManager.java
@@ -17,13 +17,12 @@
*/
package org.apache.hadoop.yarn.server.resourcemanager.volume.csi;
-import com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.yarn.api.CsiAdaptorProtocol;
import org.apache.hadoop.yarn.server.resourcemanager.volume.csi.lifecycle.Volume;
import org.apache.hadoop.yarn.server.resourcemanager.volume.csi.provisioner.VolumeProvisioningResults;
import org.apache.hadoop.yarn.server.resourcemanager.volume.csi.provisioner.VolumeProvisioningTask;
-import org.apache.hadoop.yarn.server.volume.csi.CsiAdaptorClientProtocol;
import java.util.concurrent.ScheduledFuture;
@@ -40,12 +39,8 @@ public interface VolumeManager {
/**
* @return all known volumes and their states.
*/
- @VisibleForTesting
VolumeStates getVolumeStates();
- @VisibleForTesting
- void setClient(CsiAdaptorClientProtocol client);
-
/**
* Start to supervise on a volume.
* @param volume
@@ -60,4 +55,20 @@ public interface VolumeManager {
*/
ScheduledFuture schedule(
VolumeProvisioningTask volumeProvisioningTask, int delaySecond);
+
+ /**
+ * Register a csi-driver-adaptor to the volume manager.
+ * @param driverName
+ * @param client
+ */
+ void registerCsiDriverAdaptor(String driverName, CsiAdaptorProtocol client);
+
+ /**
+ * Returns the csi-driver-adaptor client from cache by the given driver name.
+ * If the client is not found, null is returned.
+ * @param driverName
+ * @return a csi-driver-adaptor client working for given driver or null
+ * if the adaptor could not be found.
+ */
+ CsiAdaptorProtocol getAdaptorByDriverName(String driverName);
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/volume/csi/VolumeManagerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/volume/csi/VolumeManagerImpl.java
index 5252f53514..839d1bc61f 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/volume/csi/VolumeManagerImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/volume/csi/VolumeManagerImpl.java
@@ -18,16 +18,28 @@
package org.apache.hadoop.yarn.server.resourcemanager.volume.csi;
import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Strings;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.service.AbstractService;
+import org.apache.hadoop.yarn.api.CsiAdaptorProtocol;
+import org.apache.hadoop.yarn.api.protocolrecords.GetPluginInfoRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetPluginInfoResponse;
+import org.apache.hadoop.yarn.client.NMProxy;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.ipc.YarnRPC;
import org.apache.hadoop.yarn.server.resourcemanager.volume.csi.lifecycle.Volume;
-import org.apache.hadoop.yarn.server.resourcemanager.volume.csi.lifecycle.VolumeImpl;
import org.apache.hadoop.yarn.server.resourcemanager.volume.csi.provisioner.VolumeProvisioningResults;
import org.apache.hadoop.yarn.server.resourcemanager.volume.csi.provisioner.VolumeProvisioningTask;
-import org.apache.hadoop.yarn.server.volume.csi.CsiAdaptorClientProtocol;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
@@ -43,20 +55,84 @@ public class VolumeManagerImpl extends AbstractService
private final VolumeStates volumeStates;
private ScheduledExecutorService provisioningExecutor;
- private CsiAdaptorClientProtocol adaptorClient;
+ private Map csiAdaptorMap;
private final static int PROVISIONING_TASK_THREAD_POOL_SIZE = 10;
public VolumeManagerImpl() {
super(VolumeManagerImpl.class.getName());
this.volumeStates = new VolumeStates();
+ this.csiAdaptorMap = new ConcurrentHashMap<>();
this.provisioningExecutor = Executors
.newScheduledThreadPool(PROVISIONING_TASK_THREAD_POOL_SIZE);
- this.adaptorClient = new CsiAdaptorClient();
+ }
+
+ // Init the CSI adaptor cache according to the configuration.
+ // user only needs to configure a list of adaptor addresses,
+ // this method extracts each address and init an adaptor client,
+ // then proceed with a hand-shake by calling adaptor's getPluginInfo
+ // method to retrieve the driver info. If the driver can be resolved,
+ // it is then added to the cache. Note, we don't allow two drivers
+ // specified with same driver-name even version is different.
+ private void initCsiAdaptorCache(
+ final Map adaptorMap, Configuration conf)
+ throws IOException, YarnException {
+ LOG.info("Initializing cache for csi-driver-adaptors");
+ String[] addresses =
+ conf.getStrings(YarnConfiguration.NM_CSI_ADAPTOR_ADDRESSES);
+ if (addresses != null && addresses.length > 0) {
+ for (String addr : addresses) {
+ LOG.info("Found csi-driver-adaptor socket address: " + addr);
+ InetSocketAddress address = NetUtils.createSocketAddr(addr);
+ YarnRPC rpc = YarnRPC.create(conf);
+ UserGroupInformation currentUser =
+ UserGroupInformation.getCurrentUser();
+ CsiAdaptorProtocol adaptorClient = NMProxy
+ .createNMProxy(conf, CsiAdaptorProtocol.class, currentUser, rpc,
+ address);
+ // Attempt to resolve the driver by contacting to
+ // the diver's identity service on the given address.
+ // If the call failed, the initialization is also failed
+ // in order running into inconsistent state.
+ LOG.info("Retrieving info from csi-driver-adaptor on address " + addr);
+ GetPluginInfoResponse response =
+ adaptorClient.getPluginInfo(GetPluginInfoRequest.newInstance());
+ if (!Strings.isNullOrEmpty(response.getDriverName())) {
+ String driverName = response.getDriverName();
+ if (adaptorMap.containsKey(driverName)) {
+ throw new YarnException(
+ "Duplicate driver adaptor found," + " driver name: "
+ + driverName);
+ }
+ adaptorMap.put(driverName, adaptorClient);
+ LOG.info("CSI Adaptor added to the cache, adaptor name: " + driverName
+ + ", driver version: " + response.getVersion());
+ }
+ }
+ }
+ }
+
+ /**
+ * Returns a CsiAdaptorProtocol client by the given driver name,
+ * returns null if no adaptor is found for the driver, that means
+ * the driver has not registered to the volume manager yet enhance not valid.
+ * @param driverName the name of the driver
+ * @return CsiAdaptorProtocol client or null if driver not registered
+ */
+ public CsiAdaptorProtocol getAdaptorByDriverName(String driverName) {
+ return csiAdaptorMap.get(driverName);
+ }
+
+ @VisibleForTesting
+ @Override
+ public void registerCsiDriverAdaptor(String driverName,
+ CsiAdaptorProtocol client) {
+ this.csiAdaptorMap.put(driverName, client);
}
@Override
protected void serviceInit(Configuration conf) throws Exception {
+ initCsiAdaptorCache(csiAdaptorMap, conf);
super.serviceInit(conf);
}
@@ -82,18 +158,11 @@ public class VolumeManagerImpl extends AbstractService
// volume already exists
return volumeStates.getVolume(volume.getVolumeId());
} else {
- // add the volume and set the client
- ((VolumeImpl) volume).setClient(adaptorClient);
this.volumeStates.addVolumeIfAbsent(volume);
return volume;
}
}
- @VisibleForTesting
- public void setClient(CsiAdaptorClientProtocol client) {
- this.adaptorClient = client;
- }
-
@Override
public ScheduledFuture schedule(
VolumeProvisioningTask volumeProvisioningTask,
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/volume/csi/lifecycle/Volume.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/volume/csi/lifecycle/Volume.java
index 68e89b0b34..83501acd82 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/volume/csi/lifecycle/Volume.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/volume/csi/lifecycle/Volume.java
@@ -19,9 +19,11 @@ package org.apache.hadoop.yarn.server.resourcemanager.volume.csi.lifecycle;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.yarn.api.CsiAdaptorProtocol;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.server.resourcemanager.volume.csi.event.VolumeEvent;
import org.apache.hadoop.yarn.server.volume.csi.VolumeId;
+import org.apache.hadoop.yarn.server.volume.csi.VolumeMetaData;
/**
* Major volume interface at RM's view, it maintains the volume states and
@@ -34,4 +36,10 @@ public interface Volume extends EventHandler {
VolumeState getVolumeState();
VolumeId getVolumeId();
+
+ VolumeMetaData getVolumeMeta();
+
+ CsiAdaptorProtocol getClient();
+
+ void setClient(CsiAdaptorProtocol client);
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/volume/csi/lifecycle/VolumeImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/volume/csi/lifecycle/VolumeImpl.java
index 25150473b9..82a4acb04f 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/volume/csi/lifecycle/VolumeImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/volume/csi/lifecycle/VolumeImpl.java
@@ -18,10 +18,15 @@
package org.apache.hadoop.yarn.server.resourcemanager.volume.csi.lifecycle;
import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.yarn.server.resourcemanager.volume.csi.CsiAdaptorClient;
-import org.apache.hadoop.yarn.server.volume.csi.CsiAdaptorClientProtocol;
+import org.apache.hadoop.yarn.api.CsiAdaptorProtocol;
+import org.apache.hadoop.yarn.api.protocolrecords.ValidateVolumeCapabilitiesRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.ValidateVolumeCapabilitiesRequest.VolumeCapability;
+import org.apache.hadoop.yarn.api.protocolrecords.ValidateVolumeCapabilitiesResponse;
+import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.resourcemanager.volume.csi.event.VolumeEvent;
import org.apache.hadoop.yarn.server.resourcemanager.volume.csi.event.VolumeEventType;
import org.apache.hadoop.yarn.state.InvalidStateTransitionException;
@@ -30,13 +35,16 @@ import org.apache.hadoop.yarn.state.StateMachine;
import org.apache.hadoop.yarn.state.StateMachineFactory;
import org.apache.hadoop.yarn.server.volume.csi.VolumeId;
import org.apache.hadoop.yarn.server.volume.csi.VolumeMetaData;
-import org.apache.hadoop.yarn.server.volume.csi.exception.VolumeException;
+import java.io.IOException;
import java.util.EnumSet;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
+import static org.apache.hadoop.yarn.api.protocolrecords.ValidateVolumeCapabilitiesRequest.AccessMode.SINGLE_NODE_READER_ONLY;
+import static org.apache.hadoop.yarn.api.protocolrecords.ValidateVolumeCapabilitiesRequest.VolumeType.FILE_SYSTEM;
+
/**
* This class maintains the volume states and state transition
* according to the CSI volume lifecycle. Volume states are stored in
@@ -54,7 +62,7 @@ public class VolumeImpl implements Volume {
private final VolumeId volumeId;
private final VolumeMetaData volumeMeta;
- private CsiAdaptorClientProtocol client;
+ private CsiAdaptorProtocol adaptorClient;
public VolumeImpl(VolumeMetaData volumeMeta) {
ReadWriteLock lock = new ReentrantReadWriteLock();
@@ -63,16 +71,21 @@ public class VolumeImpl implements Volume {
this.volumeId = volumeMeta.getVolumeId();
this.volumeMeta = volumeMeta;
this.stateMachine = createVolumeStateFactory().make(this);
- this.client = new CsiAdaptorClient();
}
@VisibleForTesting
- public void setClient(CsiAdaptorClientProtocol client) {
- this.client = client;
+ public void setClient(CsiAdaptorProtocol csiAdaptorClient) {
+ this.adaptorClient = csiAdaptorClient;
}
- public CsiAdaptorClientProtocol getClient() {
- return this.client;
+ @Override
+ public CsiAdaptorProtocol getClient() {
+ return this.adaptorClient;
+ }
+
+ @Override
+ public VolumeMetaData getVolumeMeta() {
+ return this.volumeMeta;
}
private StateMachineFactory UNAVAILABLE
// Simulate a failed API call to the adaptor
- doThrow(new VolumeException("failed")).when(mockedClient).validateVolume();
+ doReturn(ValidateVolumeCapabilitiesResponse.newInstance(false, ""))
+ .when(mockedClient)
+ .validateVolumeCapacity(any(ValidateVolumeCapabilitiesRequest.class));
volume.handle(new ValidateVolumeEvent(volume));
try {
@@ -80,47 +92,62 @@ public class TestVolumeLifecycle {
}
@Test
- public void testValidated() throws InvalidVolumeException {
- AtomicInteger validatedTimes = new AtomicInteger(0);
+ public void testValidationFailure() throws YarnException, IOException {
+ CsiAdaptorProtocol mockedClient = Mockito
+ .mock(CsiAdaptorProtocol.class);
+ doThrow(new VolumeException("fail"))
+ .when(mockedClient)
+ .validateVolumeCapacity(any(ValidateVolumeCapabilitiesRequest.class));
+
VolumeImpl volume = (VolumeImpl) VolumeBuilder
.newBuilder().volumeId("test_vol_00000001").build();
- CsiAdaptorClientProtocol mockedClient = new CsiAdaptorClientProtocol() {
- @Override
- public void validateVolume() {
- validatedTimes.incrementAndGet();
- }
+ volume.setClient(mockedClient);
- @Override
- public void controllerPublishVolume() {
- // do nothing
- }
- };
+ // NEW -> UNAVAILABLE
+ // Simulate a failed API call to the adaptor
+ doThrow(new VolumeException("failed"))
+ .when(mockedClient)
+ .validateVolumeCapacity(any(ValidateVolumeCapabilitiesRequest.class));
+ volume.handle(new ValidateVolumeEvent(volume));
+ }
+
+ @Test
+ public void testValidated() throws YarnException, IOException {
+ VolumeImpl volume = (VolumeImpl) VolumeBuilder
+ .newBuilder().volumeId("test_vol_00000001").build();
+ CsiAdaptorProtocol mockedClient = Mockito.mock(CsiAdaptorProtocol.class);
// The client has a count to memorize how many times being called
volume.setClient(mockedClient);
// NEW -> VALIDATED
+ doReturn(ValidateVolumeCapabilitiesResponse.newInstance(true, ""))
+ .when(mockedClient)
+ .validateVolumeCapacity(any(ValidateVolumeCapabilitiesRequest.class));
Assert.assertEquals(VolumeState.NEW, volume.getVolumeState());
volume.handle(new ValidateVolumeEvent(volume));
Assert.assertEquals(VolumeState.VALIDATED, volume.getVolumeState());
- Assert.assertEquals(1, validatedTimes.get());
+ verify(mockedClient, times(1))
+ .validateVolumeCapacity(any(ValidateVolumeCapabilitiesRequest.class));
// VALIDATED -> VALIDATED
volume.handle(new ValidateVolumeEvent(volume));
Assert.assertEquals(VolumeState.VALIDATED, volume.getVolumeState());
- Assert.assertEquals(1, validatedTimes.get());
+ verify(mockedClient, times(1))
+ .validateVolumeCapacity(any(ValidateVolumeCapabilitiesRequest.class));
}
@Test
- public void testUnavailableState() throws VolumeException {
+ public void testUnavailableState() throws YarnException, IOException {
VolumeImpl volume = (VolumeImpl) VolumeBuilder
.newBuilder().volumeId("test_vol_00000001").build();
- CsiAdaptorClientProtocol mockedClient = Mockito
- .mock(CsiAdaptorClientProtocol.class);
+ CsiAdaptorProtocol mockedClient = Mockito
+ .mock(CsiAdaptorProtocol.class);
volume.setClient(mockedClient);
// NEW -> UNAVAILABLE
- doThrow(new VolumeException("failed")).when(mockedClient)
- .validateVolume();
+ doThrow(new VolumeException("failed"))
+ .when(mockedClient)
+ .validateVolumeCapacity(any(ValidateVolumeCapabilitiesRequest.class));
Assert.assertEquals(VolumeState.NEW, volume.getVolumeState());
volume.handle(new ValidateVolumeEvent(volume));
Assert.assertEquals(VolumeState.UNAVAILABLE, volume.getVolumeState());
@@ -130,23 +157,26 @@ public class TestVolumeLifecycle {
Assert.assertEquals(VolumeState.UNAVAILABLE, volume.getVolumeState());
// UNAVAILABLE -> VALIDATED
- doNothing().when(mockedClient).validateVolume();
+ doReturn(ValidateVolumeCapabilitiesResponse.newInstance(true, ""))
+ .when(mockedClient)
+ .validateVolumeCapacity(any(ValidateVolumeCapabilitiesRequest.class));
volume.setClient(mockedClient);
volume.handle(new ValidateVolumeEvent(volume));
Assert.assertEquals(VolumeState.VALIDATED, volume.getVolumeState());
}
@Test
- public void testPublishUnavailableVolume() throws VolumeException {
+ public void testPublishUnavailableVolume() throws YarnException, IOException {
VolumeImpl volume = (VolumeImpl) VolumeBuilder
.newBuilder().volumeId("test_vol_00000001").build();
- CsiAdaptorClientProtocol mockedClient = Mockito
- .mock(CsiAdaptorClientProtocol.class);
+ CsiAdaptorProtocol mockedClient = Mockito
+ .mock(CsiAdaptorProtocol.class);
volume.setClient(mockedClient);
// NEW -> UNAVAILABLE (on validateVolume)
- doThrow(new VolumeException("failed")).when(mockedClient)
- .validateVolume();
+ doThrow(new VolumeException("failed"))
+ .when(mockedClient)
+ .validateVolumeCapacity(any(ValidateVolumeCapabilitiesRequest.class));
Assert.assertEquals(VolumeState.NEW, volume.getVolumeState());
volume.handle(new ValidateVolumeEvent(volume));
Assert.assertEquals(VolumeState.UNAVAILABLE, volume.getVolumeState());
@@ -154,7 +184,7 @@ public class TestVolumeLifecycle {
// UNAVAILABLE -> UNAVAILABLE (on publishVolume)
volume.handle(new ControllerPublishVolumeEvent(volume));
// controller publish is not called since the state is UNAVAILABLE
- verify(mockedClient, times(0)).controllerPublishVolume();
+ // verify(mockedClient, times(0)).controllerPublishVolume();
// state remains to UNAVAILABLE
Assert.assertEquals(VolumeState.UNAVAILABLE, volume.getVolumeState());
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/volume/csi/TestVolumeProcessor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/volume/csi/TestVolumeProcessor.java
index d6f9d920d1..cee8fdf083 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/volume/csi/TestVolumeProcessor.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/volume/csi/TestVolumeProcessor.java
@@ -20,8 +20,11 @@ package org.apache.hadoop.yarn.server.resourcemanager.volume.csi;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.CsiAdaptorProtocol;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
import org.apache.hadoop.yarn.api.protocolrecords.ResourceTypes;
+import org.apache.hadoop.yarn.api.protocolrecords.ValidateVolumeCapabilitiesRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.ValidateVolumeCapabilitiesResponse;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceInformation;
import org.apache.hadoop.yarn.api.records.ResourceSizing;
@@ -40,7 +43,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.Capacity
import org.apache.hadoop.yarn.server.resourcemanager.volume.csi.lifecycle.Volume;
import org.apache.hadoop.yarn.server.resourcemanager.volume.csi.lifecycle.VolumeState;
import org.apache.hadoop.yarn.server.resourcemanager.volume.csi.processor.VolumeAMSProcessor;
-import org.apache.hadoop.yarn.server.volume.csi.CsiAdaptorClientProtocol;
import org.apache.hadoop.yarn.server.volume.csi.CsiConstants;
import org.apache.hadoop.yarn.server.volume.csi.VolumeId;
import org.apache.hadoop.yarn.server.volume.csi.exception.InvalidVolumeException;
@@ -57,6 +59,10 @@ import java.io.FileWriter;
import java.io.IOException;
import java.util.Arrays;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.doThrow;
+
/**
* Test cases for volume processor.
*/
@@ -91,6 +97,7 @@ public class TestVolumeProcessor {
conf.set(CapacitySchedulerConfiguration.PREFIX
+ CapacitySchedulerConfiguration.ROOT + ".default.ordering-policy",
"fair");
+ // this is required to enable volume processor
conf.set(YarnConfiguration.RM_APPLICATION_MASTER_SERVICE_PROCESSORS,
VolumeAMSProcessor.class.getName());
mgr = new NullRMNodeLabelsManager();
@@ -155,6 +162,17 @@ public class TestVolumeProcessor {
.schedulingRequests(Arrays.asList(sc))
.build();
+ // inject adaptor client for testing
+ CsiAdaptorProtocol mockedClient = Mockito
+ .mock(CsiAdaptorProtocol.class);
+ rm.getRMContext().getVolumeManager()
+ .registerCsiDriverAdaptor("hostpath", mockedClient);
+
+ // simulate validation succeed
+ doReturn(ValidateVolumeCapabilitiesResponse.newInstance(true, ""))
+ .when(mockedClient)
+ .validateVolumeCapacity(any(ValidateVolumeCapabilitiesRequest.class));
+
am1.allocate(ar);
VolumeStates volumeStates =
rm.getRMContext().getVolumeManager().getVolumeStates();
@@ -212,12 +230,14 @@ public class TestVolumeProcessor {
RMApp app1 = rm.submitApp(1 * GB, "app", "user", null, "default");
MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, mockNMS[0]);
- CsiAdaptorClientProtocol mockedClient = Mockito
- .mock(CsiAdaptorClientProtocol.class);
+ CsiAdaptorProtocol mockedClient = Mockito
+ .mock(CsiAdaptorProtocol.class);
// inject adaptor client
- rm.getRMContext().getVolumeManager().setClient(mockedClient);
- Mockito.doThrow(new VolumeException("failed")).when(mockedClient)
- .validateVolume();
+ rm.getRMContext().getVolumeManager()
+ .registerCsiDriverAdaptor("hostpath", mockedClient);
+ doThrow(new VolumeException("failed"))
+ .when(mockedClient)
+ .validateVolumeCapacity(any(ValidateVolumeCapabilitiesRequest.class));
Resource resource = Resource.newInstance(1024, 1);
ResourceInformation volumeResource = ResourceInformation