HDDS-1384. TestBlockOutputStreamWithFailures is failing

Closes #1029
This commit is contained in:
Márton Elek 2019-07-11 12:46:39 +02:00
parent 14c43f85de
commit 9119ed07ff
No known key found for this signature in database
GPG Key ID: D51EA8F00EE79B28
3 changed files with 83 additions and 48 deletions

View File

@ -21,6 +21,7 @@ package org.apache.hadoop.ozone.container.common.transport.server;
import com.google.common.base.Preconditions; import com.google.common.base.Preconditions;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdds.protocol.DatanodeDetails; import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.DatanodeDetails.Port.Name;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
.ContainerCommandRequestProto; .ContainerCommandRequestProto;
@ -51,9 +52,6 @@ import org.slf4j.LoggerFactory;
import java.io.File; import java.io.File;
import java.io.IOException; import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.SocketAddress;
import java.util.Collections; import java.util.Collections;
import java.util.List; import java.util.List;
import java.util.UUID; import java.util.UUID;
@ -71,6 +69,8 @@ public final class XceiverServerGrpc extends XceiverServer {
private Server server; private Server server;
private final ContainerDispatcher storageContainer; private final ContainerDispatcher storageContainer;
private boolean isStarted; private boolean isStarted;
private DatanodeDetails datanodeDetails;
/** /**
* Constructs a Grpc server class. * Constructs a Grpc server class.
@ -84,25 +84,15 @@ public final class XceiverServerGrpc extends XceiverServer {
Preconditions.checkNotNull(conf); Preconditions.checkNotNull(conf);
this.id = datanodeDetails.getUuid(); this.id = datanodeDetails.getUuid();
this.datanodeDetails = datanodeDetails;
this.port = conf.getInt(OzoneConfigKeys.DFS_CONTAINER_IPC_PORT, this.port = conf.getInt(OzoneConfigKeys.DFS_CONTAINER_IPC_PORT,
OzoneConfigKeys.DFS_CONTAINER_IPC_PORT_DEFAULT); OzoneConfigKeys.DFS_CONTAINER_IPC_PORT_DEFAULT);
// Get an available port on current node and
// use that as the container port
if (conf.getBoolean(OzoneConfigKeys.DFS_CONTAINER_IPC_RANDOM_PORT, if (conf.getBoolean(OzoneConfigKeys.DFS_CONTAINER_IPC_RANDOM_PORT,
OzoneConfigKeys.DFS_CONTAINER_IPC_RANDOM_PORT_DEFAULT)) { OzoneConfigKeys.DFS_CONTAINER_IPC_RANDOM_PORT_DEFAULT)) {
try (ServerSocket socket = new ServerSocket()) { this.port = 0;
socket.setReuseAddress(true);
SocketAddress address = new InetSocketAddress(0);
socket.bind(address);
this.port = socket.getLocalPort();
LOG.info("Found a free port for the server : {}", this.port);
} catch (IOException e) {
LOG.error("Unable find a random free port for the server, "
+ "fallback to use default port {}", this.port, e);
}
} }
datanodeDetails.setPort(
DatanodeDetails.newPort(DatanodeDetails.Port.Name.STANDALONE, port));
NettyServerBuilder nettyServerBuilder = NettyServerBuilder nettyServerBuilder =
((NettyServerBuilder) ServerBuilder.forPort(port)) ((NettyServerBuilder) ServerBuilder.forPort(port))
.maxInboundMessageSize(OzoneConsts.OZONE_SCM_CHUNK_MAX_SIZE); .maxInboundMessageSize(OzoneConsts.OZONE_SCM_CHUNK_MAX_SIZE);
@ -165,6 +155,19 @@ public final class XceiverServerGrpc extends XceiverServer {
public void start() throws IOException { public void start() throws IOException {
if (!isStarted) { if (!isStarted) {
server.start(); server.start();
int realPort = server.getPort();
if (port == 0) {
LOG.info("{} {} is started using port {}", getClass().getSimpleName(),
this.id, realPort);
port = realPort;
}
//register the real port to the datanode details.
datanodeDetails.setPort(DatanodeDetails
.newPort(Name.STANDALONE,
realPort));
isStarted = true; isStarted = true;
} }
} }

View File

@ -66,6 +66,7 @@ import org.apache.ratis.proto.RaftProtos;
import org.apache.ratis.proto.RaftProtos.RoleInfoProto; import org.apache.ratis.proto.RaftProtos.RoleInfoProto;
import org.apache.ratis.proto.RaftProtos.ReplicationLevel; import org.apache.ratis.proto.RaftProtos.ReplicationLevel;
import org.apache.ratis.server.protocol.TermIndex; import org.apache.ratis.server.protocol.TermIndex;
import org.apache.ratis.server.impl.RaftServerProxy;
import org.apache.ratis.util.SizeInBytes; import org.apache.ratis.util.SizeInBytes;
import org.apache.ratis.util.TimeDuration; import org.apache.ratis.util.TimeDuration;
import org.slf4j.Logger; import org.slf4j.Logger;
@ -73,9 +74,6 @@ import org.slf4j.LoggerFactory;
import java.io.File; import java.io.File;
import java.io.IOException; import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.SocketAddress;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collections; import java.util.Collections;
import java.util.List; import java.util.List;
@ -101,7 +99,7 @@ public final class XceiverServerRatis extends XceiverServer {
return CALL_ID_COUNTER.getAndIncrement() & Long.MAX_VALUE; return CALL_ID_COUNTER.getAndIncrement() & Long.MAX_VALUE;
} }
private final int port; private int port;
private final RaftServer server; private final RaftServer server;
private ThreadPoolExecutor chunkExecutor; private ThreadPoolExecutor chunkExecutor;
private final List<ExecutorService> executors; private final List<ExecutorService> executors;
@ -112,6 +110,7 @@ public final class XceiverServerRatis extends XceiverServer {
private long nodeFailureTimeoutMs; private long nodeFailureTimeoutMs;
private final long cacheEntryExpiryInteval; private final long cacheEntryExpiryInteval;
private boolean isStarted = false; private boolean isStarted = false;
private DatanodeDetails datanodeDetails;
private XceiverServerRatis(DatanodeDetails dd, int port, private XceiverServerRatis(DatanodeDetails dd, int port,
ContainerDispatcher dispatcher, Configuration conf, StateContext ContainerDispatcher dispatcher, Configuration conf, StateContext
@ -119,6 +118,7 @@ public final class XceiverServerRatis extends XceiverServer {
throws IOException { throws IOException {
super(conf, caClient); super(conf, caClient);
Objects.requireNonNull(dd, "id == null"); Objects.requireNonNull(dd, "id == null");
datanodeDetails = dd;
this.port = port; this.port = port;
RaftProperties serverProperties = newRaftProperties(conf); RaftProperties serverProperties = newRaftProperties(conf);
final int numWriteChunkThreads = conf.getInt( final int numWriteChunkThreads = conf.getInt(
@ -403,21 +403,11 @@ public final class XceiverServerRatis extends XceiverServer {
if (ozoneConf.getBoolean(OzoneConfigKeys if (ozoneConf.getBoolean(OzoneConfigKeys
.DFS_CONTAINER_RATIS_IPC_RANDOM_PORT, .DFS_CONTAINER_RATIS_IPC_RANDOM_PORT,
OzoneConfigKeys.DFS_CONTAINER_RATIS_IPC_RANDOM_PORT_DEFAULT)) { OzoneConfigKeys.DFS_CONTAINER_RATIS_IPC_RANDOM_PORT_DEFAULT)) {
try (ServerSocket socket = new ServerSocket()) { localPort = 0;
socket.setReuseAddress(true);
SocketAddress address = new InetSocketAddress(0);
socket.bind(address);
localPort = socket.getLocalPort();
LOG.info("Found a free port for the server : {}", localPort);
} catch (IOException e) {
LOG.error("Unable find a random free port for the server, "
+ "fallback to use default port {}", localPort, e);
}
} }
GrpcTlsConfig tlsConfig = RatisHelper.createTlsServerConfig( GrpcTlsConfig tlsConfig = RatisHelper.createTlsServerConfig(
new SecurityConfig(ozoneConf)); new SecurityConfig(ozoneConf));
datanodeDetails.setPort(
DatanodeDetails.newPort(DatanodeDetails.Port.Name.RATIS, localPort));
return new XceiverServerRatis(datanodeDetails, localPort, return new XceiverServerRatis(datanodeDetails, localPort,
dispatcher, ozoneConf, context, tlsConfig, caClient); dispatcher, ozoneConf, context, tlsConfig, caClient);
} }
@ -429,6 +419,22 @@ public final class XceiverServerRatis extends XceiverServer {
server.getId(), getIPCPort()); server.getId(), getIPCPort());
chunkExecutor.prestartAllCoreThreads(); chunkExecutor.prestartAllCoreThreads();
server.start(); server.start();
int realPort =
((RaftServerProxy) server).getServerRpc().getInetSocketAddress()
.getPort();
if (port == 0) {
LOG.info("{} {} is started using port {}", getClass().getSimpleName(),
server.getId(), realPort);
port = realPort;
}
//register the real port to the datanode details.
datanodeDetails.setPort(DatanodeDetails
.newPort(DatanodeDetails.Port.Name.RATIS,
realPort));
isStarted = true; isStarted = true;
} }
} }

View File

@ -23,6 +23,7 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY;
import static org.apache.hadoop.ozone.OzoneConfigKeys.DFS_CONTAINER_RATIS_IPC_RANDOM_PORT; import static org.apache.hadoop.ozone.OzoneConfigKeys.DFS_CONTAINER_RATIS_IPC_RANDOM_PORT;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail; import static org.junit.Assert.fail;
@ -194,25 +195,50 @@ public class TestMiniOzoneCluster {
ozoneConf.setBoolean(OzoneConfigKeys.DFS_CONTAINER_IPC_RANDOM_PORT, true); ozoneConf.setBoolean(OzoneConfigKeys.DFS_CONTAINER_IPC_RANDOM_PORT, true);
ozoneConf.setBoolean(OzoneConfigKeys.DFS_CONTAINER_RATIS_IPC_RANDOM_PORT, ozoneConf.setBoolean(OzoneConfigKeys.DFS_CONTAINER_RATIS_IPC_RANDOM_PORT,
true); true);
try ( List<DatanodeStateMachine> stateMachines = new ArrayList<>();
DatanodeStateMachine sm1 = new DatanodeStateMachine( try {
TestUtils.randomDatanodeDetails(), ozoneConf, null, null);
DatanodeStateMachine sm2 = new DatanodeStateMachine( for (int i = 0; i < 3; i++) {
TestUtils.randomDatanodeDetails(), ozoneConf, null, null); stateMachines.add(new DatanodeStateMachine(
DatanodeStateMachine sm3 = new DatanodeStateMachine( TestUtils.randomDatanodeDetails(), ozoneConf, null, null));
TestUtils.randomDatanodeDetails(), ozoneConf, null, null) }
) {
//we need to start all the servers to get the fix ports
for (DatanodeStateMachine dsm : stateMachines) {
dsm.getContainer().getReadChannel().start();
dsm.getContainer().getWriteChannel().start();
}
for (DatanodeStateMachine dsm : stateMachines) {
dsm.getContainer().getWriteChannel().stop();
dsm.getContainer().getReadChannel().stop();
}
//after the start the real port numbers should be available AND unique
HashSet<Integer> ports = new HashSet<Integer>(); HashSet<Integer> ports = new HashSet<Integer>();
assertTrue(ports.add(sm1.getContainer().getReadChannel().getIPCPort())); for (DatanodeStateMachine dsm : stateMachines) {
assertTrue(ports.add(sm2.getContainer().getReadChannel().getIPCPort())); int readPort = dsm.getContainer().getReadChannel().getIPCPort();
assertTrue(ports.add(sm3.getContainer().getReadChannel().getIPCPort()));
// Assert that ratis is also on a different port. assertNotEquals("Port number of the service is not updated", 0,
assertTrue(ports.add(sm1.getContainer().getWriteChannel().getIPCPort())); readPort);
assertTrue(ports.add(sm2.getContainer().getWriteChannel().getIPCPort()));
assertTrue(ports.add(sm3.getContainer().getWriteChannel().getIPCPort()));
assertTrue("Port of datanode service is conflicted with other server.",
ports.add(readPort));
int writePort = dsm.getContainer().getWriteChannel().getIPCPort();
assertNotEquals("Port number of the service is not updated", 0,
writePort);
assertTrue("Port of datanode service is conflicted with other server.",
ports.add(writePort));
}
} finally {
for (DatanodeStateMachine dsm : stateMachines) {
dsm.close();
}
} }
// Turn off the random port flag and test again // Turn off the random port flag and test again