K8S_REGION_TYPE = "k8s"
TILLER_HOST = "localhost"
K8S_CONFIG = None # None means it will use default config (~/.kube/config)
-K8S_ONAP_NAMESPACE = "onap" # ONAP Kubernetes namespace
-K8S_ADDITIONAL_RESOURCES_NAMESPACE = K8S_ONAP_NAMESPACE # Resources created on tests namespace
+K8S_TESTS_NAMESPACE = "onap" # ONAP Kubernetes namespace
+K8S_ADDITIONAL_RESOURCES_NAMESPACE = K8S_TESTS_NAMESPACE # Resources created on tests namespace
MSB_K8S_OVERRIDE_VALUES = None
# SOCK_HTTP = "socks5h://127.0.0.1:8091"
IGNORE_EMPTY_REPLICAS = False
STATUS_DETAILS_JSON = "status-details.json"
INCLUDE_ALL_RES_IN_DETAILS = True
+CHECK_ALL_NAMESPACES = False
EXTRA_NAMESPACE_LIST = []
+EXCLUDE_NAMESPACE_LIST = []
FULL_LOGS_CONTAINERS = [
'dcae-bootstrap', 'dcae-cloudify-manager', 'aai-resources',
@property
def is_primary(self) -> bool:
"""Does step analyses primary namespace."""
- return self.namespace == settings.K8S_ONAP_NAMESPACE
+ return self.namespace == settings.K8S_TESTS_NAMESPACE
def _init_resources(self):
if self.resource_type != "":
def __init__(self):
"""Init CheckNamespaceStatusStep."""
- super().__init__(namespace=settings.K8S_ONAP_NAMESPACE, resource_type="")
+ super().__init__(namespace=settings.K8S_TESTS_NAMESPACE, resource_type="")
self.__logger.debug("K8s namespaces status test init started")
self.job_list_step = None
config.load_incluster_config()
else:
config.load_kube_config(config_file=settings.K8S_CONFIG)
- for namespace in ([self.namespace] + settings.EXTRA_NAMESPACE_LIST):
+ if settings.CHECK_ALL_NAMESPACES or settings.EXCLUDE_NAMESPACE_LIST:
+ self.namespaces_to_check_set = {namespace.metadata.name for namespace in
+ client.CoreV1Api().list_namespace().items} - set(
+ settings.EXCLUDE_NAMESPACE_LIST)
+ else:
+ self.namespaces_to_check_set = set([self.namespace] + settings.EXTRA_NAMESPACE_LIST)
+ for namespace in self.namespaces_to_check_set:
self._init_namespace_steps(namespace)
self.pods = []
self.services = []
ingress_list_step = CheckK8sIngressesStep(namespace)
pvc_list_step = CheckK8sPvcsStep(namespace)
node_list_step = CheckK8sNodesStep(namespace)
- if namespace == settings.K8S_ONAP_NAMESPACE:
+ if namespace == settings.K8S_TESTS_NAMESPACE:
self.job_list_step = job_list_step
self.pod_list_step = pod_list_step
self.service_list_step = service_list_step
"""Check status of all k8s resources in the selected namespaces.
Use settings values:
- - K8S_ONAP_NAMESPACE
+ - K8S_TESTS_NAMESPACE
- STATUS_RESULTS_DIRECTORY
- STORE_ARTIFACTS
- CHECK_POD_VERSIONS
- - EXTRA_NAMESPACE_LIST
- IGNORE_EMPTY_REPLICAS
- INCLUDE_ALL_RES_IN_DETAILS
"""
self.res_dir))
details = {"namespace": {
- "all": settings.EXTRA_NAMESPACE_LIST,
+ "all": list(self.namespaces_to_check_set - set([self.namespace])),
"resources": {}
}}
try:
service_data: Dict[str, Any] = self.k8s_client.read_namespaced_service(
self.service_name,
- settings.K8S_ONAP_NAMESPACE
+ settings.K8S_TESTS_NAMESPACE
)
return service_data.spec.type == "NodePort"
except ApiException as exc:
Use settings values:
- K8S_CONFIG,
- - K8S_ONAP_NAMESPACE.
+ - K8S_TESTS_NAMESPACE.
- EXPOSE_SERVICES_NODE_PORTS
"""
try:
self.k8s_client.patch_namespaced_service(
self.service_name,
- settings.K8S_ONAP_NAMESPACE,
+ settings.K8S_TESTS_NAMESPACE,
{"spec": {"ports": [{"port": self.port,
"nodePort": self.node_port}],
"type": "NodePort"}}
try:
self.k8s_client.patch_namespaced_service(
self.service_name,
- settings.K8S_ONAP_NAMESPACE,
+ settings.K8S_TESTS_NAMESPACE,
[
{
"op": "remove",
api_instance = client.CoreV1Api()
try:
secret = api_instance.read_namespaced_secret(
- settings.SDNC_SECRET_NAME, settings.K8S_ONAP_NAMESPACE)
+ settings.SDNC_SECRET_NAME, settings.K8S_TESTS_NAMESPACE)
if secret.data:
if (self.SDNC_DB_LOGIN in secret.data and self.SDNC_DB_PASSWORD in secret.data):
login_base64 = secret.data[self.SDNC_DB_LOGIN]
api_instance = client.CoreV1Api()
try:
secret = api_instance.read_namespaced_secret(
- settings.SECRET_NAME, settings.K8S_ONAP_NAMESPACE)
+ settings.SECRET_NAME, settings.K8S_TESTS_NAMESPACE)
if secret.data:
if settings.DB_LOGIN in secret.data and settings.DB_PASSWORD in secret.data:
login_base64 = secret.data[settings.DB_LOGIN]
- DB_PRIMARY_HOST,
- DATABASE,
- DB_PORT,
- - K8S_ONAP_NAMESPACE,
+ - K8S_TESTS_NAMESPACE,
- SECRET_NAME,
- DB_LOGIN,
- DB_PASSWORD.
k8s_client: "client.CoreV1Api" = client.CoreV1Api()
try:
for service in k8s_client.list_namespaced_service(
- namespace=settings.K8S_ONAP_NAMESPACE).items:
+ namespace=settings.K8S_TESTS_NAMESPACE).items:
if service.metadata.name == settings.DCAE_VES_COLLECTOR_POD_NAME:
proto = "http"
if "443" in str(service.spec.ports[0].port):
secret_name: str,
login_key: str,
password_key: str,
- namespace: str = settings.K8S_ONAP_NAMESPACE):
+ namespace: str = settings.K8S_TESTS_NAMESPACE):
"""Resolve SDNC datbase credentials from k8s secret.
Args: