HDDS-1164. Add New blockade Tests to test Replica Manager. Contributed by Nilotpal Nandi.

Shashikant Banerjee 2019-04-03 22:19:10 +05:30
parent 002dcc4ebf
commit 7cd7045eea
8 changed files with 403 additions and 225 deletions

View File

@@ -45,9 +45,8 @@ def blockade_status(cls):
   @classmethod
   def make_flaky(cls, flaky_node, container_list):
     # make the network flaky
-    om = filter(lambda x: 'ozoneManager' in x, container_list)
-    scm = filter(lambda x: 'scm' in x, container_list)
-    datanodes = filter(lambda x: 'datanode' in x, container_list)
+    om, scm, _, datanodes = \
+        ClusterUtils.find_om_scm_client_datanodes(container_list)
     node_dict = {
         "all": "--all",
         "scm" : scm[0],

View File

@@ -71,7 +71,7 @@ def cluster_destroy(cls, docker_compose_file):
   @classmethod
   def run_freon(cls, docker_compose_file, num_volumes, num_buckets,
                 num_keys, key_size, replication_type, replication_factor,
-                freon_client='ozoneManager'):
+                freon_client='om'):
     # run freon
     cmd = "docker-compose -f %s " \
           "exec %s /opt/hadoop/bin/ozone " \
@@ -115,7 +115,7 @@ def run_cmd(cls, cmd):
   @classmethod
   def get_ozone_confkey_value(cls, docker_compose_file, key_name):
     cmd = "docker-compose -f %s " \
-          "exec ozoneManager /opt/hadoop/bin/ozone " \
+          "exec om /opt/hadoop/bin/ozone " \
           "getconf -confKey %s" \
           % (docker_compose_file, key_name)
     exit_code, output = cls.run_cmd(cmd)
@@ -307,3 +307,22 @@ def find_checksum(cls, docker_compose_file, filepath):
     checksum = finaloutput.split(" ")
     logger.info("Checksum of %s is : %s", filepath, checksum[0])
     return checksum[0]
+
+  @classmethod
+  def get_pipelines(cls, docker_compose_file):
+    command = "docker-compose -f %s " \
+              "exec ozone_client /opt/hadoop/bin/ozone scmcli " \
+              "listPipelines" % (docker_compose_file)
+    exit_code, output = cls.run_cmd(command)
+    assert exit_code == 0, "list pipeline command failed"
+    return output
+
+  @classmethod
+  def find_om_scm_client_datanodes(cls, container_list):
+    om = filter(lambda x: 'om_1' in x, container_list)
+    scm = filter(lambda x: 'scm' in x, container_list)
+    datanodes = sorted(
+        list(filter(lambda x: 'datanode' in x, container_list)))
+    client = filter(lambda x: 'ozone_client' in x, container_list)
+    return om, scm, client, datanodes
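
Note (editor's sketch, not part of the commit): the snippet below illustrates how the new find_om_scm_client_datanodes helper is consumed by the test modules further down. The container names are invented examples that merely follow the docker-compose naming the helper greps for, and the om[0]/scm[0] indexing used by the callers assumes Python 2, where filter() returns a list rather than a lazy iterator.

# Illustrative usage sketch only; container names are made up.
from clusterUtils.cluster_utils import ClusterUtils

container_list = ["ozoneblockade_om_1", "ozoneblockade_scm_1",
                  "ozoneblockade_datanode_1", "ozoneblockade_datanode_2",
                  "ozoneblockade_datanode_3", "ozoneblockade_ozone_client_1"]

om, scm, client, datanodes = \
    ClusterUtils.find_om_scm_client_datanodes(container_list)

# The callers index om[0]/scm[0] and call len() on filter() results, which
# works under Python 2 (filter returns a list); under Python 3 these would be
# iterators and om[0] would raise a TypeError.
print(om[0], scm[0], datanodes)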

View File

@@ -47,11 +47,8 @@ def setup():
   exit_code, output = Blockade.blockade_status()
   assert exit_code == 0, "blockade status command failed with output=[%s]" % \
       output
-  OM = filter(lambda x: 'ozoneManager' in x, CONTAINER_LIST)
-  SCM = filter(lambda x: 'scm' in x, CONTAINER_LIST)
-  DATANODES = sorted(list(filter(lambda x: 'datanode' in x, CONTAINER_LIST)))
-  CLIENT = filter(lambda x: 'ozone_client' in x, CONTAINER_LIST)
+  OM, SCM, CLIENT, DATANODES = \
+      ClusterUtils.find_om_scm_client_datanodes(CONTAINER_LIST)
   exit_code, output = ClusterUtils.run_freon(FILE, 1, 1, 1, 10240, "RATIS",
                                              "THREE", "ozone_client")
   assert exit_code == 0, "freon run failed with output=[%s]" % output

View File

@@ -42,9 +42,8 @@ def setup():
   exit_code, output = Blockade.blockade_status()
   assert exit_code == 0, "blockade status command failed with output=[%s]" % \
       output
-  OM = [x for x in CONTAINER_LIST if 'ozoneManager' in x]
-  SCM = [x for x in CONTAINER_LIST if 'scm' in x]
-  DATANODES = sorted(x for x in CONTAINER_LIST if 'datanode' in x)
+  OM, SCM, _, DATANODES = \
+      ClusterUtils.find_om_scm_client_datanodes(CONTAINER_LIST)
   exit_code, output = ClusterUtils.run_freon(FILE, 1, 1, 1, 10240, "RATIS",
                                              "THREE")

View File

@@ -18,16 +18,17 @@
 import os
 import time
 import logging
+import re
 from blockadeUtils.blockade import Blockade
 from clusterUtils.cluster_utils import ClusterUtils

 logger = logging.getLogger(__name__)
 parent_dir = os.path.dirname(os.path.dirname(os.path.realpath(__file__)))
 FILE = os.path.join(parent_dir, "compose", "ozoneblockade",
                     "docker-compose.yaml")
 os.environ["DOCKER_COMPOSE_FILE"] = FILE
 SCALE = 3
+INCREASED_SCALE = 5
 CONTAINER_LIST = []
 OM = []
 SCM = []
@@ -35,83 +36,114 @@

 def setup():
   global CONTAINER_LIST, OM, SCM, DATANODES
   Blockade.blockade_destroy()
   CONTAINER_LIST = ClusterUtils.cluster_setup(FILE, SCALE)
   exit_code, output = Blockade.blockade_status()
   assert exit_code == 0, "blockade status command failed with output=[%s]" % \
       output
-  OM = filter(lambda x: 'ozoneManager' in x, CONTAINER_LIST)
-  SCM = filter(lambda x: 'scm' in x, CONTAINER_LIST)
-  DATANODES = sorted(list(filter(lambda x: 'datanode' in x, CONTAINER_LIST)))
+  OM, SCM, _, DATANODES = \
+      ClusterUtils.find_om_scm_client_datanodes(CONTAINER_LIST)
   exit_code, output = ClusterUtils.run_freon(FILE, 1, 1, 1, 10240, "RATIS",
                                              "THREE")
   assert exit_code == 0, "freon run failed with output=[%s]" % output


 def teardown():
   logger.info("Inside teardown")
   Blockade.blockade_destroy()


 def teardown_module():
   ClusterUtils.cluster_destroy(FILE)


-def test_one_dn_isolate_scm_other_dn():
+def test_one_dn_isolate_scm_other_dn(run_second_phase):
   """
   In this test, one of the datanodes cannot communicate with SCM and other
   datanodes.
   Other datanodes can communicate with each other and SCM.
   Expectation : The container should eventually have two closed replicas.
   """
   first_set = [OM[0], SCM[0], DATANODES[1], DATANODES[2]]
   second_set = [OM[0], DATANODES[0]]
   Blockade.blockade_create_partition(first_set, second_set)
   Blockade.blockade_status()
   ClusterUtils.run_freon(FILE, 1, 1, 1, 10240, "RATIS", "THREE")
   logger.info("Waiting for %s seconds before checking container status",
               os.environ["CONTAINER_STATUS_SLEEP"])
   time.sleep(int(os.environ["CONTAINER_STATUS_SLEEP"]))
   all_datanodes_container_status = \
-      ClusterUtils.find_all_datanodes_container_status(FILE, SCALE)
+      ClusterUtils.findall_container_status(FILE, SCALE)
   count_closed_container_datanodes = filter(lambda x: x == 'CLOSED',
                                             all_datanodes_container_status)
   assert len(count_closed_container_datanodes) == 2, \
       "The container should have two closed replicas."
+
+  if str(run_second_phase).lower() == "true":
+    ClusterUtils.cluster_setup(FILE, INCREASED_SCALE, False)
+    Blockade.blockade_status()
+    logger.info("Waiting for %s seconds before checking container status",
+                os.environ["CONTAINER_STATUS_SLEEP"])
+    time.sleep(int(os.environ["CONTAINER_STATUS_SLEEP"]))
+    all_datanodes_container_status = \
+        ClusterUtils.findall_container_status(FILE, INCREASED_SCALE)
+    count_closed_container_datanodes = filter(
+        lambda x: x == 'CLOSED', all_datanodes_container_status)
+    assert len(count_closed_container_datanodes) >= 3, \
+        "The container should have at least three closed replicas."
+    _, output = \
+        ClusterUtils.run_freon(FILE, 1, 1, 1, 10240, "RATIS", "THREE")
+    assert re.search("Status: Success", output) is not None


-def test_one_dn_isolate_other_dn():
+def test_one_dn_isolate_other_dn(run_second_phase):
   """
   In this test, one of the datanodes (first datanode) cannot communicate
   with other datanodes but can communicate with SCM.
   One of the other two datanodes (second datanode) cannot communicate with
   SCM.
   Expectation :
   The container replica state in first datanode can be either closed or
   quasi-closed.
   The container replica state in second datanode can be either closed or open.
   The container should eventually have at least one closed replica.
   """
   first_set = [OM[0], SCM[0], DATANODES[0]]
   second_set = [OM[0], DATANODES[1], DATANODES[2]]
   third_set = [SCM[0], DATANODES[2]]
   Blockade.blockade_create_partition(first_set, second_set, third_set)
   Blockade.blockade_status()
   ClusterUtils.run_freon(FILE, 1, 1, 1, 10240, "RATIS", "THREE")
   logger.info("Waiting for %s seconds before checking container status",
               os.environ["CONTAINER_STATUS_SLEEP"])
   time.sleep(int(os.environ["CONTAINER_STATUS_SLEEP"]))
   all_datanodes_container_status = \
-      ClusterUtils.find_all_datanodes_container_status(FILE, SCALE)
+      ClusterUtils.findall_container_status(FILE, SCALE)
   count_closed_container_datanodes = filter(lambda x: x == 'CLOSED',
                                             all_datanodes_container_status)
   first_datanode_status = all_datanodes_container_status[0]
   second_datanode_status = all_datanodes_container_status[1]
   assert first_datanode_status == 'CLOSED' or \
       first_datanode_status == "QUASI_CLOSED"
   assert second_datanode_status == 'CLOSED' or \
       second_datanode_status == "OPEN"
   assert len(count_closed_container_datanodes) >= 1, \
       "The container should have at least one closed replica"
+
+  if str(run_second_phase).lower() == "true":
+    ClusterUtils.cluster_setup(FILE, INCREASED_SCALE, False)
+    Blockade.blockade_status()
+    logger.info("Waiting for %s seconds before checking container status",
+                os.environ["CONTAINER_STATUS_SLEEP"])
+    time.sleep(int(os.environ["CONTAINER_STATUS_SLEEP"]))
+    all_datanodes_container_status = \
+        ClusterUtils.findall_container_status(FILE, INCREASED_SCALE)
+    count_closed_container_datanodes = filter(
+        lambda x: x == 'CLOSED', all_datanodes_container_status)
+    assert len(count_closed_container_datanodes) >= 3, \
+        "The container should have at least three closed replicas."
+    _, output = \
+        ClusterUtils.run_freon(FILE, 1, 1, 1, 10240, "RATIS", "THREE")
+    assert re.search("Status: Success", output) is not None

View File

@@ -18,16 +18,17 @@
 import os
 import time
 import logging
+import re
 from blockadeUtils.blockade import Blockade
 from clusterUtils.cluster_utils import ClusterUtils

 logger = logging.getLogger(__name__)
 parent_dir = os.path.dirname(os.path.dirname(os.path.realpath(__file__)))
 FILE = os.path.join(parent_dir, "compose", "ozoneblockade",
                     "docker-compose.yaml")
 os.environ["DOCKER_COMPOSE_FILE"] = FILE
 SCALE = 3
+INCREASED_SCALE = 5
 CONTAINER_LIST = []
 OM = []
 SCM = []
@@ -35,110 +36,190 @@

 def setup():
   global CONTAINER_LIST, OM, SCM, DATANODES
   Blockade.blockade_destroy()
   CONTAINER_LIST = ClusterUtils.cluster_setup(FILE, SCALE)
   exit_code, output = Blockade.blockade_status()
   assert exit_code == 0, "blockade status command failed with output=[%s]" % \
       output
-  OM = filter(lambda x: 'ozoneManager' in x, CONTAINER_LIST)
-  SCM = filter(lambda x: 'scm' in x, CONTAINER_LIST)
-  DATANODES = sorted(list(filter(lambda x: 'datanode' in x, CONTAINER_LIST)))
+  OM, SCM, _, DATANODES = \
+      ClusterUtils.find_om_scm_client_datanodes(CONTAINER_LIST)
   exit_code, output = \
       ClusterUtils.run_freon(FILE, 1, 1, 1, 10240, "RATIS", "THREE")
   assert exit_code == 0, "freon run failed with output=[%s]" % output


 def teardown():
   logger.info("Inside teardown")
   Blockade.blockade_destroy()


 def teardown_module():
   ClusterUtils.cluster_destroy(FILE)


-def test_three_dns_isolate_onescmfailure():
+def test_three_dns_isolate_onescmfailure(run_second_phase):
   """
   In this test, all datanodes are isolated from each other.
   One of the datanodes (third datanode) cannot communicate with SCM.
   Expectation :
   The container replica state in first datanode should be closed.
   The container replica state in second datanode should be closed.
   The container replica state in third datanode should be open.
   """
   first_set = [OM[0], SCM[0], DATANODES[0]]
   second_set = [OM[0], SCM[0], DATANODES[1]]
   third_set = [OM[0], DATANODES[2]]
   Blockade.blockade_create_partition(first_set, second_set, third_set)
   Blockade.blockade_status()
   ClusterUtils.run_freon(FILE, 1, 1, 1, 10240, "RATIS", "THREE")
   logger.info("Waiting for %s seconds before checking container status",
               os.environ["CONTAINER_STATUS_SLEEP"])
   time.sleep(int(os.environ["CONTAINER_STATUS_SLEEP"]))
   all_datanodes_container_status = \
-      ClusterUtils.find_all_datanodes_container_status(FILE, SCALE)
+      ClusterUtils.findall_container_status(FILE, SCALE)
   first_datanode_status = all_datanodes_container_status[0]
   second_datanode_status = all_datanodes_container_status[1]
   third_datanode_status = all_datanodes_container_status[2]
   assert first_datanode_status == 'CLOSED'
   assert second_datanode_status == 'CLOSED'
   assert third_datanode_status == 'OPEN'
+
+  if str(run_second_phase).lower() == "true":
+    ClusterUtils.cluster_setup(FILE, INCREASED_SCALE, False)
+    Blockade.blockade_status()
+    logger.info("Waiting for %s seconds before checking container status",
+                os.environ["CONTAINER_STATUS_SLEEP"])
+    time.sleep(int(os.environ["CONTAINER_STATUS_SLEEP"]))
+    all_datanodes_container_status = \
+        ClusterUtils.findall_container_status(FILE, INCREASED_SCALE)
+    count_closed_container_datanodes = filter(
+        lambda x: x == 'CLOSED', all_datanodes_container_status)
+    assert len(count_closed_container_datanodes) == 3, \
+        "The container should have three closed replicas."
+    Blockade.blockade_join()
+    Blockade.blockade_status()
+    logger.info("Waiting for %s seconds before checking container status",
+                os.environ["CONTAINER_STATUS_SLEEP"])
+    time.sleep(int(os.environ["CONTAINER_STATUS_SLEEP"]))
+    all_datanodes_container_status = \
+        ClusterUtils.findall_container_status(FILE, INCREASED_SCALE)
+    count_closed_container_datanodes = filter(
+        lambda x: x == 'CLOSED', all_datanodes_container_status)
+    assert len(count_closed_container_datanodes) == 3, \
+        "The container should have three closed replicas."


-def test_three_dns_isolate_twoscmfailure():
+def test_three_dns_isolate_twoscmfailure(run_second_phase):
   """
   In this test, all datanodes are isolated from each other.
   Two datanodes cannot communicate with SCM (second datanode and third
   datanode).
   Expectation :
   The container replica state in first datanode should be quasi-closed.
   The container replica state in second datanode should be open.
   The container replica state in third datanode should be open.
   """
   first_set = [OM[0], SCM[0], DATANODES[0]]
   second_set = [OM[0], DATANODES[1]]
   third_set = [OM[0], DATANODES[2]]
   Blockade.blockade_create_partition(first_set, second_set, third_set)
   Blockade.blockade_status()
   ClusterUtils.run_freon(FILE, 1, 1, 1, 10240, "RATIS", "THREE")
   logger.info("Waiting for %s seconds before checking container status",
               os.environ["CONTAINER_STATUS_SLEEP"])
   time.sleep(int(os.environ["CONTAINER_STATUS_SLEEP"]))
   all_datanodes_container_status = \
-      ClusterUtils.find_all_datanodes_container_status(FILE, SCALE)
+      ClusterUtils.findall_container_status(FILE, SCALE)
   first_datanode_status = all_datanodes_container_status[0]
   second_datanode_status = all_datanodes_container_status[1]
   third_datanode_status = all_datanodes_container_status[2]
   assert first_datanode_status == 'QUASI_CLOSED'
   assert second_datanode_status == 'OPEN'
   assert third_datanode_status == 'OPEN'
+
+  if str(run_second_phase).lower() == "true":
+    ClusterUtils.cluster_setup(FILE, INCREASED_SCALE, False)
+    Blockade.blockade_status()
+    logger.info("Waiting for %s seconds before checking container status",
+                os.environ["CONTAINER_STATUS_SLEEP"])
+    time.sleep(int(os.environ["CONTAINER_STATUS_SLEEP"]))
+    all_datanodes_container_status = \
+        ClusterUtils.findall_container_status(FILE, INCREASED_SCALE)
+    count_quasi_closed_container_datanodes = filter(
+        lambda x: x == 'QUASI_CLOSED', all_datanodes_container_status)
+    assert len(count_quasi_closed_container_datanodes) >= 3, \
+        "The container should have at least three quasi-closed replicas."
+    Blockade.blockade_join()
+    Blockade.blockade_status()
+    logger.info("Waiting for %s seconds before checking container status",
+                os.environ["CONTAINER_STATUS_SLEEP"])
+    time.sleep(int(os.environ["CONTAINER_STATUS_SLEEP"]))
+    all_datanodes_container_status = \
+        ClusterUtils.findall_container_status(FILE, INCREASED_SCALE)
+    count_closed_container_datanodes = filter(
+        lambda x: x == 'CLOSED', all_datanodes_container_status)
+    assert len(count_closed_container_datanodes) == 3, \
+        "The container should have three closed replicas."


-def test_three_dns_isolate_threescmfailure():
+def test_three_dns_isolate_threescmfailure(run_second_phase):
   """
   In this test, all datanodes are isolated from each other and also cannot
   communicate with SCM.
   Expectation :
   The container replica state in first datanode should be open.
   The container replica state in second datanode should be open.
   The container replica state in third datanode should be open.
   """
   first_set = [OM[0], DATANODES[0]]
   second_set = [OM[0], DATANODES[1]]
   third_set = [OM[0], DATANODES[2]]
   Blockade.blockade_create_partition(first_set, second_set, third_set)
   Blockade.blockade_status()
   ClusterUtils.run_freon(FILE, 1, 1, 1, 10240, "RATIS", "THREE")
   logger.info("Waiting for %s seconds before checking container status",
               os.environ["CONTAINER_STATUS_SLEEP"])
   time.sleep(int(os.environ["CONTAINER_STATUS_SLEEP"]))
   all_datanodes_container_status = \
-      ClusterUtils.find_all_datanodes_container_status(FILE, SCALE)
+      ClusterUtils.findall_container_status(FILE, SCALE)
   first_datanode_status = all_datanodes_container_status[0]
   second_datanode_status = all_datanodes_container_status[1]
   third_datanode_status = all_datanodes_container_status[2]
   assert first_datanode_status == 'OPEN'
   assert second_datanode_status == 'OPEN'
   assert third_datanode_status == 'OPEN'
+
+  if str(run_second_phase).lower() == "true":
+    ClusterUtils.cluster_setup(FILE, INCREASED_SCALE, False)
+    Blockade.blockade_status()
+    logger.info("Waiting for %s seconds before checking container status",
+                os.environ["CONTAINER_STATUS_SLEEP"])
+    time.sleep(int(os.environ["CONTAINER_STATUS_SLEEP"]))
+    output = ClusterUtils.get_pipelines(FILE)
+    if output:
+      assert re.search("Factor:THREE", output) is None
+    all_datanodes_container_status = \
+        ClusterUtils.findall_container_status(FILE, INCREASED_SCALE)
+    datanodes_having_container_status = filter(
+        lambda x: x != 'None', all_datanodes_container_status)
+    assert len(datanodes_having_container_status) == 3, \
+        "Containers should not be replicated on addition of new nodes."
+    Blockade.blockade_join()
+    Blockade.blockade_status()
+    logger.info("Waiting for %s seconds before checking container status",
+                os.environ["CONTAINER_STATUS_SLEEP"])
+    time.sleep(int(os.environ["CONTAINER_STATUS_SLEEP"]))
+    all_datanodes_container_status = \
+        ClusterUtils.findall_container_status(FILE, INCREASED_SCALE)
+    count_closed_container_datanodes = filter(
+        lambda x: x == 'CLOSED', all_datanodes_container_status)
+    assert len(count_closed_container_datanodes) == 3, \
+        "The container should have three closed replicas."

View File

@@ -18,16 +18,17 @@
 import os
 import time
 import logging
+import re
 from blockadeUtils.blockade import Blockade
 from clusterUtils.cluster_utils import ClusterUtils

 logger = logging.getLogger(__name__)
 parent_dir = os.path.dirname(os.path.dirname(os.path.realpath(__file__)))
 FILE = os.path.join(parent_dir, "compose", "ozoneblockade",
                     "docker-compose.yaml")
 os.environ["DOCKER_COMPOSE_FILE"] = FILE
 SCALE = 3
+INCREASED_SCALE = 5
 CONTAINER_LIST = []
 OM = []
 SCM = []
@@ -35,87 +36,138 @@

 def setup():
   global CONTAINER_LIST, OM, SCM, DATANODES
   Blockade.blockade_destroy()
   CONTAINER_LIST = ClusterUtils.cluster_setup(FILE, SCALE)
   exit_code, output = Blockade.blockade_status()
   assert exit_code == 0, "blockade status command failed with output=[%s]" % \
       output
-  OM = filter(lambda x: 'ozoneManager' in x, CONTAINER_LIST)
-  SCM = filter(lambda x: 'scm' in x, CONTAINER_LIST)
-  DATANODES = sorted(list(filter(lambda x: 'datanode' in x, CONTAINER_LIST)))
+  OM, SCM, _, DATANODES = \
+      ClusterUtils.find_om_scm_client_datanodes(CONTAINER_LIST)
   exit_code, output = \
       ClusterUtils.run_freon(FILE, 1, 1, 1, 10240, "RATIS", "THREE")
   assert exit_code == 0, "freon run failed with output=[%s]" % output


 def teardown():
   logger.info("Inside teardown")
   Blockade.blockade_destroy()


 def teardown_module():
   ClusterUtils.cluster_destroy(FILE)


-def test_two_dns_isolate_scm_same_partition():
+def test_two_dns_isolate_scm_same_partition(run_second_phase):
   """
-  In this test, one of the datanodes (first datanode) cannot communicate
-  with other two datanodes.
-  Two datanodes (second datanode and third datanode), on same network
-  parition, cannot communicate with SCM.
+  In this test, there are three datanodes, DN1, DN2 and DN3.
+  DN1 is on one network partition and
+  DN2, DN3 are on a different network partition.
+  DN2 and DN3 cannot communicate with SCM.
   Expectation :
-  The container replica state in first datanode should be quasi-closed.
-  The container replica state in second datanode should be open.
-  The container replica state in third datanode should be open.
+  The container replica state in DN1 should be quasi-closed.
+  The container replica state in DN2 should be open.
+  The container replica state in DN3 should be open.
   """
   first_set = [OM[0], DATANODES[1], DATANODES[2]]
   second_set = [OM[0], SCM[0], DATANODES[0]]
   Blockade.blockade_create_partition(first_set, second_set)
   Blockade.blockade_status()
   ClusterUtils.run_freon(FILE, 1, 1, 1, 10240, "RATIS", "THREE")
   logger.info("Waiting for %s seconds before checking container status",
               os.environ["CONTAINER_STATUS_SLEEP"])
   time.sleep(int(os.environ["CONTAINER_STATUS_SLEEP"]))
   all_datanodes_container_status = \
-      ClusterUtils.find_all_datanodes_container_status(FILE, SCALE)
+      ClusterUtils.findall_container_status(FILE, SCALE)
   first_datanode_status = all_datanodes_container_status[0]
   second_datanode_status = all_datanodes_container_status[1]
   third_datanode_status = all_datanodes_container_status[2]
   assert first_datanode_status == 'QUASI_CLOSED'
   assert second_datanode_status == 'OPEN'
   assert third_datanode_status == 'OPEN'
+
+  if str(run_second_phase).lower() == "true":
+    ClusterUtils.cluster_setup(FILE, INCREASED_SCALE, False)
+    Blockade.blockade_status()
+    logger.info("Waiting for %s seconds before checking container status",
+                os.environ["CONTAINER_STATUS_SLEEP"])
+    time.sleep(int(os.environ["CONTAINER_STATUS_SLEEP"]))
+    all_datanodes_container_status = \
+        ClusterUtils.findall_container_status(FILE, INCREASED_SCALE)
+    count_quasi_closed_container_datanodes = filter(
+        lambda x: x == 'QUASI_CLOSED', all_datanodes_container_status)
+    assert len(count_quasi_closed_container_datanodes) >= 3, \
+        "The container should have at least three quasi-closed replicas."
+    Blockade.blockade_join()
+    Blockade.blockade_status()
+    time.sleep(int(os.environ["CONTAINER_STATUS_SLEEP"]))
+    all_datanodes_container_status = \
+        ClusterUtils.findall_container_status(FILE, INCREASED_SCALE)
+    count_closed_container_datanodes = filter(
+        lambda x: x == 'CLOSED', all_datanodes_container_status)
+    assert len(count_closed_container_datanodes) >= 3


-def test_two_dns_isolate_scm_different_partition():
+def test_two_dns_isolate_scm_different_partition(run_second_phase):
   """
-  In this test, one of the datanodes (first datanode) cannot communicate with
-  other two datanodes.
-  Two datanodes (first datanode and second datanode),
-  on different network paritions, cannot communicate with SCM.
+  In this test, there are three datanodes, DN1, DN2 and DN3.
+  DN1 is on one network partition and
+  DN2, DN3 are on a different network partition.
+  DN1 and DN2 cannot communicate with SCM.
   Expectation :
-  The container replica state in first datanode should be open.
-  The container replica states can be either 'closed'
-  in both second and third datanode, or,
-  'open' in second datanode and 'quasi-closed' in third datanode.
+  The container replica state in DN1 should be open.
+  The container replica states can be either 'closed'
+  in both DN2 and DN3, or,
+  'open' in DN2 and 'quasi-closed' in DN3.
   """
   first_set = [OM[0], DATANODES[0]]
   second_set = [OM[0], DATANODES[1], DATANODES[2]]
   third_set = [SCM[0], DATANODES[2]]
   Blockade.blockade_create_partition(first_set, second_set, third_set)
   Blockade.blockade_status()
   ClusterUtils.run_freon(FILE, 1, 1, 1, 10240, "RATIS", "THREE")
   logger.info("Waiting for %s seconds before checking container status",
               os.environ["CONTAINER_STATUS_SLEEP"])
   time.sleep(int(os.environ["CONTAINER_STATUS_SLEEP"]))
   all_datanodes_container_status = \
-      ClusterUtils.find_all_datanodes_container_status(FILE, SCALE)
+      ClusterUtils.findall_container_status(FILE, SCALE)
   first_datanode_status = all_datanodes_container_status[0]
   second_datanode_status = all_datanodes_container_status[1]
   third_datanode_status = all_datanodes_container_status[2]
   assert first_datanode_status == 'OPEN'
   assert (second_datanode_status == 'CLOSED' and
           third_datanode_status == 'CLOSED') or \
          (second_datanode_status == 'OPEN' and
           third_datanode_status == 'QUASI_CLOSED')
+
+  if str(run_second_phase).lower() == "true":
+    ClusterUtils.cluster_setup(FILE, INCREASED_SCALE, False)
+    Blockade.blockade_status()
+    logger.info("Waiting for %s seconds before checking container status",
+                os.environ["CONTAINER_STATUS_SLEEP"])
+    time.sleep(int(os.environ["CONTAINER_STATUS_SLEEP"]))
+    all_datanodes_container_status = \
+        ClusterUtils.findall_container_status(FILE, INCREASED_SCALE)
+    count_closed_container_datanodes = filter(
+        lambda x: x == 'CLOSED', all_datanodes_container_status)
+    count_quasi_closed_container_datanodes = filter(
+        lambda x: x == 'QUASI_CLOSED', all_datanodes_container_status)
+    assert len(count_closed_container_datanodes) >= 3 or \
+        len(count_quasi_closed_container_datanodes) >= 3
+    Blockade.blockade_join()
+    Blockade.blockade_status()
+    if len(count_closed_container_datanodes) < 3:
+      time.sleep(int(os.environ["CONTAINER_STATUS_SLEEP"]))
+      all_datanodes_container_status = \
+          ClusterUtils.findall_container_status(FILE, INCREASED_SCALE)
+      count_closed_container_datanodes = filter(
+          lambda x: x == 'CLOSED', all_datanodes_container_status)
+      assert len(count_closed_container_datanodes) >= 3
+    _, output = \
+        ClusterUtils.run_freon(FILE, 1, 1, 1, 10240, "RATIS", "THREE")
+    assert re.search("Status: Success", output) is not None

View File

@@ -42,9 +42,8 @@ def setup():
   exit_code, output = Blockade.blockade_status()
   assert exit_code == 0, "blockade status command failed with output=[%s]" % \
       output
-  OM = [x for x in CONTAINER_LIST if 'ozoneManager' in x]
-  SCM = [x for x in CONTAINER_LIST if 'scm' in x]
-  DATANODES = sorted(x for x in CONTAINER_LIST if 'datanode' in x)
+  OM, SCM, _, DATANODES = \
+      ClusterUtils.find_om_scm_client_datanodes(CONTAINER_LIST)
   exit_code, output = ClusterUtils.run_freon(FILE, 1, 1, 1, 10240, "RATIS",
                                              "THREE")