from yaml import load, SafeLoader
from onaptests.utils.resources import get_resource_location
-from .settings import * # pylint: disable=W0614
+from .settings import * # noqa
""" Specific Basic clamp settings."""
CLEANUP_FLAG = False
# pylint: disable=bad-whitespace
# The ONAP part
-SERVICE_DETAILS=("Onboarding, enriching a model with TCA." +
- "Design a loop with Clamp and deploy it in Policy and DCAE")
-SERVICE_COMPONENTS="SDC, CLAMP, POLICY, DCAE, DMAAP"
+SERVICE_DETAILS = ("Onboarding, enriching a model with TCA." +
+ "Design a loop with Clamp and deploy it in Policy and DCAE")
+SERVICE_COMPONENTS = "SDC, CLAMP, POLICY, DCAE, DMAAP"
VENDOR_NAME = "basiclamp_vendor"
VSP_NAME = "basiclamp_vsp"
OPERATIONAL_POLICIES = [
- {
- "name": "MinMax",
- "policy_type": "onap.policies.controlloop.guard.common.MinMax",
- "policy_version": "1.0.0",
- "config_function": "add_minmax_config", #func
- "configuration": {
- "min": 1,
- "max": 10
+ {
+ "name": "MinMax",
+ "policy_type": "onap.policies.controlloop.guard.common.MinMax",
+ "policy_version": "1.0.0",
+ "config_function": "add_minmax_config", # func
+ "configuration": {
+ "min": 1,
+ "max": 10
+ }
+ },
+ {
+ "name": "FrequencyLimiter",
+ "policy_type": "onap.policies.controlloop.guard.common.FrequencyLimiter",
+ "policy_version": "1.0.0",
+ "config_function": "add_frequency_limiter", # func
+ "configuration": {}
}
- },
- {
- "name": "FrequencyLimiter",
- "policy_type": "onap.policies.controlloop.guard.common.FrequencyLimiter",
- "policy_version": "1.0.0",
- "config_function": "add_frequency_limiter", #func
- "configuration": {}
- }
]
# if a yaml file is define, retrieve info from this yaml files
SERVICE_NAME = next(iter(yaml_config_file.keys()))
VF_NAME = SERVICE_NAME
except ValueError:
- SERVICE_NAME = "" # Fill me
- VF_NAME = "" # Fill me
+ SERVICE_NAME = "" # Fill me
+ VF_NAME = "" # Fill me
MODEL_YAML_TEMPLATE = None
from onaptests.utils.resources import get_resource_location
import onaptests.utils.exceptions as onap_test_exceptions
-from .settings import * # pylint: disable=W0614
+from .settings import * # noqa
""" Specific basic_cnf_macro with multicloud-k8s and yaml config scenario."""
SERVICE_DETAILS = ("Onboarding, distribution and instantiation of a Apache CNF " +
TENANT_NAME = 'dummy_test'
-SERVICE_YAML_TEMPLATE = Path(get_resource_location("templates/vnf-services/basic_cnf_macro-service.yaml"))
+SERVICE_YAML_TEMPLATE = Path(get_resource_location(
+ "templates/vnf-services/basic_cnf_macro-service.yaml"))
try:
# Try to retrieve the SERVICE NAME from the yaml file
from yaml import load, SafeLoader
from onaptests.utils.resources import get_resource_location
import onaptests.utils.exceptions as onap_test_exceptions
-from .settings import * # pylint: disable=W0614
+from .settings import * # noqa
""" Specific basic_cnf with multicloud-k8s and yaml config scenario."""
SERVICE_DETAILS = ("Onboarding, distribution and instantiation of a CNF" +
-from .settings import *
+from .settings import * # noqa
import json
from pathlib import Path
CLEANUP_FLAG = True
-ANCHOR_DATA = json.dumps({
- "bookstore": {
- "bookstore-name": "Chapters",
- "categories": [
- {
- "code": 1,
- "name": "SciFi",
- "books": [
- {
- "title": "2001: A Space Odyssey",
- "price": 5
- },
- {
- "title": "Dune",
- "price": 5
- }
- ]
- },
- {
- "code": 2,
- "name": "Kids",
- "books": [
- {
- "title": "Matilda"
- }
- ]
+ANCHOR_DATA = json.dumps(
+ {
+ "bookstore": {
+ "bookstore-name": "Chapters",
+ "categories": [{
+ "code": 1,
+ "name": "SciFi",
+ "books": [{
+ "title": "2001: A Space Odyssey",
+ "price": 5
+ }, {
+ "title": "Dune",
+ "price": 5
+ }]
+ }, {
+ "code": 2,
+ "name": "Kids",
+ "books": [{
+ "title": "Matilda"
+ }]
+ }]
}
- ]
}
- })
+)
ANCHOR_NAME = "basic-cps-test-anchor"
DATASPACE_NAME = "basic-cps-test-dataspace"
SCHEMA_SET_NAME = "basic-cps-test-schema-set"
from yaml import load, SafeLoader
from onaptests.utils.resources import get_resource_location
import onaptests.utils.exceptions as onap_test_exceptions
-from .settings import * # pylint: disable=W0614
+from .settings import * # noqa
""" Specific Basic Network without multicloud."""
# pylint: disable=bad-whitespace
# The ONAP part
-SERVICE_DETAILS="Onboarding, distribution and instantiation of Basic Network using à la carte"
-SERVICE_COMPONENTS="SDC, DMAAP, AAI, SO, SDNC"
+SERVICE_DETAILS = "Onboarding, distribution and instantiation of Basic Network using à la carte"
+SERVICE_COMPONENTS = "SDC, DMAAP, AAI, SO, SDNC"
USE_MULTICLOUD = False
# Set ONLY_INSTANTIATE to true to run an instantiation without repeating
# onboarding and related AAI configuration (Cloud config)
-ONLY_INSTANTIATE= False
+ONLY_INSTANTIATE = False
# if a yaml file is define, retrieve info from this yaml files
# if not declare the parameters in the settings
TEST_CLOUD = os.getenv('OS_TEST_CLOUD')
TEST_CLOUD = os.getenv('OS_TEST_CLOUD')
cloud = openstack.connect(cloud=TEST_CLOUD)
-VIM_USERNAME = cloud.config.auth.get('username','Fill me')
-VIM_PASSWORD = cloud.config.auth.get('password','Fill me')
-VIM_SERVICE_URL = cloud.config.auth.get('auth_url','Fill me')
-TENANT_ID = cloud.config.auth.get('project_id','Fill me')
-TENANT_NAME = cloud.config.auth.get('project_name','Fill me')
-CLOUD_REGION_ID = cloud.config.auth.get('region_name','RegionOne')
-CLOUD_DOMAIN = cloud.config.auth.get('project_domain_name','Default')
+VIM_USERNAME = cloud.config.auth.get('username', 'Fill me')
+VIM_PASSWORD = cloud.config.auth.get('password', 'Fill me')
+VIM_SERVICE_URL = cloud.config.auth.get('auth_url', 'Fill me')
+TENANT_ID = cloud.config.auth.get('project_id', 'Fill me')
+TENANT_NAME = cloud.config.auth.get('project_name', 'Fill me')
+CLOUD_REGION_ID = cloud.config.auth.get('region_name', 'RegionOne')
+CLOUD_DOMAIN = cloud.config.auth.get('project_domain_name', 'Default')
MODEL_YAML_TEMPLATE = None
import onaptests.utils.exceptions as onap_test_exceptions
from onaptests.utils.resources import get_resource_location
-from .settings import * # pylint: disable=W0614
+from .settings import * # noqa
""" Creation of service to onboard"""
with open(SERVICE_YAML_TEMPLATE, 'w+') as file_to_write:
file_to_write.write(rendered_template)
+
"""Basic onboard service to only onboard a service in SDC"""
# pylint: disable=bad-whitespace
# The ONAP part
-SERVICE_DETAILS="Onboarding of an Ubuntu VM"
-SERVICE_COMPONENTS="SDC"
+SERVICE_DETAILS = "Onboarding of an Ubuntu VM"
+SERVICE_COMPONENTS = "SDC"
-#USE_MULTICLOUD = False
+# USE_MULTICLOUD = False
# Set ONLY_INSTANTIATE to true to run an instantiation without repeating
# onboarding and related AAI configuration (Cloud config)
-#ONLY_INSTANTIATE= False
+# ONLY_INSTANTIATE= False
# if a yaml file is define, retrieve info from this yaml files
# if not declare the parameters in the settings
except (FileNotFoundError, ValueError):
raise onap_test_exceptions.TestConfigurationException
-#CLEANUP_FLAG = True
-#CLEANUP_ACTIVITY_TIMER = 10 # nb of seconds before cleanup in case cleanup option is set
+# CLEANUP_FLAG = True
+# CLEANUP_ACTIVITY_TIMER = 10 # nb of seconds before cleanup in case cleanup option is set
VENDOR_NAME = "basic_onboard_vendor"
MODEL_YAML_TEMPLATE = None
-from .settings import *
+from .settings import * # noqa
CLEANUP_FLAG = True
import onaptests.utils.exceptions as onap_test_exceptions
from onaptests.utils.resources import get_resource_location
-from .settings import * # pylint: disable=W0614
+from .settings import * # noqa
CLEANUP_FLAG = True
TEST_CLOUD = os.getenv('OS_TEST_CLOUD')
cloud = openstack.connect(cloud=TEST_CLOUD)
-VIM_USERNAME = cloud.config.auth.get('username','Fill me')
-VIM_PASSWORD = cloud.config.auth.get('password','Fill me')
-VIM_SERVICE_URL = cloud.config.auth.get('auth_url','Fill me')
-TENANT_ID = cloud.config.auth.get('project_id','Fill me')
-TENANT_NAME = cloud.config.auth.get('project_name','Fill me')
-CLOUD_REGION_ID = cloud.config.auth.get('region_name','RegionOne')
-CLOUD_DOMAIN = cloud.config.auth.get('project_domain_name','Default')
+VIM_USERNAME = cloud.config.auth.get('username', 'Fill me')
+VIM_PASSWORD = cloud.config.auth.get('password', 'Fill me')
+VIM_SERVICE_URL = cloud.config.auth.get('auth_url', 'Fill me')
+TENANT_ID = cloud.config.auth.get('project_id', 'Fill me')
+TENANT_NAME = cloud.config.auth.get('project_name', 'Fill me')
+CLOUD_REGION_ID = cloud.config.auth.get('region_name', 'RegionOne')
+CLOUD_DOMAIN = cloud.config.auth.get('project_domain_name', 'Default')
OWNING_ENTITY = "basicvm-oe"
PROJECT = "basicvm-project"
LINE_OF_BUSINESS = "basicvm-lob"
PLATFORM = "basicvm-platform"
CLOUD_DOMAIN = "Default"
-SERVICE_YAML_TEMPLATE = Path(get_resource_location("templates/vnf-services/basic_vm_macro-service.yaml"))
+SERVICE_YAML_TEMPLATE = Path(get_resource_location(
+ "templates/vnf-services/basic_vm_macro-service.yaml"))
try:
# Try to retrieve the SERVICE NAME from the yaml file
-from .basic_vm_macro_settings import * # pylint: disable=W0614
+from .basic_vm_macro_settings import * # noqa
-SERVICE_YAML_TEMPLATE = Path(get_resource_location("templates/vnf-services/basic_vm_macro_stability-service.yaml"))
+from pathlib import Path
+
+from onaptests.utils.resources import get_resource_location
+
+
+SERVICE_YAML_TEMPLATE = Path(get_resource_location(
+ "templates/vnf-services/basic_vm_macro_stability-service.yaml"))
MODEL_YAML_TEMPLATE = None
from onaptests.utils.resources import get_resource_location
-from .settings import * # pylint: disable=W0614
+from .settings import * # noqa
""" Specific Basic VM with multicloud and yaml config scenario."""
SERVICE_DETAILS = ("Onboarding, distribution and instantiation of a VM" +
"using à la carte and Multicloud module")
-SERVICE_COMPONENTS="SDC, DMAAP, AAI, SO, SDNC, Multicloud"
+SERVICE_COMPONENTS = "SDC, DMAAP, AAI, SO, SDNC, Multicloud"
USE_MULTICLOUD = True
# Set ONLY_INSTANTIATE to true to run an instantiation without repeating
# onboarding and related AAI configuration (Cloud config)
-ONLY_INSTANTIATE= False
+ONLY_INSTANTIATE = False
VENDOR_NAME = "sdktests_vendor"
-SERVICE_NAME = "basicvmtest" # must be the same as in YAML
+SERVICE_NAME = "basicvmtest" # must be the same as in YAML
-CLOUD_REGION_CLOUD_OWNER = "sdktestsOwner" # must not contain _
-CLOUD_REGION_ID = "RegionOne" # should be valid, as otherwise MultiCloud fails
+CLOUD_REGION_CLOUD_OWNER = "sdktestsOwner" # must not contain _
+CLOUD_REGION_ID = "RegionOne" # should be valid, as otherwise MultiCloud fails
CLOUD_REGION_TYPE = "openstack"
CLOUD_OWNER_DEFINED_TYPE = "N/A"
CLOUD_REGION_VERSION = "titanium_cloud"
COMPLEX_DATA_CENTER_CODE = "sdktests_complex_data_center_code"
GLOBAL_CUSTOMER_ID = "sdktests_global_customer_id"
-TENANT_ID = "" # Fill me in your custom settings
-TENANT_NAME= "" # Fill me in your custom settings
-AVAILABILITY_ZONE_NAME = "" # Fill me in your custom settings
+TENANT_ID = "" # Fill me in your custom settings
+TENANT_NAME = "" # Fill me in your custom settings
+AVAILABILITY_ZONE_NAME = "" # Fill me in your custom settings
AVAILABILITY_ZONE_TYPE = "nova"
VIM_USERNAME = "" # Fill me in your custom settings
from yaml import load, SafeLoader
from onaptests.utils.resources import get_resource_location
import onaptests.utils.exceptions as onap_test_exceptions
-from .settings import * # pylint: disable=W0614
+from .settings import * # noqa
""" Specific basic_vm without multicloud."""
# pylint: disable=bad-whitespace
# The ONAP part
-SERVICE_DETAILS="Onboarding, distribution and instanitation of an Ubuntu VM using à la carte"
-SERVICE_COMPONENTS="SDC, DMAAP, AAI, SO, SDNC"
+SERVICE_DETAILS = "Onboarding, distribution and instanitation of an Ubuntu VM using à la carte"
+SERVICE_COMPONENTS = "SDC, DMAAP, AAI, SO, SDNC"
USE_MULTICLOUD = False
# Set ONLY_INSTANTIATE to true to run an instantiation without repeating
# onboarding and related AAI configuration (Cloud config)
-ONLY_INSTANTIATE= False
+ONLY_INSTANTIATE = False
# if a yaml file is define, retrieve info from this yaml files
# if not declare the parameters in the settings
TEST_CLOUD = os.getenv('OS_TEST_CLOUD')
TEST_CLOUD = os.getenv('OS_TEST_CLOUD')
cloud = openstack.connect(cloud=TEST_CLOUD)
-VIM_USERNAME = cloud.config.auth.get('username','Fill me')
-VIM_PASSWORD = cloud.config.auth.get('password','Fill me')
-VIM_SERVICE_URL = cloud.config.auth.get('auth_url','Fill me')
-TENANT_ID = cloud.config.auth.get('project_id','Fill me')
-TENANT_NAME = cloud.config.auth.get('project_name','Fill me')
-CLOUD_REGION_ID = cloud.config.auth.get('region_name','RegionOne')
-CLOUD_DOMAIN = cloud.config.auth.get('project_domain_name','Default')
+VIM_USERNAME = cloud.config.auth.get('username', 'Fill me')
+VIM_PASSWORD = cloud.config.auth.get('password', 'Fill me')
+VIM_SERVICE_URL = cloud.config.auth.get('auth_url', 'Fill me')
+TENANT_ID = cloud.config.auth.get('project_id', 'Fill me')
+TENANT_NAME = cloud.config.auth.get('project_name', 'Fill me')
+CLOUD_REGION_ID = cloud.config.auth.get('region_name', 'RegionOne')
+CLOUD_DOMAIN = cloud.config.auth.get('project_domain_name', 'Default')
MODEL_YAML_TEMPLATE = None
from pathlib import Path
from onaptests.utils.resources import get_resource_location
-from .settings import * # pylint: disable=W0614
+from .settings import * # noqa
SERVICE_NAME = "CDS blueprint enrichment"
from uuid import uuid4
from onaptests.utils.resources import get_resource_location
-from .settings import * # pylint: disable=W0614
+from .settings import * # noqa
CLEANUP_FLAG = True
SERVICE_NAME = "CDS resource resolution"
-CLOUD_REGION_CLOUD_OWNER = "basicnf-owner" # must not contain _
+CLOUD_REGION_CLOUD_OWNER = "basicnf-owner" # must not contain _
CLOUD_REGION_ID = "k8sregion-cds"
CLOUD_REGION_TYPE = "k8s"
CLOUD_REGION_VERSION = "1.0"
CLOUD_OWNER_DEFINED_TYPE = "N/A"
COMPLEX_PHYSICAL_LOCATION_ID = "sdktests"
-MSB_K8S_DEFINITION_ATRIFACT_FILE_PATH = Path(get_resource_location(
- "templates/artifacts/cds-resource-resolution/cds-mock-server.tar.gz"))
+MSB_K8S_DEFINITION_ATRIFACT_FILE_PATH = Path(
+ get_resource_location(
+ "templates/artifacts/cds-resource-resolution/cds-mock-server.tar.gz"))
MSB_K8S_RB_NAME = f"cds-ms-rb-{str(uuid4())}"
MSB_K8S_RB_VERSION = "v1"
MSB_K8S_PROFILE_ARTIFACT_FILE_PATH = Path(get_resource_location(
}
]
-CDS_DD_FILE = Path(get_resource_location("templates/artifacts/cds-resource-resolution/dd.json"))
-CDS_CBA_UNENRICHED = Path(get_resource_location("templates/artifacts/cds-resource-resolution/resource-resolution.zip"))
+CDS_DD_FILE = Path(get_resource_location(
+ "templates/artifacts/cds-resource-resolution/dd.json"))
+CDS_CBA_UNENRICHED = Path(get_resource_location(
+ "templates/artifacts/cds-resource-resolution/resource-resolution.zip"))
CDS_CBA_ENRICHED = Path("/tmp/resource-resolution-enriched.zip")
CDS_WORKFLOW_NAME = "resource-resolution"
CDS_WORKFLOW_INPUT = {
"j_input": "ok"
}
}
-CDS_WORKFLOW_EXPECTED_OUTPUT = {
+CDS_WORKFLOW_EXPECTED_OUTPUT = {
"resource-resolution-response": {
"meshed-template": {
- "helloworld-velocity": "{\n \"default\": \"ok\",\n \"input\": \"ok\",\n \"script\": {\n \"python\": \"ok\",\n \"kotlin\": \"ok\"\n },\n \"db\": \"ok\",\n \"rest\": {\n \"GET\": \"A046E51D-44DC-43AE-BBA2-86FCA86C5265\",\n \"POST\": \"post:ok\",\n \"PUT\": \"put:ok\",\n \"PATCH\": \"patch:ok\",\n \"DELETE\": \"delete:ok\"\n }\n}\n",
- "helloworld-jinja": "{\n \"default\": \"ok\",\n \"input\": \"ok\",\n \"script\": {\n \"python\": \"ok\",\n \"kotlin\": {\n \"base\": \"ok\"\n \"from suspend function\": \"ok\"\n }\n },\n \"db\": \"ok\",\n \"rest\": {\n \"GET\": \"A046E51D-44DC-43AE-BBA2-86FCA86C5265\",\n \"GET_ID\": \"74FE67C6-50F5-4557-B717-030D79903908\",\n \"POST\": \"post:ok\",\n \"PUT\": \"put:ok\",\n \"PATCH\": \"patch:ok\",\n \"DELETE\": \"delete:ok\"\n }\n}\n"
+ "helloworld-velocity": "{\n \"default\": \"ok\",\n \"input\": \"ok\",\n \"script\": {\n \"python\": \"ok\",\n \"kotlin\": \"ok\"\n },\n \"db\": \"ok\",\n \"rest\": {\n \"GET\": \"A046E51D-44DC-43AE-BBA2-86FCA86C5265\",\n \"POST\": \"post:ok\",\n \"PUT\": \"put:ok\",\n \"PATCH\": \"patch:ok\",\n \"DELETE\": \"delete:ok\"\n }\n}\n", # noqa
+ "helloworld-jinja": "{\n \"default\": \"ok\",\n \"input\": \"ok\",\n \"script\": {\n \"python\": \"ok\",\n \"kotlin\": {\n \"base\": \"ok\"\n \"from suspend function\": \"ok\"\n }\n },\n \"db\": \"ok\",\n \"rest\": {\n \"GET\": \"A046E51D-44DC-43AE-BBA2-86FCA86C5265\",\n \"GET_ID\": \"74FE67C6-50F5-4557-B717-030D79903908\",\n \"POST\": \"post:ok\",\n \"PUT\": \"put:ok\",\n \"PATCH\": \"patch:ok\",\n \"DELETE\": \"delete:ok\"\n }\n}\n" # noqa
}
}
}
import openstack
from yaml import load, SafeLoader
from onaptests.utils.resources import get_resource_location
-from .settings import * # pylint: disable=W0614
+from .settings import * # noqa
""" Specific clearwater IMS without multicloud."""
USE_MULTICLOUD = False
# Set ONLY_INSTANTIATE to true to run an instantiation without repeating
# onboarding and related AAI configuration (Cloud config)
-ONLY_INSTANTIATE= False
+ONLY_INSTANTIATE = False
CLEANUP_FLAG = True
CLEANUP_ACTIVITY_TIMER = 60 # nb of seconds before cleanup in case cleanup option is set
VENDOR_NAME = "clearwater-ims_vendor"
yaml_config_file = load(yaml_template, SafeLoader)
SERVICE_NAME = next(iter(yaml_config_file.keys()))
except ValueError:
- SERVICE_NAME = "" # Fill me
+ SERVICE_NAME = "" # Fill me
CLOUD_REGION_CLOUD_OWNER = "clearwater-ims-cloud-owner"
CLOUD_REGION_TYPE = "openstack"
# to retrieve cloud info and avoid data duplication
TEST_CLOUD = os.getenv('OS_TEST_CLOUD')
cloud = openstack.connect(cloud=TEST_CLOUD)
-VIM_USERNAME = cloud.config.auth.get('username','Fill me')
-VIM_PASSWORD = cloud.config.auth.get('password','Fill me')
-VIM_SERVICE_URL = cloud.config.auth.get('auth_url','Fill me')
-TENANT_ID = cloud.config.auth.get('project_id','Fill me')
-TENANT_NAME = cloud.config.auth.get('project_name','Fill me')
-CLOUD_REGION_ID = cloud.config.get('region_name','RegionOne')
-CLOUD_DOMAIN = cloud.config.auth.get('project_domain_name','Default')
+VIM_USERNAME = cloud.config.auth.get('username', 'Fill me')
+VIM_PASSWORD = cloud.config.auth.get('password', 'Fill me')
+VIM_SERVICE_URL = cloud.config.auth.get('auth_url', 'Fill me')
+TENANT_ID = cloud.config.auth.get('project_id', 'Fill me')
+TENANT_NAME = cloud.config.auth.get('project_name', 'Fill me')
+CLOUD_REGION_ID = cloud.config.get('region_name', 'RegionOne')
+CLOUD_DOMAIN = cloud.config.auth.get('project_domain_name', 'Default')
MODEL_YAML_TEMPLATE = None
import openstack
from jinja2 import Environment, PackageLoader
from onaptests.utils.resources import get_resource_location
-from .settings import * # pylint: disable=W0614
+from .settings import * # noqa
VNF_FILENAME_PREFIX = "multi-vnf-ubuntu"
SERVICE_NAME = f"multivnfubuntu{str(uuid.uuid4().hex)[:6]}"
from uuid import uuid4
from onaptests.utils.resources import get_resource_location
-from .settings import * # pylint: disable=W0614
+from .settings import * # noqa
ONLY_INSTANTIATE = False
CLEANUP_FLAG = True
CDS_CBA_UNENRICHED = Path(get_resource_location("templates/artifacts/PNF_DEMO.zip"))
CDS_CBA_ENRICHED = "/tmp/PNF_DEMO_enriched.zip"
-CLOUD_REGION_CLOUD_OWNER = "basicnf-owner" # must not contain _
+CLOUD_REGION_CLOUD_OWNER = "basicnf-owner" # must not contain _
CLOUD_REGION_ID = "k8sregion-pnf-macro"
CLOUD_REGION_TYPE = "k8s"
CLOUD_REGION_VERSION = "1.0"
INSTANTIATION_TIMEOUT = 600
-MSB_K8S_DEFINITION_ATRIFACT_FILE_PATH = Path(get_resource_location("templates/artifacts/pnf-simulator.tar.gz"))
+MSB_K8S_DEFINITION_ATRIFACT_FILE_PATH = Path(get_resource_location(
+ "templates/artifacts/pnf-simulator.tar.gz"))
MSB_K8S_RB_NAME = f"pnf-cnf-rb-{str(uuid4())}"
MSB_K8S_RB_VERSION = "v1"
-MSB_K8S_PROFILE_ARTIFACT_FILE_PATH = Path(get_resource_location("templates/artifacts/profile.tar.gz"))
+MSB_K8S_PROFILE_ARTIFACT_FILE_PATH = Path(get_resource_location(
+ "templates/artifacts/profile.tar.gz"))
MSB_K8S_PROFILE_NAME = f"pnf-cnf-profile-{str(uuid4())}"
K8S_VERSION = "1.0"
K8S_CONFIG = get_resource_location("templates/artifacts/config")
K8S_CONFIG = None # None means it will use default config (~/.kube/config)
K8S_ONAP_NAMESPACE = "onap" # ONAP Kubernetes namespace
K8S_ADDITIONAL_RESOURCES_NAMESPACE = K8S_ONAP_NAMESPACE # Resources created on tests namespace
-#SOCK_HTTP = "socks5h://127.0.0.1:8091"
+# SOCK_HTTP = "socks5h://127.0.0.1:8091"
ORCHESTRATION_REQUEST_TIMEOUT = 60.0 * 15 # 15 minutes in seconds
SERVICE_DISTRIBUTION_NUMBER_OF_TRIES = 30
SERVICE_DISTRIBUTION_SLEEP_TIME = 60
EXPOSE_SERVICES_NODE_PORTS = True
-IN_CLUSTER = False
\ No newline at end of file
+IN_CLUSTER = False
-from .settings import * # pylint: disable=W0614
+from .settings import * # noqa
""" Specific Status Check """
SERVICE_NAME = "Status Check"
}
SPECIFIC_LOGS_CONTAINERS = {
- 'sdc-be': ['/var/log/onap/sdc/sdc-be/error.log'],
'sdc-onboarding-be': ['/var/log/onap/sdc/sdc-onboarding-be/error.log'],
'aaf-cm': [
'/opt/app/osaaf/logs/cm/cm-service.log',
MAX_LOG_BYTES = 512000
-UNLIMITED_LOG_BYTES = 10**10 # 10 GB
+UNLIMITED_LOG_BYTES = 10**10 # 10 GB
class BasicClamp(ScenarioBase):
"""Onboard, update a model with a loop, design the loop and deploy it."""
__logger = logging.getLogger(__name__)
+
def __init__(self, **kwargs):
"""Init Basic Clamp, onboard a VM, design and deploy a loop with CLAMP."""
super().__init__('basic_clamp', **kwargs)
self.test = ClampStep(
- cleanup=settings.CLEANUP_FLAG)
+ cleanup=settings.CLEANUP_FLAG)
self.start_time = None
self.stop_time = None
self.result = 0
"""Init BasicCnf."""
super().__init__('basic_cnf', **kwargs)
self.test = YamlTemplateVfModuleAlaCarteInstantiateStep(
- cleanup=settings.CLEANUP_FLAG)
+ cleanup=settings.CLEANUP_FLAG)
self.start_time = None
self.stop_time = None
self.result = 0
# import basic_network_nomulticloud_settings needed
super().__init__('basic_network', **kwargs)
self.test = YamlTemplateVlAlaCarteInstantiateStep(
- cleanup=settings.CLEANUP_FLAG)
+ cleanup=settings.CLEANUP_FLAG)
self.start_time = None
self.stop_time = None
self.result = 0
"""Init BasicVM."""
super().__init__('basic_vm', **kwargs)
self.test = YamlTemplateVfModuleAlaCarteInstantiateStep(
- cleanup=settings.CLEANUP_FLAG)
+ cleanup=settings.CLEANUP_FLAG)
self.start_time = None
self.stop_time = None
self.result = 0
"""Init CDS blueprint enrichment use case."""
super().__init__('basic_cds', **kwargs)
self.test = CbaEnrichStep(
- cleanup=settings.CLEANUP_FLAG)
+ cleanup=settings.CLEANUP_FLAG)
self.start_time = None
self.stop_time = None
self.result = 0
"""Init CDS resource resolution use case."""
super().__init__('basic_cds', **kwargs)
self.test = CDSResourceResolutionStep(
- cleanup=settings.CLEANUP_FLAG)
+ cleanup=settings.CLEANUP_FLAG)
self.start_time = None
self.stop_time = None
self.result = 0
# import clearwater_ims_nomulticloud_settings needed
super().__init__('clearwater_ims', **kwargs)
self.test = YamlTemplateVfModuleAlaCarteInstantiateStep(
- cleanup=settings.CLEANUP_FLAG)
+ cleanup=settings.CLEANUP_FLAG)
self.start_time = None
self.stop_time = None
self.result = 0
"""Clean Additional resources if needed."""
try:
self.test.reports_collection.generate_report()
- except:
+ except: # noqa
self.__logger.error("Impossible to generate reporting")
self.start_time = time.time()
try:
self.test.execute()
- self.__logger.info("Starting to clean up in {} seconds".format(settings.CLEANUP_ACTIVITY_TIMER))
+ self.__logger.info("Starting to clean up in {} seconds".format(
+ settings.CLEANUP_ACTIVITY_TIMER))
time.sleep(settings.CLEANUP_ACTIVITY_TIMER)
self.test.cleanup()
self.result = 100
"""Init the testcase."""
super().__init__('status', **kwargs)
self.test = CheckNamespaceStatusStep(
- cleanup=settings.CLEANUP_FLAG)
+ cleanup=settings.CLEANUP_FLAG)
self.start_time = None
self.stop_time = None
self.result = 0
def store_state(cls, fun=None, *, cleanup=False):
if fun is None:
return functools.partial(cls.store_state, cleanup=cleanup)
+
@functools.wraps(fun)
def wrapper(self, *args, **kwargs):
try:
if cleanup:
self._start_cleanup_time = time.time()
self._logger.info("*****************************************************")
- self._logger.info(f"START [{self.component}] {self.name} cleanup: {self.description}")
+ self._logger.info(f"START [{self.component}] {self.name} cleanup: "
+ f"{self.description}")
self._logger.info("*****************************************************")
else:
self._logger.info("*****************************************************")
execution_status = ReportStepStatus.PASS
return ret
except SubstepExecutionException:
- execution_status = ReportStepStatus.PASS if cleanup else ReportStepStatus.NOT_EXECUTED
+ execution_status = (ReportStepStatus.PASS if cleanup else
+ ReportStepStatus.NOT_EXECUTED)
raise
except (OnapTestException, SDKException):
execution_status = ReportStepStatus.FAIL
finally:
if cleanup:
self._logger.info("*****************************************************")
- self._logger.info(f"STOP [{self.component}] {self.name} cleanup: {self.description}")
+ self._logger.info(f"STOP [{self.component}] {self.name} cleanup: "
+ f"{self.description}")
self._logger.info("*****************************************************")
self._cleanup_report = Report(
- step_description=f"[{self.component}] {self.name} cleanup: {self.description}",
+ step_description=(f"[{self.component}] {self.name} cleanup: "
+ f"{self.description}"),
step_execution_status=execution_status,
step_execution_duration=time.time() - self._start_cleanup_time,
step_component=self.component
self._logger.info("*****************************************************")
if not self._start_execution_time:
if execution_status != ReportStepStatus.NOT_EXECUTED:
- self._logger.error("No execution start time saved for %s step. Fix it by call `super.execute()` "
- "in step class `execute()` method definition", self.name)
+ self._logger.error("No execution start time saved for %s step. "
+ "Fix it by call `super.execute()` "
+ "in step class `execute()` method definition",
+ self.name)
self._start_execution_time = time.time()
self._execution_report = Report(
step_description=f"[{self.component}] {self.name}: {self.description}",
- step_execution_status=execution_status if execution_status else ReportStepStatus.FAIL,
+ step_execution_status=(execution_status if execution_status else
+ ReportStepStatus.FAIL),
step_execution_duration=time.time() - self._start_execution_time,
step_component=self.component
)
self.all_resources = []
self.failing_resources = []
self.jinja_env = Environment(autoescape=select_autoescape(['html']),
- loader=PackageLoader('onaptests.templates','status'))
+ loader=PackageLoader('onaptests.templates', 'status'))
@property
def component(self) -> str:
def _add_failing_resource(self, resource):
if (resource.labels and settings.EXCLUDED_LABELS
- and (resource.labels.keys() and settings.EXCLUDED_LABELS.keys())):
+ and (resource.labels.keys() and settings.EXCLUDED_LABELS.keys())):
for label in resource.labels.items():
for waived_label in settings.EXCLUDED_LABELS.items():
if label[0] in waived_label[0] and label[1] in waived_label[1]:
try:
self._init_resources()
if len(self.k8s_resources) > 0:
- self.__logger.info("%4s %ss in the namespace", len(self.k8s_resources), self.resource_type)
+ self.__logger.info("%4s %ss in the namespace",
+ len(self.k8s_resources),
+ self.resource_type)
self._parse_resources()
- self.__logger.info("%4s %ss parsed, %s failing", len(self.all_resources), self.resource_type,
- len(self.failing_resources))
+ self.__logger.info("%4s %ss parsed, %s failing",
+ len(self.all_resources),
+ self.resource_type,
+ len(self.failing_resources))
except (ConnectionRefusedError, MaxRetryError, NewConnectionError):
self.__logger.error("Test of k8s %ss failed.", self.resource_type)
self.__logger.error("Cannot connect to Kubernetes.")
+
class CheckBasicK8sResourcesStep(CheckK8sResourcesStep):
__logger = logging.getLogger(__name__)
- def __init__(self, resource_type: str, k8s_res_class, cleanup: bool = False,**kwargs):
+ def __init__(self, resource_type: str, k8s_res_class, cleanup: bool = False, **kwargs):
"""Init CheckBasicK8sResourcesStep."""
super().__init__(resource_type)
self.k8s_res_class = k8s_res_class
def execute(self):
super().execute()
+
class CheckK8sConfigMapsStep(CheckBasicK8sResourcesStep):
__logger = logging.getLogger(__name__)
- def __init__(self, cleanup: bool = False,**kwargs):
+ def __init__(self, cleanup: bool = False, **kwargs):
"""Init CheckK8sConfigMapsStep."""
super().__init__("configmap", ConfigMap)
super()._init_resources()
self.k8s_resources = self.core.list_namespaced_config_map(NAMESPACE).items
+
class CheckK8sSecretsStep(CheckBasicK8sResourcesStep):
__logger = logging.getLogger(__name__)
- def __init__(self, cleanup: bool = False,**kwargs):
+ def __init__(self, cleanup: bool = False, **kwargs):
"""Init CheckK8sSecretsStep."""
super().__init__("secret", Secret)
super()._init_resources()
self.k8s_resources = self.core.list_namespaced_secret(NAMESPACE).items
+
class CheckK8sIngressesStep(CheckBasicK8sResourcesStep):
__logger = logging.getLogger(__name__)
- def __init__(self, cleanup: bool = False,**kwargs):
+ def __init__(self, cleanup: bool = False, **kwargs):
"""Init CheckK8sIngressesStep."""
super().__init__("ingress", Ingress)
super()._init_resources()
self.k8s_resources = self.networking.list_namespaced_ingress(NAMESPACE).items
+
class CheckK8sPvcsStep(CheckK8sResourcesStep):
__logger = logging.getLogger(__name__)
- def __init__(self, cleanup: bool = False,**kwargs):
+ def __init__(self, cleanup: bool = False, **kwargs):
"""Init CheckK8sPvcsStep."""
super().__init__("pvc")
Return a list of Pods that were created to perform jobs.
"""
super()._parse_resources()
- jobs_pods = []
for k8s in self.k8s_resources:
pvc = Pvc(k8s=k8s)
- field_selector = f"involvedObject.name={pvc.name},involvedObject.kind=PersistentVolumeClaim"
+ field_selector = (f"involvedObject.name={pvc.name}, "
+ "involvedObject.kind=PersistentVolumeClaim")
pvc.events = self.core.list_namespaced_event(
NAMESPACE,
field_selector=field_selector).items
def execute(self):
super().execute()
+
class CheckK8sResourcesUsingPodsStep(CheckK8sResourcesStep):
__logger = logging.getLogger(__name__)
- def __init__(self, resource_type: str, pods_source, cleanup: bool = False,**kwargs):
+ def __init__(self, resource_type: str, pods_source, cleanup: bool = False, **kwargs):
"""Init CheckK8sResourcesUsingPodsStep."""
super().__init__(resource_type)
self.pods_source = pods_source
def execute(self):
super().execute()
+
class CheckK8sJobsStep(CheckK8sResourcesUsingPodsStep):
__logger = logging.getLogger(__name__)
- def __init__(self, cleanup: bool = False,**kwargs):
+ def __init__(self, cleanup: bool = False, **kwargs):
"""Init CheckK8sJobsStep."""
super().__init__("job", None)
# timemout job
if not k8s.status.completion_time:
- if any(
- waiver_elt not in job.name for waiver_elt in settings.WAIVER_LIST):
+ if any(waiver_elt not in job.name for waiver_elt in settings.WAIVER_LIST):
self._add_failing_resource(job)
# completed job
if any(waiver_elt not in job.name for waiver_elt in settings.WAIVER_LIST):
self.all_resources.append(job)
jobs_pods += job_pods
+
class CheckK8sPodsStep(CheckK8sResourcesUsingPodsStep):
__logger = logging.getLogger(__name__)
- def __init__(self, pods, cleanup: bool = False,**kwargs):
+ def __init__(self, pods, cleanup: bool = False, **kwargs):
"""Init CheckK8sPodsStep."""
super().__init__("pod", pods)
super()._init_resources()
self.k8s_resources = self.core.list_namespaced_pod(NAMESPACE).items
- def _parse_resources(self):
+ def _parse_resources(self): # noqa
"""Parse the pods."""
super()._parse_resources()
excluded_pods = self._get_used_pods()
pod_component = k8s.metadata.labels[
'app.kubernetes.io/name']
else:
- self.__logger.error("pod %s has no 'app' or 'app.kubernetes.io/name' in metadata: %s", pod_component, k8s.metadata.labels)
+ self.__logger.error("pod %s has no 'app' or 'app.kubernetes.io/name' "
+ "in metadata: %s", pod_component, k8s.metadata.labels)
- ## looks for docker version
+ # looks for docker version
for container in k8s.spec.containers:
pod_version = {}
pod_container_version = container.image.rsplit(":", 1)
search_rule = "^(?P<source>[^/]*)/*(?P<container>[^:]*):*(?P<version>.*)$"
search = re.search(search_rule, container.image)
name = "{}/{}".format(search.group('source'),
- search.group('container'))
+ search.group('container'))
version = search.group('version')
if name[-1] == '/':
name = name[0:-1]
)
return logs
- def _parse_container(self, pod, k8s_container, init=False):
+ def _parse_container(self, pod, k8s_container, init=False): # noqa
"""Get the logs of a container."""
logs = ""
old_logs = ""
logs = self._get_container_logs(pod=pod, container=container, full=False)
with open(
"{}/pod-{}-{}.log".format(self.res_dir,
- pod.name, container.name),
+ pod.name, container.name),
'w') as log_result:
log_result.write(logs)
if (not container.ready) and container.restart_count > 0:
previous=True)
with open(
"{}/pod-{}-{}.old.log".format(self.res_dir,
- pod.name,
- container.name),
+ pod.name,
+ container.name),
'w') as log_result:
log_result.write(old_logs)
if (container.name in settings.FULL_LOGS_CONTAINERS):
logs = self._get_container_logs(pod=pod, container=container)
with open(
"{}/pod-{}-{}.log".format(self.res_dir,
- pod.name, container.name),
+ pod.name, container.name),
'w') as log_result:
log_result.write(logs)
if (container.name in settings.SPECIFIC_LOGS_CONTAINERS):
log_result.write(log_files[log_file])
except client.rest.ApiException as exc:
self.__logger.warning("%scontainer %s of pod %s has an exception: %s",
- prefix, container.name, pod.name, exc.reason)
+ prefix, container.name, pod.name, exc.reason)
self.jinja_env.get_template('container_log.html.j2').stream(
container=container,
pod_name=pod.name,
return 1
return 0
+
class CheckK8sServicesStep(CheckK8sResourcesUsingPodsStep):
__logger = logging.getLogger(__name__)
- def __init__(self, pods, cleanup: bool = False,**kwargs):
+ def __init__(self, pods, cleanup: bool = False, **kwargs):
"""Init CheckK8sServicesStep."""
super().__init__("service", pods)
self.res_dir, service.name))
self.all_resources.append(service)
+
class CheckK8sDeploymentsStep(CheckK8sResourcesUsingPodsStep):
__logger = logging.getLogger(__name__)
- def __init__(self, pods, cleanup: bool = False,**kwargs):
+ def __init__(self, pods, cleanup: bool = False, **kwargs):
"""Init CheckK8sDeploymentsStep."""
super().__init__("deployment", pods)
self.all_resources.append(deployment)
+
class CheckK8sResplicaSetsStep(CheckK8sResourcesUsingPodsStep):
__logger = logging.getLogger(__name__)
- def __init__(self, pods, cleanup: bool = False,**kwargs):
+ def __init__(self, pods, cleanup: bool = False, **kwargs):
"""Init CheckK8sResplicaSetsStep."""
super().__init__("replicaset", pods)
replicaset=replicaset).dump('{}/replicaset-{}.html'.format(
self.res_dir, replicaset.name))
- if (not k8s.status.ready_replicas
- or (k8s.status.ready_replicas < k8s.status.replicas)):
+ if (not k8s.status.ready_replicas or
+ (k8s.status.ready_replicas < k8s.status.replicas)):
self._add_failing_resource(replicaset)
self.all_resources.append(replicaset)
+
class CheckK8sStatefulSetsStep(CheckK8sResourcesUsingPodsStep):
__logger = logging.getLogger(__name__)
- def __init__(self, pods, cleanup: bool = False,**kwargs):
+ def __init__(self, pods, cleanup: bool = False, **kwargs):
"""Init CheckK8sStatefulSetsStep."""
super().__init__("statefulset", pods)
self.all_resources.append(statefulset)
+
class CheckK8sDaemonSetsStep(CheckK8sResourcesUsingPodsStep):
__logger = logging.getLogger(__name__)
- def __init__(self, pods, cleanup: bool = False,**kwargs):
+ def __init__(self, pods, cleanup: bool = False, **kwargs):
"""Init CheckK8sDaemonSetsStep."""
super().__init__("daemonset", pods)
self.all_resources.append(daemonset)
+
class CheckNamespaceStatusStep(CheckK8sResourcesStep):
"""Check status of all k8s resources in the selected namespace."""
for step in self._steps:
if step.failing:
self.failing = True
- self.__logger.info("%s failing: %s", step.resource_type, len(step.failing_resources))
+ self.__logger.info("%s failing: %s",
+ step.resource_type,
+ len(step.failing_resources))
details[step.resource_type] = {
'number_all': len(step.all_resources),
'number_failing': len(step.failing_resources),
super().execute()
customer: Customer = Customer.get_by_global_customer_id(
settings.GLOBAL_CUSTOMER_ID)
- service_subscription: ServiceSubscription = customer.get_service_subscription_by_service_type(
- settings.SERVICE_NAME)
+ service_subscription: ServiceSubscription = \
+ customer.get_service_subscription_by_service_type(settings.SERVICE_NAME)
cloud_region: CloudRegion = CloudRegion.get_by_id(
cloud_owner=settings.CLOUD_REGION_CLOUD_OWNER,
cloud_region_id=settings.CLOUD_REGION_ID,
# for which we are sure that an availability zone has been created
tenant: Tenant = cloud_region.get_tenant(settings.TENANT_ID)
- service_subscription.link_to_cloud_region_and_tenant(cloud_region=cloud_region, tenant=tenant)
+ service_subscription.link_to_cloud_region_and_tenant(
+ cloud_region=cloud_region,
+ tenant=tenant)
from onapsdk.configuration import settings
from onapsdk.exceptions import APIError
from onapsdk.k8s import ConnectivityInfo
-from onapsdk.utils.jinja import jinja_env
from onaptests.steps.base import BaseStep
- K8S_CONFIG.
"""
super().execute()
- ######## Create Connectivity Info #########################################
+ # Create Connectivity Info #########################################
try:
self._logger.info("Check if k8s connectivity information exists")
ConnectivityInfo.get_connectivity_info_by_region_id(
return "AAI"
@BaseStep.store_state
- def execute(self):
+ def execute(self): # noqa
"""Register cloud region.
Use settings values:
else:
return False
+
class K8sPodParentResource(K8sResource):
"""K8sPodParentResource class."""
class Deployment(K8sPodParentResource):
"""Deployment class."""
+
class ReplicaSet(K8sPodParentResource):
"""ReplicaSet class."""
+
class StatefulSet(K8sPodParentResource):
"""StatefulSet class."""
class Ingress(K8sResource):
- """Ingress class."""
\ No newline at end of file
+ """Ingress class."""
from ..base import BaseStep
from .vnf_ala_carte import YamlTemplateVnfAlaCarteInstantiateStep
+
class K8SProfileStep(BaseStep):
"""CreateK8sProfileStep."""
self._logger.info("Create the k8s profile if it doesn't exist")
super().execute()
customer: Customer = Customer.get_by_global_customer_id(settings.GLOBAL_CUSTOMER_ID)
- service_subscription: ServiceSubscription = customer.get_service_subscription_by_service_type(self.service_name)
- self._service_instance: ServiceInstance = service_subscription.get_service_instance_by_name(self.service_instance_name)
+ service_subscription: ServiceSubscription = \
+ customer.get_service_subscription_by_service_type(self.service_name)
+ self._service_instance: ServiceInstance = \
+ service_subscription.get_service_instance_by_name(self.service_instance_name)
for vnf_instance in self._service_instance.vnf_instances:
# possible to have several modules for 1 VNF
self._logger.error("Missing rb profile information")
raise onap_test_exceptions.ProfileInformationException
- ######## Check profile for Definition ###################################
+ # Check profile for Definition
try:
rbdef.get_profile_by_name(k8s_profile_name)
except ResourceNotFound:
- ######## Create profile for Definition ###################################
+ # Create profile for Definition
profile = rbdef.create_profile(k8s_profile_name,
k8s_profile_namespace,
settings.K8S_PROFILE_K8S_VERSION)
- ####### Upload artifact for created profile ##############################
+ # Upload artifact for created profile
profile.upload_artifact(open(settings.K8S_PROFILE_ARTIFACT_PATH, 'rb').read())
@BaseStep.store_state(cleanup=True)
if k8s_profile_name == "":
self._logger.error("K8s profile deletion failed, missing rb profile name")
raise onap_test_exceptions.ProfileInformationException
- ######## Delete profile for Definition ###################################
+ # Delete profile for Definition
try:
profile = rbdef.get_profile_by_name(k8s_profile_name)
profile.delete()
super().execute()
service = Service(self.service_name)
customer: Customer = Customer.get_by_global_customer_id(settings.GLOBAL_CUSTOMER_ID)
- service_subscription: ServiceSubscription = customer.get_service_subscription_by_service_type(service.name)
+ service_subscription: ServiceSubscription = \
+ customer.get_service_subscription_by_service_type(service.name)
cloud_region: CloudRegion = CloudRegion.get_by_id(
cloud_owner=settings.CLOUD_REGION_CLOUD_OWNER,
cloud_region_id=settings.CLOUD_REGION_ID,
distribution_completed = service.distributed
if distribution_completed is True:
self._logger.info(
- "Service Distribution for %s is sucessfully finished",
- service.name)
+ "Service Distribution for %s is sucessfully finished",
+ service.name)
break
self._logger.info(
"Service Distribution for %s ongoing, Wait for 60 s",
if distribution_completed is False:
self._logger.error(
- "Service Distribution for %s failed !!",service.name)
+ "Service Distribution for %s failed !!", service.name)
raise onap_test_exceptions.ServiceDistributionException
service_instantiation = ServiceInstantiation.instantiate_ala_carte(
self._logger.error("Service instantiation %s failed", self.service_instance_name)
raise onap_test_exceptions.ServiceInstantiateException
else:
- service_subscription: ServiceSubscription = customer.get_service_subscription_by_service_type(self.service_name)
- self._service_instance: ServiceInstance = service_subscription.get_service_instance_by_name(self.service_instance_name)
+ service_subscription: ServiceSubscription = \
+ customer.get_service_subscription_by_service_type(self.service_name)
+ self._service_instance: ServiceInstance = \
+ service_subscription.get_service_instance_by_name(self.service_instance_name)
@YamlTemplateBaseStep.store_state(cleanup=True)
def cleanup(self) -> None:
from onapsdk.configuration import settings
from onapsdk.exceptions import ResourceNotFound
from onapsdk.sdc.service import Service
-from onapsdk.so.instantiation import InstantiationParameter, ServiceInstantiation, VfmoduleParameters, VnfParameters, SoService
-from onaptests.steps.cloud.customer_service_subscription_create import CustomerServiceSubscriptionCreateStep
+from onapsdk.so.instantiation import (
+ InstantiationParameter,
+ ServiceInstantiation,
+ VfmoduleParameters,
+ VnfParameters,
+ SoService
+)
+from onaptests.steps.cloud.customer_service_subscription_create import (
+ CustomerServiceSubscriptionCreateStep
+)
import onaptests.utils.exceptions as onap_test_exceptions
from onaptests.steps.base import YamlTemplateBaseStep
from onaptests.steps.onboard.service import YamlTemplateServiceOnboardStep
-from onaptests.steps.cloud.connect_service_subscription_to_cloud_region import ConnectServiceSubToCloudRegionStep
+from onaptests.steps.cloud.connect_service_subscription_to_cloud_region import (
+ ConnectServiceSubToCloudRegionStep
+)
class YamlTemplateServiceMacroInstantiateStep(YamlTemplateBaseStep):
else: # only pnfs
self.add_step(CustomerServiceSubscriptionCreateStep(cleanup))
-
@property
def description(self) -> str:
"""Step description."""
return self.parent.service_instance_name
@YamlTemplateBaseStep.store_state
- def execute(self):
+ def execute(self): # noqa
"""Instantiate service.
Use settings values:
super().execute()
service = Service(self.service_name)
customer: Customer = Customer.get_by_global_customer_id(settings.GLOBAL_CUSTOMER_ID)
- service_subscription: ServiceSubscription = customer.get_service_subscription_by_service_type(service.name)
+ service_subscription: ServiceSubscription = \
+ customer.get_service_subscription_by_service_type(service.name)
if any(
filter(lambda x: x in self.yaml_template[self.service_name].keys(),
["vnfs", "networks"])):
)
tenant: Tenant = cloud_region.get_tenant(settings.TENANT_ID)
else:
- cloud_region, tenant = None, None # Only PNF is going to be instantiated so
- # neither cloud_region nor tenant are needed
+ # Only PNF is going to be instantiated so
+ # neither cloud_region nor tenant are needed
+ cloud_region, tenant = None, None
try:
owning_entity = OwningEntity.get_by_owning_entity_name(
settings.OWNING_ENTITY)
distribution_completed = service.distributed
if distribution_completed is True:
self._logger.info(
- "Service Distribution for %s is sucessfully finished",
- service.name)
+ "Service Distribution for %s is sucessfully finished",
+ service.name)
break
self._logger.info(
"Service Distribution for %s ongoing, Wait for %d s",
- service.name,settings.SERVICE_DISTRIBUTION_SLEEP_TIME)
+ service.name, settings.SERVICE_DISTRIBUTION_SLEEP_TIME)
time.sleep(settings.SERVICE_DISTRIBUTION_SLEEP_TIME)
nb_try += 1
if settings.MODEL_YAML_TEMPLATE:
so_data = self.yaml_template[self.service_name]
so_service = SoService(vnfs=so_data.get("vnfs", []),
- subscription_service_type=so_data.get('subscription_service_type'))
+ subscription_service_type=so_data.get(
+ 'subscription_service_type'))
else:
for vnf_data in self.yaml_template[self.service_name].get("vnfs", []):
vnf_params_list.append(VnfParameters(
vnf_data["vnf_name"],
- [InstantiationParameter(name=parameter["name"], value=parameter["value"]) for parameter in
+ [InstantiationParameter(name=parameter["name"],
+ value=parameter["value"]) for parameter in
vnf_data.get("vnf_parameters", [])],
[VfmoduleParameters(vf_module_data["vf_module_name"],
- [InstantiationParameter(name=parameter["name"], value=parameter["value"]) for
- parameter in vf_module_data.get("parameters", [])]) \
+ [InstantiationParameter(name=parameter["name"],
+ value=parameter["value"]) for
+ parameter in vf_module_data.get("parameters", [])])
for vf_module_data in vnf_data.get("vf_module_parameters", [])]
))
self._logger.error("Service instantiation %s failed", self.service_instance_name)
raise onap_test_exceptions.ServiceInstantiateException
- service_subscription: ServiceSubscription = customer.get_service_subscription_by_service_type(self.service_name)
- self._service_instance: ServiceInstance = service_subscription.get_service_instance_by_name(self.service_instance_name)
+ service_subscription: ServiceSubscription = \
+ customer.get_service_subscription_by_service_type(self.service_name)
+ self._service_instance: ServiceInstance = \
+ service_subscription.get_service_instance_by_name(self.service_instance_name)
@YamlTemplateBaseStep.store_state(cleanup=True)
def cleanup(self) -> None:
from .vnf_ala_carte import YamlTemplateVnfAlaCarteInstantiateStep
from .k8s_profile_create import K8SProfileStep
+
class YamlTemplateVfModuleAlaCarteInstantiateStep(YamlTemplateBaseStep):
"""Instantiate vf module a'la carte using YAML template."""
"""
super().execute()
customer: Customer = Customer.get_by_global_customer_id(settings.GLOBAL_CUSTOMER_ID)
- service_subscription: ServiceSubscription = customer.get_service_subscription_by_service_type(self.service_name)
- self._service_instance: ServiceInstance = service_subscription.get_service_instance_by_name(self.service_instance_name)
+ service_subscription: ServiceSubscription = \
+ customer.get_service_subscription_by_service_type(self.service_name)
+ self._service_instance: ServiceInstance = \
+ service_subscription.get_service_instance_by_name(self.service_instance_name)
cloud_region: CloudRegion = CloudRegion.get_by_id(
cloud_owner=settings.CLOUD_REGION_CLOUD_OWNER,
cloud_region_id=settings.CLOUD_REGION_ID,
"""
# workaround, as Network name differs from model name (added " 0")
- network_name=re.sub(r"\s\d$", r"", network_name)
+ network_name = re.sub(r"\s\d$", r"", network_name)
for net in self.yaml_template[self.service_name]["networks"]:
if net["vl_name"] == network_name:
if net['subnets'] is None:
super().execute()
service: Service = Service(self.service_name)
customer: Customer = Customer.get_by_global_customer_id(settings.GLOBAL_CUSTOMER_ID)
- service_subscription: ServiceSubscription = customer.get_service_subscription_by_service_type(self.service_name)
- service_instance: ServiceInstance = service_subscription.get_service_instance_by_name(self.service_instance_name)
+ service_subscription: ServiceSubscription = \
+ customer.get_service_subscription_by_service_type(self.service_name)
+ service_instance: ServiceInstance = \
+ service_subscription.get_service_instance_by_name(self.service_instance_name)
self._service_instance = service_instance
for idx, network in enumerate(service.networks):
- #for network in self.yaml_template[self.service_name]["networks"]:
+ # for network in self.yaml_template[self.service_name]["networks"]:
net_instantiation = service_instance.add_network(
network,
settings.LINE_OF_BUSINESS,
"""
if self._cleanup:
for net_instance in self._service_instance.network_instances:
- self._logger.info("Start network deletion %s",net_instance.name)
+ self._logger.info("Start network deletion %s", net_instance.name)
net_deletion = net_instance.delete(a_la_carte=True)
try:
net_deletion.wait_for_finish(settings.ORCHESTRATION_REQUEST_TIMEOUT)
super().execute()
service: Service = Service(self.service_name)
customer: Customer = Customer.get_by_global_customer_id(settings.GLOBAL_CUSTOMER_ID)
- service_subscription: ServiceSubscription = customer.get_service_subscription_by_service_type(self.service_name)
- self._service_instance: ServiceInstance = service_subscription.get_service_instance_by_name(self.service_instance_name)
+ service_subscription: ServiceSubscription = \
+ customer.get_service_subscription_by_service_type(self.service_name)
+ self._service_instance: ServiceInstance = \
+ service_subscription.get_service_instance_by_name(self.service_instance_name)
cloud_region: CloudRegion = CloudRegion.get_by_id(
cloud_owner=settings.CLOUD_REGION_CLOUD_OWNER,
cloud_region_id=settings.CLOUD_REGION_ID,
else:
return self.parent.service_name
-
def check(self, operational_policies: list, is_template: bool = False) -> str:
"""Check CLAMP requirements to create a loop."""
self._logger.info("Check operational policy")
for policy in operational_policies:
exist = Clamp.check_policies(policy_name=policy["name"],
- req_policies=30)# 30 required policy
+ req_policies=30) # 30 required policy
self._logger.info("Operational policy found.")
if not exist:
raise ValueError("Couldn't load the policy %s", policy)
def loop_counter(self, action: str) -> None:
""" Count number of loop instances."""
- if action == "plus":
+ if action == "plus":
self.count += 1
- if action == "minus":
+ if action == "minus":
self.count -= 1
@YamlTemplateBaseStep.store_state
def execute(self):
- super().execute() # TODO work only the 1st time, not if already onboarded
+ super().execute() # TODO work only the 1st time, not if already onboarded
# Before instantiating, be sure that the service has been distributed
service = Service(self.service_name)
distribution_completed = service.distributed
if distribution_completed is True:
self._logger.info(
- "Service Distribution for %s is sucessfully finished",
- service.name)
+ "Service Distribution for %s is sucessfully finished",
+ service.name)
break
self._logger.info(
"Service Distribution for %s ongoing, Wait for 60 s",
if distribution_completed is False:
self._logger.error(
- "Service Distribution for %s failed !!",service.name)
+ "Service Distribution for %s failed !!", service.name)
raise onap_test_exceptions.ServiceDistributionException
# time to wait for template load in CLAMP
"""class instantiating a closed loop in clamp."""
def __init__(self, template: str, loop_name: str, operational_policies: list):
- self.template=template
- self.loop_name=loop_name
- self.operational_policies=operational_policies
+ self.template = template
+ self.loop_name = loop_name
+ self.operational_policies = operational_policies
self._logger: logging.Logger = logging.getLogger("")
logging.config.dictConfig(settings.LOG_CONFIG)
self._logger.error("an error occured while adding an operational policy")
self._logger.info("ADD OPERATION SUCCESSFULY DONE")
-
def configure_policies(self, loop: LoopInstance) -> None:
"""Configure all policies."""
self._logger.info("******** UPDATE MICROSERVICE POLICY *******")
loop.update_microservice_policy()
self._logger.info("******** UPDATE OPERATIONAL POLICIES CONFIG *******")
for policy in self.operational_policies:
- #loop.add_op_policy_config(loop.LoopInstance.__dict__[policy["config_function"]])
- #possible configurations for the moment
+ # loop.add_op_policy_config(loop.LoopInstance.__dict__[policy["config_function"]])
+ # possible configurations for the moment
if policy["name"] == "MinMax":
loop.add_op_policy_config(loop.add_minmax_config)
if policy["name"] == "Drools":
self._logger.info("******** SUBMIT POLICIES TO PE *******")
submit = loop.act_on_loop_policy(loop.submit)
self._logger.info("******** CHECK POLICIES SUBMITION *******")
- if submit :
+ if submit:
self._logger.info("Policies successfully submited to PE")
else:
from ..base import YamlTemplateBaseStep
from .service import YamlTemplateVfOnboardStep
+
class OnboardClampStep(YamlTemplateBaseStep):
"""Onboard class to create CLAMP templates."""
def execute(self) -> None:
"""Create profile."""
super().execute()
- definition: Definition = Definition.get_definition_by_name_version(\
+ definition: Definition = Definition.get_definition_by_name_version(
rb_name=settings.MSB_K8S_RB_NAME,
rb_version=settings.MSB_K8S_RB_VERSION)
with open(settings.MSB_K8S_PROFILE_ARTIFACT_FILE_PATH, "rb") as profile_file:
- self.profile = definition.create_profile(profile_name=settings.MSB_K8S_PROFILE_NAME,
- namespace=settings.K8S_ADDITIONAL_RESOURCES_NAMESPACE,
- kubernetes_version=settings.K8S_VERSION)
+ self.profile = definition.create_profile(
+ profile_name=settings.MSB_K8S_PROFILE_NAME,
+ namespace=settings.K8S_ADDITIONAL_RESOURCES_NAMESPACE,
+ kubernetes_version=settings.K8S_VERSION)
self.profile.upload_artifact(profile_file.read())
"""
super().execute()
- service: Service = Service(name=settings.SERVICE_NAME, instantiation_type=settings.SERVICE_INSTANTIATION_TYPE)
+ service: Service = Service(name=settings.SERVICE_NAME,
+ instantiation_type=settings.SERVICE_INSTANTIATION_TYPE)
if not service.created():
service.create()
if settings.VL_NAME != "":
# If the service is already distributed, do not try to checkin/onboard (replay of tests)
# checkin is done if needed
# If service is replayed, no need to try to re-onboard the model
- # Double check because of: https://gitlab.com/Orange-OpenSource/lfn/onap/python-onapsdk/-/issues/176
+ # Double check because of:
+ # https://gitlab.com/Orange-OpenSource/lfn/onap/python-onapsdk/-/issues/176
if not service.distributed and service.status != onapsdk_const.DISTRIBUTED:
if service.status == onapsdk_const.DRAFT:
try:
# If the service is already distributed, do not try to checkin/onboard (replay of tests)
# checkin is done if needed
# If service is replayed, no need to try to re-onboard the model
- # Double check because of: https://gitlab.com/Orange-OpenSource/lfn/onap/python-onapsdk/-/issues/176
+ # Double check because of:
+ # https://gitlab.com/Orange-OpenSource/lfn/onap/python-onapsdk/-/issues/176
if not service.distributed and service.status != onapsdk_const.DISTRIBUTED:
if service.status == onapsdk_const.DRAFT:
try:
"""
super().execute()
vendor: Vendor = Vendor(name=settings.VENDOR_NAME)
- vsp: Vsp = Vsp(name=settings.VSP_NAME, vendor=vendor, package=open(settings.VSP_FILE_PATH, "rb"))
+ vsp: Vsp = Vsp(name=settings.VSP_NAME,
+ vendor=vendor,
+ package=open(settings.VSP_FILE_PATH, "rb"))
vsp.onboard()
@BaseStep.store_state(cleanup=True)
if "heat_files_to_upload" in pnf:
with open(get_resource_location(pnf["heat_files_to_upload"]), "rb") as package:
vsp: Vsp = Vsp(name=f"{pnf['pnf_name']}_VSP",
- vendor=vendor,
- package=package)
+ vendor=vendor,
+ package=package)
vsp.onboard()
@YamlTemplateBaseStep.store_state(cleanup=True)
FAIL = "FAIL"
NOT_EXECUTED = "NOT EXECUTED"
+
@dataclass
class Report:
"""Step execution report."""
details=details,
components=components,
log_path="./pythonsdk.debug.log").dump(
- str(Path(settings.REPORTING_FILE_DIRECTORY).joinpath(settings.HTML_REPORTING_FILE_NAME)))
+ str(Path(settings.REPORTING_FILE_DIRECTORY).joinpath(
+ settings.HTML_REPORTING_FILE_NAME)))
report_dict = {
'usecase': usecase,
for step_report in reversed(self.report)
]
}
- with (Path(settings.REPORTING_FILE_DIRECTORY).joinpath(settings.JSON_REPORTING_FILE_NAME)).open('w') as file:
+ with (Path(settings.REPORTING_FILE_DIRECTORY).joinpath(
+ settings.JSON_REPORTING_FILE_NAME)).open('w') as file:
json.dump(report_dict, file, indent=4)
response = requests.put(
"http://portal.api.simpledemo.onap.org:30726/mockserver/expectation",
json={
- "httpRequest" : {
+ "httpRequest": {
"method": expectation["method"],
"path": expectation["path"]
},
- "httpResponse" : {
+ "httpResponse": {
"body": expectation["response"]
}
}
config.load_incluster_config()
else:
config.load_kube_config(config_file=settings.K8S_CONFIG)
- k8s_client: "CoreV1API" = client.CoreV1Api()
- k8s_watch: "Watch" = watch.Watch()
+ k8s_client: "client.CoreV1Api" = client.CoreV1Api()
+ k8s_watch: "watch.Watch" = watch.Watch()
status = False
try:
for event in k8s_watch.stream(k8s_client.list_namespaced_pod,
- namespace=settings.K8S_ADDITIONAL_RESOURCES_NAMESPACE,
- timeout_seconds=timeout_seconds):
+ namespace=settings.K8S_ADDITIONAL_RESOURCES_NAMESPACE,
+ timeout_seconds=timeout_seconds):
if event["object"].metadata.name == "pnf-macro-test-simulator":
if not event["object"].status.phase in ["Pending", "Running"]:
# Invalid pod state
config.load_incluster_config()
else:
config.load_kube_config(config_file=settings.K8S_CONFIG)
- k8s_client: "CoreV1API" = client.CoreV1Api()
+ k8s_client: "client.CoreV1Api" = client.CoreV1Api()
try:
- for service in k8s_client.list_namespaced_service(namespace=settings.K8S_ONAP_NAMESPACE).items:
+ for service in k8s_client.list_namespaced_service(
+ namespace=settings.K8S_ONAP_NAMESPACE).items:
if service.metadata.name == settings.DCAE_VES_COLLECTOR_POD_NAME:
proto = "http"
if "443" in str(service.spec.ports[0].port):
status = self.is_pnf_pod_running()
if not status:
raise EnvironmentPreparationException("PNF simulator is not running")
- time.sleep(settings.PNF_WAIT_TIME) # Let's still wait for PNF simulator to make sure it's initialized
+ # Let's still wait for PNF simulator to make sure it's initialized
+ time.sleep(settings.PNF_WAIT_TIME)
ves_proto, ves_ip, ves_port = self.get_ves_protocol_ip_and_port()
registration_number: int = 0
registered_successfully: bool = False
- while registration_number < settings.PNF_REGISTRATION_NUMBER_OF_TRIES and not registered_successfully:
+ while (registration_number < settings.PNF_REGISTRATION_NUMBER_OF_TRIES and
+ not registered_successfully):
try:
response = requests.post(
"http://portal.api.simpledemo.onap.org:30999/simulator/start",
"simulatorParams": {
"repeatCount": 9999,
"repeatInterval": 30,
- "vesServerUrl": f"{ves_proto}://sample1:sample1@{ves_ip}:{ves_port}/eventListener/v7"
+ "vesServerUrl": f"{ves_proto}://sample1:sample1@{ves_ip}:{ves_port}/eventListener/v7" # noqa
},
"templateName": "registration.json",
"patch": {
)
response.raise_for_status()
registered_successfully = True
- self._logger.info(f"PNF registered with {settings.SERVICE_INSTANCE_NAME} source name")
+ self._logger.info(f"PNF registered with {settings.SERVICE_INSTANCE_NAME} "
+ "source name")
except (requests.ConnectionError, requests.HTTPError) as http_error:
self._logger.debug(f"Can't connect with PNF simulator: {str(http_error)}")
registration_number = registration_number + 1
from avionix.errors import HelmError
from onaptests.steps.base import BaseStep
from onaptests.utils.resources import get_local_dir
-from onaptests.utils.exceptions import (
+from onaptests.utils.exceptions import (
EnvironmentPreparationException,
EnvironmentCleanupException)
-
class HelmChartStep(BaseStep):
"""Basic operations on a docker container."""
msg = f"{chart_info_file} not found."
raise EnvironmentPreparationException(msg) from err
-
try:
for dependency in chart_info["dependencies"]:
dep = ChartDependency(
dependencies.append(dep)
self.builder = ChartBuilder(
- chart_info=ChartInfo(
- api_version=chart_info["api_version"],
- name=chart_info["chart_name"],
- version=chart_info["version"], # SemVer 2 version
- app_version=chart_info["app_version"],
- dependencies=dependencies
- ),
- kubernetes_objects=[],
- keep_chart=False
- )
+ chart_info=ChartInfo(
+ api_version=chart_info["api_version"],
+ name=chart_info["chart_name"],
+ version=chart_info["version"], # SemVer 2 version
+ app_version=chart_info["app_version"],
+ dependencies=dependencies
+ ),
+ kubernetes_objects=[],
+ keep_chart=False
+ )
except KeyError as err:
msg = f"{chart_info_file} does not contain required keys."
raise EnvironmentPreparationException(msg) from err
msg = "Error during helm release installation."
raise EnvironmentPreparationException(msg) from err
-
def cleanup(self) -> None:
"""Uninstall helm release."""
try:
from onaptests.steps.base import BaseStep
from onaptests.utils.exceptions import TestConfigurationException
+
class SimulatorStartStep(BaseStep):
"""Basic operations on a docker container."""
__author__ = ("Morgan Richomme <morgan.richomme@orange.com>")
-class OnapTestException(Exception):
+
+class OnapTestException(Exception):
"""Parent Class for all Onap Test Exceptions."""
- error_message='Generic OnapTest exception'
+ error_message = 'Generic OnapTest exception'
+
class TestConfigurationException(OnapTestException):
"""Raise when configuration of the use case is incomplete or buggy."""
- error_message='Configuration error'
+ error_message = 'Configuration error'
+
class ServiceDistributionException(OnapTestException):
"""Service not properly distributed."""
- error_message='Service not well distributed'
+ error_message = 'Service not well distributed'
class ServiceInstantiateException(OnapTestException):
"""Service cannot be instantiated."""
- error_message='Service instantiation error'
+ error_message = 'Service instantiation error'
class ServiceCleanupException(OnapTestException):
"""Service cannot be cleaned."""
- error_message='Service not well cleaned up'
+ error_message = 'Service not well cleaned up'
class VnfInstantiateException(OnapTestException):
"""VNF cannot be instantiated."""
- error_message='VNF instantiation error'
+ error_message = 'VNF instantiation error'
class VnfCleanupException(OnapTestException):
"""VNF cannot be cleaned."""
- error_message="VNF can't be cleaned"
+ error_message = "VNF can't be cleaned"
class VfModuleInstantiateException(OnapTestException):
"""VF Module cannot be instantiated."""
- error_message='VF Module instantiation error'
+ error_message = 'VF Module instantiation error'
class VfModuleCleanupException(OnapTestException):
"""VF Module cannot be cleaned."""
- error_message="VF Module can't be cleaned"
+ error_message = "VF Module can't be cleaned"
class NetworkInstantiateException(OnapTestException):
"""Network cannot be instantiated."""
- error_message='Network instantiation error'
+ error_message = 'Network instantiation error'
class NetworkCleanupException(OnapTestException):
"""Network cannot be cleaned."""
- error_message="Network can't be cleaned"
+ error_message = "Network can't be cleaned"
+
class ProfileInformationException(OnapTestException):
"""Missing k8s profile information."""
- error_message='Missing k8s profile information'
+ error_message = 'Missing k8s profile information'
+
class ProfileCleanupException(OnapTestException):
"""K8s profile cannot be cleaned."""
- error_message="Profile can't be cleaned"
+ error_message = "Profile can't be cleaned"
+
class EnvironmentPreparationException(OnapTestException):
"""Test environment preparation exception."""
- error_message="Test can't be run properly due to preparation error"
+ error_message = "Test can't be run properly due to preparation error"
+
class SubstepExecutionException(OnapTestException):
"""Exception raised if substep execution fails."""
+
class EnvironmentCleanupException(OnapTestException):
"""Test environment cleanup exception."""
- error_message="Test couldn't finish a cleanup"
+ error_message = "Test couldn't finish a cleanup"
+
class PolicyException(OnapTestException):
"""Policy exception."""
- error_message="Problem with policy module"
+ error_message = "Problem with policy module"
+
class DcaeException(OnapTestException):
"""DCAE exception."""
- error_message="Problem with DCAE module"
+ error_message = "Problem with DCAE module"
+
class StatusCheckException(OnapTestException):
"""Status Check exception."""
- error_message="Namespace status check has failed"
+ error_message = "Namespace status check has failed"
[tox]
minversion = 3.2.0
-envlist = json,yaml,py,rst,md
+envlist = json,yaml,py,rst,md,pylama
skipsdist = true
requires = pip >= 8
commands =
/bin/bash -c "coala --non-interactive --disable-caching --no-autoapply-warn md --files $(</tmp/.coalist_md) \ "
+[testenv:pylama]
+deps = pylama
+skip_install = True
+commands = pylama src/