Merge "Add pylama into checks"
authorLukasz Rajewski <lukasz.rajewski@t-mobile.pl>
Tue, 18 Jul 2023 06:39:55 +0000 (06:39 +0000)
committerGerrit Code Review <gerrit@onap.org>
Tue, 18 Jul 2023 06:39:55 +0000 (06:39 +0000)
52 files changed:
src/onaptests/configuration/basic_clamp_settings.py
src/onaptests/configuration/basic_cnf_macro_settings.py
src/onaptests/configuration/basic_cnf_yaml_settings.py
src/onaptests/configuration/basic_cps_settings.py
src/onaptests/configuration/basic_network_nomulticloud_settings.py
src/onaptests/configuration/basic_onboard_settings.py
src/onaptests/configuration/basic_sdnc_settings.py
src/onaptests/configuration/basic_vm_macro_settings.py
src/onaptests/configuration/basic_vm_macro_stability_settings.py
src/onaptests/configuration/basic_vm_multicloud_yaml_settings.py
src/onaptests/configuration/basic_vm_settings.py
src/onaptests/configuration/cba_enrichment_settings.py
src/onaptests/configuration/cds_resource_resolution_settings.py
src/onaptests/configuration/clearwater_ims_nomulticloud_settings.py
src/onaptests/configuration/multi_vnf_ubuntu_settings.py
src/onaptests/configuration/pnf_macro_settings.py
src/onaptests/configuration/settings.py
src/onaptests/configuration/status_settings.py
src/onaptests/scenario/basic_clamp.py
src/onaptests/scenario/basic_cnf.py
src/onaptests/scenario/basic_network.py
src/onaptests/scenario/basic_vm.py
src/onaptests/scenario/cds_blueprint_enrichment.py
src/onaptests/scenario/cds_resource_resolution.py
src/onaptests/scenario/clearwater_ims.py
src/onaptests/scenario/multi_vnf_macro.py
src/onaptests/scenario/status.py
src/onaptests/steps/base.py
src/onaptests/steps/cloud/check_status.py
src/onaptests/steps/cloud/connect_service_subscription_to_cloud_region.py
src/onaptests/steps/cloud/k8s_connectivity_info_create.py
src/onaptests/steps/cloud/register_cloud.py
src/onaptests/steps/cloud/resources.py
src/onaptests/steps/instantiate/k8s_profile_create.py
src/onaptests/steps/instantiate/service_ala_carte.py
src/onaptests/steps/instantiate/service_macro.py
src/onaptests/steps/instantiate/vf_module_ala_carte.py
src/onaptests/steps/instantiate/vl_ala_carte.py
src/onaptests/steps/instantiate/vnf_ala_carte.py
src/onaptests/steps/loop/clamp.py
src/onaptests/steps/loop/instantiate_loop.py
src/onaptests/steps/onboard/clamp.py
src/onaptests/steps/onboard/msb_k8s.py
src/onaptests/steps/onboard/service.py
src/onaptests/steps/onboard/vsp.py
src/onaptests/steps/reports_collection.py
src/onaptests/steps/simulator/cds_mockserver.py
src/onaptests/steps/simulator/pnf_simulator_cnf/pnf_register.py
src/onaptests/steps/wrapper/helm_charts.py
src/onaptests/steps/wrapper/start.py
src/onaptests/utils/exceptions.py
tox.ini

index 3eaad2d..ac99278 100644 (file)
@@ -1,7 +1,7 @@
 from yaml import load, SafeLoader
 from onaptests.utils.resources import get_resource_location
 
-from .settings import * # pylint: disable=W0614
+from .settings import *  # noqa
 
 """ Specific Basic clamp settings."""
 CLEANUP_FLAG = False
@@ -9,32 +9,32 @@ CLAMP_DISTRIBUTION_TIMER = 10
 
 # pylint: disable=bad-whitespace
 # The ONAP part
-SERVICE_DETAILS=("Onboarding, enriching a model with TCA." +
-                 "Design a loop with Clamp and deploy it in Policy and DCAE")
-SERVICE_COMPONENTS="SDC, CLAMP, POLICY, DCAE, DMAAP"
+SERVICE_DETAILS = ("Onboarding, enriching a model with TCA." +
+                   "Design a loop with Clamp and deploy it in Policy and DCAE")
+SERVICE_COMPONENTS = "SDC, CLAMP, POLICY, DCAE, DMAAP"
 
 VENDOR_NAME = "basiclamp_vendor"
 
 VSP_NAME = "basiclamp_vsp"
 
 OPERATIONAL_POLICIES = [
-  {
-    "name": "MinMax",
-    "policy_type": "onap.policies.controlloop.guard.common.MinMax",
-    "policy_version": "1.0.0",
-    "config_function": "add_minmax_config", #func
-    "configuration": {
-      "min": 1,
-      "max": 10
+    {
+        "name": "MinMax",
+        "policy_type": "onap.policies.controlloop.guard.common.MinMax",
+        "policy_version": "1.0.0",
+        "config_function": "add_minmax_config",  # func
+        "configuration": {
+            "min": 1,
+            "max": 10
+        }
+    },
+    {
+        "name": "FrequencyLimiter",
+        "policy_type": "onap.policies.controlloop.guard.common.FrequencyLimiter",
+        "policy_version": "1.0.0",
+        "config_function": "add_frequency_limiter",  # func
+        "configuration": {}
     }
-  },
-  {
-    "name": "FrequencyLimiter",
-    "policy_type": "onap.policies.controlloop.guard.common.FrequencyLimiter",
-    "policy_version": "1.0.0",
-    "config_function": "add_frequency_limiter", #func
-    "configuration": {}
-  }
 ]
 
 # if a yaml file is define, retrieve info from this yaml files
@@ -49,7 +49,7 @@ try:
         SERVICE_NAME = next(iter(yaml_config_file.keys()))
         VF_NAME = SERVICE_NAME
 except ValueError:
-    SERVICE_NAME = "" # Fill me
-    VF_NAME = "" # Fill me
+    SERVICE_NAME = ""  # Fill me
+    VF_NAME = ""  # Fill me
 
 MODEL_YAML_TEMPLATE = None
index c124f2b..56bbd81 100644 (file)
@@ -5,7 +5,7 @@ from yaml import load, SafeLoader
 
 from onaptests.utils.resources import get_resource_location
 import onaptests.utils.exceptions as onap_test_exceptions
-from .settings import *  # pylint: disable=W0614
+from .settings import *  # noqa
 
 """ Specific basic_cnf_macro with multicloud-k8s and yaml config scenario."""
 SERVICE_DETAILS = ("Onboarding, distribution and instantiation of a Apache CNF " +
@@ -63,7 +63,8 @@ TENANT_ID = '123456'
 TENANT_NAME = 'dummy_test'
 
 
-SERVICE_YAML_TEMPLATE = Path(get_resource_location("templates/vnf-services/basic_cnf_macro-service.yaml"))
+SERVICE_YAML_TEMPLATE = Path(get_resource_location(
+    "templates/vnf-services/basic_cnf_macro-service.yaml"))
 
 try:
     # Try to retrieve the SERVICE NAME from the yaml file
index 36b0c3f..4eb380c 100644 (file)
@@ -2,7 +2,7 @@ import os
 from yaml import load, SafeLoader
 from onaptests.utils.resources import get_resource_location
 import onaptests.utils.exceptions as onap_test_exceptions
-from .settings import * # pylint: disable=W0614
+from .settings import *  # noqa
 
 """ Specific basic_cnf with multicloud-k8s and yaml config scenario."""
 SERVICE_DETAILS = ("Onboarding, distribution and instantiation of a CNF" +
index bea9646..e359f65 100644 (file)
@@ -1,4 +1,4 @@
-from .settings import *
+from .settings import *  # noqa
 
 import json
 from pathlib import Path
@@ -7,36 +7,30 @@ from onaptests.utils.resources import get_resource_location
 
 CLEANUP_FLAG = True
 
-ANCHOR_DATA = json.dumps({
-    "bookstore": {
-      "bookstore-name": "Chapters",
-      "categories": [
-        {
-          "code": 1,
-          "name": "SciFi",
-          "books": [
-              {
-                "title": "2001: A Space Odyssey",
-                "price": 5
-              },
-              {
-                "title": "Dune",
-                "price": 5
-              }
-          ]
-        },
-        {
-          "code": 2,
-          "name": "Kids",
-          "books": [
-              {
-                "title": "Matilda"
-              }
-            ]
+ANCHOR_DATA = json.dumps(
+    {
+        "bookstore": {
+            "bookstore-name": "Chapters",
+            "categories": [{
+                "code": 1,
+                "name": "SciFi",
+                "books": [{
+                    "title": "2001: A Space Odyssey",
+                    "price": 5
+                }, {
+                    "title": "Dune",
+                    "price": 5
+                }]
+            }, {
+                "code": 2,
+                "name": "Kids",
+                "books": [{
+                    "title": "Matilda"
+                }]
+            }]
         }
-      ]
     }
-  })
+)
 ANCHOR_NAME = "basic-cps-test-anchor"
 DATASPACE_NAME = "basic-cps-test-dataspace"
 SCHEMA_SET_NAME = "basic-cps-test-schema-set"
index add6175..ed2cf41 100644 (file)
@@ -3,18 +3,18 @@ import openstack
 from yaml import load, SafeLoader
 from onaptests.utils.resources import get_resource_location
 import onaptests.utils.exceptions as onap_test_exceptions
-from .settings import * # pylint: disable=W0614
+from .settings import *  # noqa
 
 """ Specific Basic Network without multicloud."""
 
 # pylint: disable=bad-whitespace
 # The ONAP part
-SERVICE_DETAILS="Onboarding, distribution and instantiation of Basic Network using Ã  la carte"
-SERVICE_COMPONENTS="SDC, DMAAP, AAI, SO, SDNC"
+SERVICE_DETAILS = "Onboarding, distribution and instantiation of Basic Network using Ã  la carte"
+SERVICE_COMPONENTS = "SDC, DMAAP, AAI, SO, SDNC"
 USE_MULTICLOUD = False
 # Set ONLY_INSTANTIATE to true to run an instantiation without repeating
 # onboarding and related AAI configuration (Cloud config)
-ONLY_INSTANTIATE= False
+ONLY_INSTANTIATE = False
 
 # if a yaml file is define, retrieve info from this yaml files
 # if not declare the parameters in the settings
@@ -57,12 +57,12 @@ SERVICE_INSTANCE_NAME = "basicnw_service_instance"
 TEST_CLOUD = os.getenv('OS_TEST_CLOUD')
 TEST_CLOUD = os.getenv('OS_TEST_CLOUD')
 cloud = openstack.connect(cloud=TEST_CLOUD)
-VIM_USERNAME = cloud.config.auth.get('username','Fill me')
-VIM_PASSWORD = cloud.config.auth.get('password','Fill me')
-VIM_SERVICE_URL = cloud.config.auth.get('auth_url','Fill me')
-TENANT_ID = cloud.config.auth.get('project_id','Fill me')
-TENANT_NAME = cloud.config.auth.get('project_name','Fill me')
-CLOUD_REGION_ID = cloud.config.auth.get('region_name','RegionOne')
-CLOUD_DOMAIN = cloud.config.auth.get('project_domain_name','Default')
+VIM_USERNAME = cloud.config.auth.get('username', 'Fill me')
+VIM_PASSWORD = cloud.config.auth.get('password', 'Fill me')
+VIM_SERVICE_URL = cloud.config.auth.get('auth_url', 'Fill me')
+TENANT_ID = cloud.config.auth.get('project_id', 'Fill me')
+TENANT_NAME = cloud.config.auth.get('project_name', 'Fill me')
+CLOUD_REGION_ID = cloud.config.auth.get('region_name', 'RegionOne')
+CLOUD_DOMAIN = cloud.config.auth.get('project_domain_name', 'Default')
 
 MODEL_YAML_TEMPLATE = None
index d4aa3ec..c4a1a85 100644 (file)
@@ -6,7 +6,7 @@ from jinja2 import Environment, PackageLoader
 
 import onaptests.utils.exceptions as onap_test_exceptions
 from onaptests.utils.resources import get_resource_location
-from .settings import * # pylint: disable=W0614
+from .settings import * # noqa
 
 """ Creation of service to onboard"""
 
@@ -33,17 +33,18 @@ def generate_service_config_yaml_file():
     with open(SERVICE_YAML_TEMPLATE, 'w+') as file_to_write:
         file_to_write.write(rendered_template)
 
+
 """Basic onboard service to only onboard a service in SDC"""
 
 # pylint: disable=bad-whitespace
 # The ONAP part
-SERVICE_DETAILS="Onboarding of an Ubuntu VM"
-SERVICE_COMPONENTS="SDC"
+SERVICE_DETAILS = "Onboarding of an Ubuntu VM"
+SERVICE_COMPONENTS = "SDC"
 
-#USE_MULTICLOUD = False
+# USE_MULTICLOUD = False
 # Set ONLY_INSTANTIATE to true to run an instantiation without repeating
 # onboarding and related AAI configuration (Cloud config)
-#ONLY_INSTANTIATE= False
+# ONLY_INSTANTIATE= False
 
 # if a yaml file is define, retrieve info from this yaml files
 # if not declare the parameters in the settings
@@ -59,8 +60,8 @@ try:
 except (FileNotFoundError, ValueError):
     raise onap_test_exceptions.TestConfigurationException
 
-#CLEANUP_FLAG = True
-#CLEANUP_ACTIVITY_TIMER = 10  # nb of seconds before cleanup in case cleanup option is set
+# CLEANUP_FLAG = True
+# CLEANUP_ACTIVITY_TIMER = 10  # nb of seconds before cleanup in case cleanup option is set
 VENDOR_NAME = "basic_onboard_vendor"
 
 MODEL_YAML_TEMPLATE = None
index 6cf2046..d364616 100644 (file)
@@ -1,4 +1,4 @@
-from .settings import *
+from .settings import *  # noqa
 
 CLEANUP_FLAG = True
 
index 522c66a..1aeea26 100644 (file)
@@ -6,7 +6,7 @@ from yaml import load, SafeLoader
 
 import onaptests.utils.exceptions as onap_test_exceptions
 from onaptests.utils.resources import get_resource_location
-from .settings import * # pylint: disable=W0614
+from .settings import *  # noqa
 
 
 CLEANUP_FLAG = True
@@ -34,20 +34,21 @@ GLOBAL_CUSTOMER_ID = "basicvm-customer"
 
 TEST_CLOUD = os.getenv('OS_TEST_CLOUD')
 cloud = openstack.connect(cloud=TEST_CLOUD)
-VIM_USERNAME = cloud.config.auth.get('username','Fill me')
-VIM_PASSWORD = cloud.config.auth.get('password','Fill me')
-VIM_SERVICE_URL = cloud.config.auth.get('auth_url','Fill me')
-TENANT_ID = cloud.config.auth.get('project_id','Fill me')
-TENANT_NAME = cloud.config.auth.get('project_name','Fill me')
-CLOUD_REGION_ID = cloud.config.auth.get('region_name','RegionOne')
-CLOUD_DOMAIN = cloud.config.auth.get('project_domain_name','Default')
+VIM_USERNAME = cloud.config.auth.get('username', 'Fill me')
+VIM_PASSWORD = cloud.config.auth.get('password', 'Fill me')
+VIM_SERVICE_URL = cloud.config.auth.get('auth_url', 'Fill me')
+TENANT_ID = cloud.config.auth.get('project_id', 'Fill me')
+TENANT_NAME = cloud.config.auth.get('project_name', 'Fill me')
+CLOUD_REGION_ID = cloud.config.auth.get('region_name', 'RegionOne')
+CLOUD_DOMAIN = cloud.config.auth.get('project_domain_name', 'Default')
 
 OWNING_ENTITY = "basicvm-oe"
 PROJECT = "basicvm-project"
 LINE_OF_BUSINESS = "basicvm-lob"
 PLATFORM = "basicvm-platform"
 CLOUD_DOMAIN = "Default"
-SERVICE_YAML_TEMPLATE = Path(get_resource_location("templates/vnf-services/basic_vm_macro-service.yaml"))
+SERVICE_YAML_TEMPLATE = Path(get_resource_location(
+    "templates/vnf-services/basic_vm_macro-service.yaml"))
 
 try:
     # Try to retrieve the SERVICE NAME from the yaml file
index 67a423c..c128ae4 100644 (file)
@@ -1,4 +1,10 @@
-from .basic_vm_macro_settings import *   # pylint: disable=W0614
+from .basic_vm_macro_settings import *  # noqa
 
-SERVICE_YAML_TEMPLATE = Path(get_resource_location("templates/vnf-services/basic_vm_macro_stability-service.yaml"))
+from pathlib import Path
+
+from onaptests.utils.resources import get_resource_location
+
+
+SERVICE_YAML_TEMPLATE = Path(get_resource_location(
+    "templates/vnf-services/basic_vm_macro_stability-service.yaml"))
 MODEL_YAML_TEMPLATE = None
index cb16d81..838a5ec 100644 (file)
@@ -1,20 +1,20 @@
 from onaptests.utils.resources import get_resource_location
-from .settings import * # pylint: disable=W0614
+from .settings import *  # noqa
 
 """ Specific Basic VM with multicloud and yaml config scenario."""
 SERVICE_DETAILS = ("Onboarding, distribution and instantiation of a VM" +
                    "using Ã  la carte and Multicloud module")
-SERVICE_COMPONENTS="SDC, DMAAP, AAI, SO, SDNC, Multicloud"
+SERVICE_COMPONENTS = "SDC, DMAAP, AAI, SO, SDNC, Multicloud"
 
 USE_MULTICLOUD = True
 # Set ONLY_INSTANTIATE to true to run an instantiation without repeating
 # onboarding and related AAI configuration (Cloud config)
-ONLY_INSTANTIATE= False
+ONLY_INSTANTIATE = False
 VENDOR_NAME = "sdktests_vendor"
-SERVICE_NAME = "basicvmtest" # must be the same as in YAML
+SERVICE_NAME = "basicvmtest"  # must be the same as in YAML
 
-CLOUD_REGION_CLOUD_OWNER = "sdktestsOwner" # must not contain _
-CLOUD_REGION_ID = "RegionOne" # should be valid, as otherwise MultiCloud fails
+CLOUD_REGION_CLOUD_OWNER = "sdktestsOwner"  # must not contain _
+CLOUD_REGION_ID = "RegionOne"  # should be valid, as otherwise MultiCloud fails
 CLOUD_REGION_TYPE = "openstack"
 CLOUD_OWNER_DEFINED_TYPE = "N/A"
 CLOUD_REGION_VERSION = "titanium_cloud"
@@ -24,9 +24,9 @@ COMPLEX_PHYSICAL_LOCATION_ID = "sdktests_complex_physical_location_id"
 COMPLEX_DATA_CENTER_CODE = "sdktests_complex_data_center_code"
 
 GLOBAL_CUSTOMER_ID = "sdktests_global_customer_id"
-TENANT_ID = ""  # Fill me in your custom settings
-TENANT_NAME= "" # Fill me in your custom settings
-AVAILABILITY_ZONE_NAME = "" # Fill me in your custom settings
+TENANT_ID = ""   # Fill me in your custom settings
+TENANT_NAME = ""  # Fill me in your custom settings
+AVAILABILITY_ZONE_NAME = ""  # Fill me in your custom settings
 AVAILABILITY_ZONE_TYPE = "nova"
 
 VIM_USERNAME = ""  # Fill me in your custom settings
index 0b33ec1..8066625 100644 (file)
@@ -3,19 +3,19 @@ import openstack
 from yaml import load, SafeLoader
 from onaptests.utils.resources import get_resource_location
 import onaptests.utils.exceptions as onap_test_exceptions
-from .settings import * # pylint: disable=W0614
+from .settings import *  # noqa
 
 """ Specific basic_vm without multicloud."""
 
 # pylint: disable=bad-whitespace
 # The ONAP part
-SERVICE_DETAILS="Onboarding, distribution and instanitation of an Ubuntu VM using Ã  la carte"
-SERVICE_COMPONENTS="SDC, DMAAP, AAI, SO, SDNC"
+SERVICE_DETAILS = "Onboarding, distribution and instanitation of an Ubuntu VM using Ã  la carte"
+SERVICE_COMPONENTS = "SDC, DMAAP, AAI, SO, SDNC"
 
 USE_MULTICLOUD = False
 # Set ONLY_INSTANTIATE to true to run an instantiation without repeating
 # onboarding and related AAI configuration (Cloud config)
-ONLY_INSTANTIATE= False
+ONLY_INSTANTIATE = False
 
 # if a yaml file is define, retrieve info from this yaml files
 # if not declare the parameters in the settings
@@ -61,12 +61,12 @@ SERVICE_INSTANCE_NAME = "basic_vm_service_instance"
 TEST_CLOUD = os.getenv('OS_TEST_CLOUD')
 TEST_CLOUD = os.getenv('OS_TEST_CLOUD')
 cloud = openstack.connect(cloud=TEST_CLOUD)
-VIM_USERNAME = cloud.config.auth.get('username','Fill me')
-VIM_PASSWORD = cloud.config.auth.get('password','Fill me')
-VIM_SERVICE_URL = cloud.config.auth.get('auth_url','Fill me')
-TENANT_ID = cloud.config.auth.get('project_id','Fill me')
-TENANT_NAME = cloud.config.auth.get('project_name','Fill me')
-CLOUD_REGION_ID = cloud.config.auth.get('region_name','RegionOne')
-CLOUD_DOMAIN = cloud.config.auth.get('project_domain_name','Default')
+VIM_USERNAME = cloud.config.auth.get('username', 'Fill me')
+VIM_PASSWORD = cloud.config.auth.get('password', 'Fill me')
+VIM_SERVICE_URL = cloud.config.auth.get('auth_url', 'Fill me')
+TENANT_ID = cloud.config.auth.get('project_id', 'Fill me')
+TENANT_NAME = cloud.config.auth.get('project_name', 'Fill me')
+CLOUD_REGION_ID = cloud.config.auth.get('region_name', 'RegionOne')
+CLOUD_DOMAIN = cloud.config.auth.get('project_domain_name', 'Default')
 
 MODEL_YAML_TEMPLATE = None
index 2d05cd2..847e15e 100644 (file)
@@ -1,7 +1,7 @@
 from pathlib import Path
 
 from onaptests.utils.resources import get_resource_location
-from .settings import *  # pylint: disable=W0614
+from .settings import *  # noqa
 
 SERVICE_NAME = "CDS blueprint enrichment"
 
index c9626dd..6530a4c 100644 (file)
@@ -2,19 +2,20 @@ from pathlib import Path
 from uuid import uuid4
 
 from onaptests.utils.resources import get_resource_location
-from .settings import *  # pylint: disable=W0614
+from .settings import *  # noqa
 
 CLEANUP_FLAG = True
 SERVICE_NAME = "CDS resource resolution"
-CLOUD_REGION_CLOUD_OWNER = "basicnf-owner" # must not contain _
+CLOUD_REGION_CLOUD_OWNER = "basicnf-owner"  # must not contain _
 CLOUD_REGION_ID = "k8sregion-cds"
 CLOUD_REGION_TYPE = "k8s"
 CLOUD_REGION_VERSION = "1.0"
 CLOUD_OWNER_DEFINED_TYPE = "N/A"
 COMPLEX_PHYSICAL_LOCATION_ID = "sdktests"
 
-MSB_K8S_DEFINITION_ATRIFACT_FILE_PATH = Path(get_resource_location(
-                                             "templates/artifacts/cds-resource-resolution/cds-mock-server.tar.gz"))
+MSB_K8S_DEFINITION_ATRIFACT_FILE_PATH = Path(
+    get_resource_location(
+        "templates/artifacts/cds-resource-resolution/cds-mock-server.tar.gz"))
 MSB_K8S_RB_NAME = f"cds-ms-rb-{str(uuid4())}"
 MSB_K8S_RB_VERSION = "v1"
 MSB_K8S_PROFILE_ARTIFACT_FILE_PATH = Path(get_resource_location(
@@ -56,8 +57,10 @@ CDS_MOCKSERVER_EXPECTATIONS = [
     }
 ]
 
-CDS_DD_FILE = Path(get_resource_location("templates/artifacts/cds-resource-resolution/dd.json"))
-CDS_CBA_UNENRICHED = Path(get_resource_location("templates/artifacts/cds-resource-resolution/resource-resolution.zip"))
+CDS_DD_FILE = Path(get_resource_location(
+    "templates/artifacts/cds-resource-resolution/dd.json"))
+CDS_CBA_UNENRICHED = Path(get_resource_location(
+    "templates/artifacts/cds-resource-resolution/resource-resolution.zip"))
 CDS_CBA_ENRICHED = Path("/tmp/resource-resolution-enriched.zip")
 CDS_WORKFLOW_NAME = "resource-resolution"
 CDS_WORKFLOW_INPUT = {
@@ -71,11 +74,11 @@ CDS_WORKFLOW_INPUT = {
         "j_input": "ok"
     }
 }
-CDS_WORKFLOW_EXPECTED_OUTPUT  = {
+CDS_WORKFLOW_EXPECTED_OUTPUT = {
     "resource-resolution-response": {
         "meshed-template": {
-            "helloworld-velocity": "{\n  \"default\": \"ok\",\n  \"input\": \"ok\",\n  \"script\": {\n    \"python\": \"ok\",\n    \"kotlin\": \"ok\"\n  },\n  \"db\": \"ok\",\n  \"rest\": {\n    \"GET\": \"A046E51D-44DC-43AE-BBA2-86FCA86C5265\",\n    \"POST\": \"post:ok\",\n    \"PUT\": \"put:ok\",\n    \"PATCH\": \"patch:ok\",\n    \"DELETE\": \"delete:ok\"\n  }\n}\n",
-            "helloworld-jinja": "{\n  \"default\": \"ok\",\n  \"input\": \"ok\",\n  \"script\": {\n    \"python\": \"ok\",\n    \"kotlin\": {\n      \"base\": \"ok\"\n      \"from suspend function\": \"ok\"\n    }\n  },\n  \"db\": \"ok\",\n  \"rest\": {\n    \"GET\": \"A046E51D-44DC-43AE-BBA2-86FCA86C5265\",\n    \"GET_ID\": \"74FE67C6-50F5-4557-B717-030D79903908\",\n    \"POST\": \"post:ok\",\n    \"PUT\": \"put:ok\",\n    \"PATCH\": \"patch:ok\",\n    \"DELETE\": \"delete:ok\"\n  }\n}\n"
+            "helloworld-velocity": "{\n  \"default\": \"ok\",\n  \"input\": \"ok\",\n  \"script\": {\n    \"python\": \"ok\",\n    \"kotlin\": \"ok\"\n  },\n  \"db\": \"ok\",\n  \"rest\": {\n    \"GET\": \"A046E51D-44DC-43AE-BBA2-86FCA86C5265\",\n    \"POST\": \"post:ok\",\n    \"PUT\": \"put:ok\",\n    \"PATCH\": \"patch:ok\",\n    \"DELETE\": \"delete:ok\"\n  }\n}\n",  # noqa
+            "helloworld-jinja": "{\n  \"default\": \"ok\",\n  \"input\": \"ok\",\n  \"script\": {\n    \"python\": \"ok\",\n    \"kotlin\": {\n      \"base\": \"ok\"\n      \"from suspend function\": \"ok\"\n    }\n  },\n  \"db\": \"ok\",\n  \"rest\": {\n    \"GET\": \"A046E51D-44DC-43AE-BBA2-86FCA86C5265\",\n    \"GET_ID\": \"74FE67C6-50F5-4557-B717-030D79903908\",\n    \"POST\": \"post:ok\",\n    \"PUT\": \"put:ok\",\n    \"PATCH\": \"patch:ok\",\n    \"DELETE\": \"delete:ok\"\n  }\n}\n"  # noqa
         }
     }
 }
index 5803df3..0965e9d 100644 (file)
@@ -2,7 +2,7 @@ import os
 import openstack
 from yaml import load, SafeLoader
 from onaptests.utils.resources import get_resource_location
-from .settings import * # pylint: disable=W0614
+from .settings import *  # noqa
 
 """ Specific clearwater IMS without multicloud."""
 
@@ -11,7 +11,7 @@ from .settings import * # pylint: disable=W0614
 USE_MULTICLOUD = False
 # Set ONLY_INSTANTIATE to true to run an instantiation without repeating
 # onboarding and related AAI configuration (Cloud config)
-ONLY_INSTANTIATE= False
+ONLY_INSTANTIATE = False
 CLEANUP_FLAG = True
 CLEANUP_ACTIVITY_TIMER = 60  # nb of seconds before cleanup in case cleanup option is set
 VENDOR_NAME = "clearwater-ims_vendor"
@@ -28,7 +28,7 @@ try:
         yaml_config_file = load(yaml_template, SafeLoader)
         SERVICE_NAME = next(iter(yaml_config_file.keys()))
 except ValueError:
-    SERVICE_NAME = "" # Fill me
+    SERVICE_NAME = ""  # Fill me
 
 CLOUD_REGION_CLOUD_OWNER = "clearwater-ims-cloud-owner"
 CLOUD_REGION_TYPE = "openstack"
@@ -53,12 +53,12 @@ SERVICE_INSTANCE_NAME = "clearwater-ims_service_instance"
 # to retrieve cloud info and avoid data duplication
 TEST_CLOUD = os.getenv('OS_TEST_CLOUD')
 cloud = openstack.connect(cloud=TEST_CLOUD)
-VIM_USERNAME = cloud.config.auth.get('username','Fill me')
-VIM_PASSWORD = cloud.config.auth.get('password','Fill me')
-VIM_SERVICE_URL = cloud.config.auth.get('auth_url','Fill me')
-TENANT_ID = cloud.config.auth.get('project_id','Fill me')
-TENANT_NAME = cloud.config.auth.get('project_name','Fill me')
-CLOUD_REGION_ID = cloud.config.get('region_name','RegionOne')
-CLOUD_DOMAIN = cloud.config.auth.get('project_domain_name','Default')
+VIM_USERNAME = cloud.config.auth.get('username', 'Fill me')
+VIM_PASSWORD = cloud.config.auth.get('password', 'Fill me')
+VIM_SERVICE_URL = cloud.config.auth.get('auth_url', 'Fill me')
+TENANT_ID = cloud.config.auth.get('project_id', 'Fill me')
+TENANT_NAME = cloud.config.auth.get('project_name', 'Fill me')
+CLOUD_REGION_ID = cloud.config.get('region_name', 'RegionOne')
+CLOUD_DOMAIN = cloud.config.auth.get('project_domain_name', 'Default')
 
 MODEL_YAML_TEMPLATE = None
index bb7b8b5..1b9d4a2 100644 (file)
@@ -4,7 +4,7 @@ from pathlib import Path
 import openstack
 from jinja2 import Environment, PackageLoader
 from onaptests.utils.resources import get_resource_location
-from .settings import *  # pylint: disable=W0614
+from .settings import *  # noqa
 
 VNF_FILENAME_PREFIX = "multi-vnf-ubuntu"
 SERVICE_NAME = f"multivnfubuntu{str(uuid.uuid4().hex)[:6]}"
index a315525..0329962 100644 (file)
@@ -2,7 +2,7 @@ from pathlib import Path
 from uuid import uuid4
 
 from onaptests.utils.resources import get_resource_location
-from .settings import *  # pylint: disable=W0614
+from .settings import *  # noqa
 
 ONLY_INSTANTIATE = False
 CLEANUP_FLAG = True
@@ -17,7 +17,7 @@ CDS_DD_FILE = Path(get_resource_location("templates/artifacts/dd.json"))
 CDS_CBA_UNENRICHED = Path(get_resource_location("templates/artifacts/PNF_DEMO.zip"))
 CDS_CBA_ENRICHED = "/tmp/PNF_DEMO_enriched.zip"
 
-CLOUD_REGION_CLOUD_OWNER = "basicnf-owner" # must not contain _
+CLOUD_REGION_CLOUD_OWNER = "basicnf-owner"  # must not contain _
 CLOUD_REGION_ID = "k8sregion-pnf-macro"
 CLOUD_REGION_TYPE = "k8s"
 CLOUD_REGION_VERSION = "1.0"
@@ -31,10 +31,12 @@ PLATFORM = "pnf_macro_platform"
 
 INSTANTIATION_TIMEOUT = 600
 
-MSB_K8S_DEFINITION_ATRIFACT_FILE_PATH = Path(get_resource_location("templates/artifacts/pnf-simulator.tar.gz"))
+MSB_K8S_DEFINITION_ATRIFACT_FILE_PATH = Path(get_resource_location(
+    "templates/artifacts/pnf-simulator.tar.gz"))
 MSB_K8S_RB_NAME = f"pnf-cnf-rb-{str(uuid4())}"
 MSB_K8S_RB_VERSION = "v1"
-MSB_K8S_PROFILE_ARTIFACT_FILE_PATH = Path(get_resource_location("templates/artifacts/profile.tar.gz"))
+MSB_K8S_PROFILE_ARTIFACT_FILE_PATH = Path(get_resource_location(
+    "templates/artifacts/profile.tar.gz"))
 MSB_K8S_PROFILE_NAME = f"pnf-cnf-profile-{str(uuid4())}"
 K8S_VERSION = "1.0"
 K8S_CONFIG = get_resource_location("templates/artifacts/config")
index 83ae792..e27c382 100644 (file)
@@ -48,10 +48,10 @@ TILLER_HOST = "localhost"
 K8S_CONFIG = None  # None means it will use default config (~/.kube/config)
 K8S_ONAP_NAMESPACE = "onap"  # ONAP Kubernetes namespace
 K8S_ADDITIONAL_RESOURCES_NAMESPACE = K8S_ONAP_NAMESPACE  # Resources created on tests namespace
-#SOCK_HTTP = "socks5h://127.0.0.1:8091"
+# SOCK_HTTP = "socks5h://127.0.0.1:8091"
 
 ORCHESTRATION_REQUEST_TIMEOUT = 60.0 * 15  # 15 minutes in seconds
 SERVICE_DISTRIBUTION_NUMBER_OF_TRIES = 30
 SERVICE_DISTRIBUTION_SLEEP_TIME = 60
 EXPOSE_SERVICES_NODE_PORTS = True
-IN_CLUSTER = False
\ No newline at end of file
+IN_CLUSTER = False
index 57ee1b3..81d34c6 100644 (file)
@@ -1,4 +1,4 @@
-from .settings import * # pylint: disable=W0614
+from .settings import *  # noqa
 
 """ Specific Status Check """
 SERVICE_NAME = "Status Check"
@@ -24,7 +24,6 @@ EXCLUDED_LABELS = {
 }
 
 SPECIFIC_LOGS_CONTAINERS = {
-    'sdc-be': ['/var/log/onap/sdc/sdc-be/error.log'],
     'sdc-onboarding-be': ['/var/log/onap/sdc/sdc-onboarding-be/error.log'],
     'aaf-cm': [
         '/opt/app/osaaf/logs/cm/cm-service.log',
@@ -97,4 +96,4 @@ GENERIC_NAMES = {
 
 MAX_LOG_BYTES = 512000
 
-UNLIMITED_LOG_BYTES = 10**10 # 10 GB
+UNLIMITED_LOG_BYTES = 10**10  # 10 GB
index c539c11..2e24cac 100644 (file)
@@ -12,11 +12,12 @@ from onaptests.utils.exceptions import OnapTestException
 class BasicClamp(ScenarioBase):
     """Onboard, update a model with a loop, design the loop and deploy it."""
     __logger = logging.getLogger(__name__)
+
     def __init__(self, **kwargs):
         """Init Basic Clamp, onboard a VM, design and deploy a loop with CLAMP."""
         super().__init__('basic_clamp', **kwargs)
         self.test = ClampStep(
-                cleanup=settings.CLEANUP_FLAG)
+            cleanup=settings.CLEANUP_FLAG)
         self.start_time = None
         self.stop_time = None
         self.result = 0
index 06f6f15..51a5cc5 100644 (file)
@@ -20,7 +20,7 @@ class BasicCnf(ScenarioBase):
         """Init BasicCnf."""
         super().__init__('basic_cnf', **kwargs)
         self.test = YamlTemplateVfModuleAlaCarteInstantiateStep(
-                cleanup=settings.CLEANUP_FLAG)
+            cleanup=settings.CLEANUP_FLAG)
         self.start_time = None
         self.stop_time = None
         self.result = 0
index 3970700..32d2095 100644 (file)
@@ -21,7 +21,7 @@ class BasicNetwork(ScenarioBase):
         # import basic_network_nomulticloud_settings needed
         super().__init__('basic_network', **kwargs)
         self.test = YamlTemplateVlAlaCarteInstantiateStep(
-                cleanup=settings.CLEANUP_FLAG)
+            cleanup=settings.CLEANUP_FLAG)
         self.start_time = None
         self.stop_time = None
         self.result = 0
index 81c2d09..cbf57db 100644 (file)
@@ -20,7 +20,7 @@ class BasicVm(ScenarioBase):
         """Init BasicVM."""
         super().__init__('basic_vm', **kwargs)
         self.test = YamlTemplateVfModuleAlaCarteInstantiateStep(
-                cleanup=settings.CLEANUP_FLAG)
+            cleanup=settings.CLEANUP_FLAG)
         self.start_time = None
         self.stop_time = None
         self.result = 0
index 5a63695..a16169b 100644 (file)
@@ -20,7 +20,7 @@ class CDSBlueprintEnrichment(ScenarioBase):
         """Init CDS blueprint enrichment use case."""
         super().__init__('basic_cds', **kwargs)
         self.test = CbaEnrichStep(
-                cleanup=settings.CLEANUP_FLAG)
+            cleanup=settings.CLEANUP_FLAG)
         self.start_time = None
         self.stop_time = None
         self.result = 0
index 90e3dc5..675b135 100644 (file)
@@ -68,7 +68,7 @@ class CDSResourceResolution(ScenarioBase):
         """Init CDS resource resolution use case."""
         super().__init__('basic_cds', **kwargs)
         self.test = CDSResourceResolutionStep(
-                cleanup=settings.CLEANUP_FLAG)
+            cleanup=settings.CLEANUP_FLAG)
         self.start_time = None
         self.stop_time = None
         self.result = 0
index db0a01f..ed16bc4 100644 (file)
@@ -21,7 +21,7 @@ class ClearwaterIms(ScenarioBase):
         # import clearwater_ims_nomulticloud_settings needed
         super().__init__('clearwater_ims', **kwargs)
         self.test = YamlTemplateVfModuleAlaCarteInstantiateStep(
-                cleanup=settings.CLEANUP_FLAG)
+            cleanup=settings.CLEANUP_FLAG)
         self.start_time = None
         self.stop_time = None
         self.result = 0
@@ -56,5 +56,5 @@ class ClearwaterIms(ScenarioBase):
         """Clean Additional resources if needed."""
         try:
             self.test.reports_collection.generate_report()
-        except:
+        except:  # noqa
             self.__logger.error("Impossible to generate reporting")
index 4a8c6cc..82a0511 100644 (file)
@@ -117,7 +117,8 @@ class MultiVnfUbuntuMacro(ScenarioBase):
         self.start_time = time.time()
         try:
             self.test.execute()
-            self.__logger.info("Starting to clean up in {} seconds".format(settings.CLEANUP_ACTIVITY_TIMER))
+            self.__logger.info("Starting to clean up in {} seconds".format(
+                settings.CLEANUP_ACTIVITY_TIMER))
             time.sleep(settings.CLEANUP_ACTIVITY_TIMER)
             self.test.cleanup()
             self.result = 100
index ed27975..881a840 100644 (file)
@@ -17,7 +17,7 @@ class Status(ScenarioBase):
         """Init the testcase."""
         super().__init__('status', **kwargs)
         self.test = CheckNamespaceStatusStep(
-                cleanup=settings.CLEANUP_FLAG)
+            cleanup=settings.CLEANUP_FLAG)
         self.start_time = None
         self.stop_time = None
         self.result = 0
index 8ebe247..2c5fb29 100644 (file)
@@ -163,13 +163,15 @@ class BaseStep(ABC):
     def store_state(cls, fun=None, *, cleanup=False):
         if fun is None:
             return functools.partial(cls.store_state, cleanup=cleanup)
+
         @functools.wraps(fun)
         def wrapper(self, *args, **kwargs):
             try:
                 if cleanup:
                     self._start_cleanup_time = time.time()
                     self._logger.info("*****************************************************")
-                    self._logger.info(f"START [{self.component}] {self.name} cleanup: {self.description}")
+                    self._logger.info(f"START [{self.component}] {self.name} cleanup: "
+                                      f"{self.description}")
                     self._logger.info("*****************************************************")
                 else:
                     self._logger.info("*****************************************************")
@@ -180,7 +182,8 @@ class BaseStep(ABC):
                 execution_status = ReportStepStatus.PASS
                 return ret
             except SubstepExecutionException:
-                execution_status = ReportStepStatus.PASS if cleanup else ReportStepStatus.NOT_EXECUTED
+                execution_status = (ReportStepStatus.PASS if cleanup else
+                                    ReportStepStatus.NOT_EXECUTED)
                 raise
             except (OnapTestException, SDKException):
                 execution_status = ReportStepStatus.FAIL
@@ -188,10 +191,12 @@ class BaseStep(ABC):
             finally:
                 if cleanup:
                     self._logger.info("*****************************************************")
-                    self._logger.info(f"STOP [{self.component}] {self.name} cleanup: {self.description}")
+                    self._logger.info(f"STOP [{self.component}] {self.name} cleanup: "
+                                      f"{self.description}")
                     self._logger.info("*****************************************************")
                     self._cleanup_report = Report(
-                        step_description=f"[{self.component}] {self.name} cleanup: {self.description}",
+                        step_description=(f"[{self.component}] {self.name} cleanup: "
+                                          f"{self.description}"),
                         step_execution_status=execution_status,
                         step_execution_duration=time.time() - self._start_cleanup_time,
                         step_component=self.component
@@ -202,12 +207,15 @@ class BaseStep(ABC):
                     self._logger.info("*****************************************************")
                     if not self._start_execution_time:
                         if execution_status != ReportStepStatus.NOT_EXECUTED:
-                            self._logger.error("No execution start time saved for %s step. Fix it by call `super.execute()` "
-                                               "in step class `execute()` method definition", self.name)
+                            self._logger.error("No execution start time saved for %s step. "
+                                               "Fix it by call `super.execute()` "
+                                               "in step class `execute()` method definition",
+                                               self.name)
                         self._start_execution_time = time.time()
                     self._execution_report = Report(
                         step_description=f"[{self.component}] {self.name}: {self.description}",
-                        step_execution_status=execution_status if execution_status else ReportStepStatus.FAIL,
+                        step_execution_status=(execution_status if execution_status else
+                                               ReportStepStatus.FAIL),
                         step_execution_duration=time.time() - self._start_execution_time,
                         step_component=self.component
                     )
index 1fcb96b..2b5a8df 100644 (file)
@@ -48,7 +48,7 @@ class CheckK8sResourcesStep(BaseStep):
         self.all_resources = []
         self.failing_resources = []
         self.jinja_env = Environment(autoescape=select_autoescape(['html']),
-                                loader=PackageLoader('onaptests.templates','status'))
+                                     loader=PackageLoader('onaptests.templates', 'status'))
 
     @property
     def component(self) -> str:
@@ -69,7 +69,7 @@ class CheckK8sResourcesStep(BaseStep):
 
     def _add_failing_resource(self, resource):
         if (resource.labels and settings.EXCLUDED_LABELS
-            and (resource.labels.keys() and settings.EXCLUDED_LABELS.keys())):
+                and (resource.labels.keys() and settings.EXCLUDED_LABELS.keys())):
             for label in resource.labels.items():
                 for waived_label in settings.EXCLUDED_LABELS.items():
                     if label[0] in waived_label[0] and label[1] in waived_label[1]:
@@ -84,19 +84,24 @@ class CheckK8sResourcesStep(BaseStep):
         try:
             self._init_resources()
             if len(self.k8s_resources) > 0:
-                self.__logger.info("%4s %ss in the namespace", len(self.k8s_resources), self.resource_type)
+                self.__logger.info("%4s %ss in the namespace",
+                                   len(self.k8s_resources),
+                                   self.resource_type)
                 self._parse_resources()
-                self.__logger.info("%4s %ss parsed, %s failing", len(self.all_resources), self.resource_type,
-                    len(self.failing_resources))
+                self.__logger.info("%4s %ss parsed, %s failing",
+                                   len(self.all_resources),
+                                   self.resource_type,
+                                   len(self.failing_resources))
         except (ConnectionRefusedError, MaxRetryError, NewConnectionError):
             self.__logger.error("Test of k8s %ss failed.", self.resource_type)
             self.__logger.error("Cannot connect to Kubernetes.")
 
+
 class CheckBasicK8sResourcesStep(CheckK8sResourcesStep):
 
     __logger = logging.getLogger(__name__)
 
-    def __init__(self, resource_type: str, k8s_res_class, cleanup: bool = False,**kwargs):
+    def __init__(self, resource_type: str, k8s_res_class, cleanup: bool = False, **kwargs):
         """Init CheckBasicK8sResourcesStep."""
         super().__init__(resource_type)
         self.k8s_res_class = k8s_res_class
@@ -112,11 +117,12 @@ class CheckBasicK8sResourcesStep(CheckK8sResourcesStep):
     def execute(self):
         super().execute()
 
+
 class CheckK8sConfigMapsStep(CheckBasicK8sResourcesStep):
 
     __logger = logging.getLogger(__name__)
 
-    def __init__(self, cleanup: bool = False,**kwargs):
+    def __init__(self, cleanup: bool = False, **kwargs):
         """Init CheckK8sConfigMapsStep."""
         super().__init__("configmap", ConfigMap)
 
@@ -124,11 +130,12 @@ class CheckK8sConfigMapsStep(CheckBasicK8sResourcesStep):
         super()._init_resources()
         self.k8s_resources = self.core.list_namespaced_config_map(NAMESPACE).items
 
+
 class CheckK8sSecretsStep(CheckBasicK8sResourcesStep):
 
     __logger = logging.getLogger(__name__)
 
-    def __init__(self, cleanup: bool = False,**kwargs):
+    def __init__(self, cleanup: bool = False, **kwargs):
         """Init CheckK8sSecretsStep."""
         super().__init__("secret", Secret)
 
@@ -136,11 +143,12 @@ class CheckK8sSecretsStep(CheckBasicK8sResourcesStep):
         super()._init_resources()
         self.k8s_resources = self.core.list_namespaced_secret(NAMESPACE).items
 
+
 class CheckK8sIngressesStep(CheckBasicK8sResourcesStep):
 
     __logger = logging.getLogger(__name__)
 
-    def __init__(self, cleanup: bool = False,**kwargs):
+    def __init__(self, cleanup: bool = False, **kwargs):
         """Init CheckK8sIngressesStep."""
         super().__init__("ingress", Ingress)
 
@@ -148,11 +156,12 @@ class CheckK8sIngressesStep(CheckBasicK8sResourcesStep):
         super()._init_resources()
         self.k8s_resources = self.networking.list_namespaced_ingress(NAMESPACE).items
 
+
 class CheckK8sPvcsStep(CheckK8sResourcesStep):
 
     __logger = logging.getLogger(__name__)
 
-    def __init__(self, cleanup: bool = False,**kwargs):
+    def __init__(self, cleanup: bool = False, **kwargs):
         """Init CheckK8sPvcsStep."""
         super().__init__("pvc")
 
@@ -165,10 +174,10 @@ class CheckK8sPvcsStep(CheckK8sResourcesStep):
         Return a list of Pods that were created to perform jobs.
         """
         super()._parse_resources()
-        jobs_pods = []
         for k8s in self.k8s_resources:
             pvc = Pvc(k8s=k8s)
-            field_selector = f"involvedObject.name={pvc.name},involvedObject.kind=PersistentVolumeClaim"
+            field_selector = (f"involvedObject.name={pvc.name}, "
+                              "involvedObject.kind=PersistentVolumeClaim")
             pvc.events = self.core.list_namespaced_event(
                 NAMESPACE,
                 field_selector=field_selector).items
@@ -181,11 +190,12 @@ class CheckK8sPvcsStep(CheckK8sResourcesStep):
     def execute(self):
         super().execute()
 
+
 class CheckK8sResourcesUsingPodsStep(CheckK8sResourcesStep):
 
     __logger = logging.getLogger(__name__)
 
-    def __init__(self, resource_type: str, pods_source, cleanup: bool = False,**kwargs):
+    def __init__(self, resource_type: str, pods_source, cleanup: bool = False, **kwargs):
         """Init CheckK8sResourcesUsingPodsStep."""
         super().__init__(resource_type)
         self.pods_source = pods_source
@@ -219,11 +229,12 @@ class CheckK8sResourcesUsingPodsStep(CheckK8sResourcesStep):
     def execute(self):
         super().execute()
 
+
 class CheckK8sJobsStep(CheckK8sResourcesUsingPodsStep):
 
     __logger = logging.getLogger(__name__)
 
-    def __init__(self, cleanup: bool = False,**kwargs):
+    def __init__(self, cleanup: bool = False, **kwargs):
         """Init CheckK8sJobsStep."""
         super().__init__("job", None)
 
@@ -256,19 +267,19 @@ class CheckK8sJobsStep(CheckK8sResourcesUsingPodsStep):
 
             # timemout job
             if not k8s.status.completion_time:
-                if any(
-                    waiver_elt not in job.name for waiver_elt in settings.WAIVER_LIST):
+                if any(waiver_elt not in job.name for waiver_elt in settings.WAIVER_LIST):
                     self._add_failing_resource(job)
             # completed job
             if any(waiver_elt not in job.name for waiver_elt in settings.WAIVER_LIST):
                 self.all_resources.append(job)
             jobs_pods += job_pods
 
+
 class CheckK8sPodsStep(CheckK8sResourcesUsingPodsStep):
 
     __logger = logging.getLogger(__name__)
 
-    def __init__(self, pods, cleanup: bool = False,**kwargs):
+    def __init__(self, pods, cleanup: bool = False, **kwargs):
         """Init CheckK8sPodsStep."""
         super().__init__("pod", pods)
 
@@ -276,7 +287,7 @@ class CheckK8sPodsStep(CheckK8sResourcesUsingPodsStep):
         super()._init_resources()
         self.k8s_resources = self.core.list_namespaced_pod(NAMESPACE).items
 
-    def _parse_resources(self):
+    def _parse_resources(self):  # noqa
         """Parse the pods."""
         super()._parse_resources()
         excluded_pods = self._get_used_pods()
@@ -295,9 +306,10 @@ class CheckK8sPodsStep(CheckK8sResourcesUsingPodsStep):
                         pod_component = k8s.metadata.labels[
                             'app.kubernetes.io/name']
                     else:
-                        self.__logger.error("pod %s has no 'app' or 'app.kubernetes.io/name' in metadata: %s", pod_component, k8s.metadata.labels)
+                        self.__logger.error("pod %s has no 'app' or 'app.kubernetes.io/name' "
+                                            "in metadata: %s", pod_component, k8s.metadata.labels)
 
-                ## looks for docker version
+                # looks for docker version
                 for container in k8s.spec.containers:
                     pod_version = {}
                     pod_container_version = container.image.rsplit(":", 1)
@@ -317,7 +329,7 @@ class CheckK8sPodsStep(CheckK8sResourcesUsingPodsStep):
                     search_rule = "^(?P<source>[^/]*)/*(?P<container>[^:]*):*(?P<version>.*)$"
                     search = re.search(search_rule, container.image)
                     name = "{}/{}".format(search.group('source'),
-                                        search.group('container'))
+                                          search.group('container'))
                     version = search.group('version')
                     if name[-1] == '/':
                         name = name[0:-1]
@@ -416,7 +428,7 @@ class CheckK8sPodsStep(CheckK8sResourcesUsingPodsStep):
             )
         return logs
 
-    def _parse_container(self, pod, k8s_container, init=False):
+    def _parse_container(self, pod, k8s_container, init=False):  # noqa
         """Get the logs of a container."""
         logs = ""
         old_logs = ""
@@ -443,7 +455,7 @@ class CheckK8sPodsStep(CheckK8sResourcesUsingPodsStep):
                 logs = self._get_container_logs(pod=pod, container=container, full=False)
                 with open(
                         "{}/pod-{}-{}.log".format(self.res_dir,
-                                                pod.name, container.name),
+                                                  pod.name, container.name),
                         'w') as log_result:
                     log_result.write(logs)
                 if (not container.ready) and container.restart_count > 0:
@@ -451,15 +463,15 @@ class CheckK8sPodsStep(CheckK8sResourcesUsingPodsStep):
                                                         previous=True)
                     with open(
                             "{}/pod-{}-{}.old.log".format(self.res_dir,
-                                                        pod.name,
-                                                        container.name),
+                                                          pod.name,
+                                                          container.name),
                             'w') as log_result:
                         log_result.write(old_logs)
                 if (container.name in settings.FULL_LOGS_CONTAINERS):
                     logs = self._get_container_logs(pod=pod, container=container)
                     with open(
                             "{}/pod-{}-{}.log".format(self.res_dir,
-                                                    pod.name, container.name),
+                                                      pod.name, container.name),
                             'w') as log_result:
                         log_result.write(logs)
                 if (container.name in settings.SPECIFIC_LOGS_CONTAINERS):
@@ -484,7 +496,7 @@ class CheckK8sPodsStep(CheckK8sResourcesUsingPodsStep):
                             log_result.write(log_files[log_file])
             except client.rest.ApiException as exc:
                 self.__logger.warning("%scontainer %s of pod %s has an exception: %s",
-                                prefix, container.name, pod.name, exc.reason)
+                                      prefix, container.name, pod.name, exc.reason)
             self.jinja_env.get_template('container_log.html.j2').stream(
                 container=container,
                 pod_name=pod.name,
@@ -501,11 +513,12 @@ class CheckK8sPodsStep(CheckK8sResourcesUsingPodsStep):
                 return 1
         return 0
 
+
 class CheckK8sServicesStep(CheckK8sResourcesUsingPodsStep):
 
     __logger = logging.getLogger(__name__)
 
-    def __init__(self, pods, cleanup: bool = False,**kwargs):
+    def __init__(self, pods, cleanup: bool = False, **kwargs):
         """Init CheckK8sServicesStep."""
         super().__init__("service", pods)
 
@@ -527,11 +540,12 @@ class CheckK8sServicesStep(CheckK8sResourcesUsingPodsStep):
                     self.res_dir, service.name))
             self.all_resources.append(service)
 
+
 class CheckK8sDeploymentsStep(CheckK8sResourcesUsingPodsStep):
 
     __logger = logging.getLogger(__name__)
 
-    def __init__(self, pods, cleanup: bool = False,**kwargs):
+    def __init__(self, pods, cleanup: bool = False, **kwargs):
         """Init CheckK8sDeploymentsStep."""
         super().__init__("deployment", pods)
 
@@ -566,11 +580,12 @@ class CheckK8sDeploymentsStep(CheckK8sResourcesUsingPodsStep):
 
             self.all_resources.append(deployment)
 
+
 class CheckK8sResplicaSetsStep(CheckK8sResourcesUsingPodsStep):
 
     __logger = logging.getLogger(__name__)
 
-    def __init__(self, pods, cleanup: bool = False,**kwargs):
+    def __init__(self, pods, cleanup: bool = False, **kwargs):
         """Init CheckK8sResplicaSetsStep."""
         super().__init__("replicaset", pods)
 
@@ -601,17 +616,18 @@ class CheckK8sResplicaSetsStep(CheckK8sResourcesUsingPodsStep):
                 replicaset=replicaset).dump('{}/replicaset-{}.html'.format(
                     self.res_dir, replicaset.name))
 
-            if (not k8s.status.ready_replicas 
-                or (k8s.status.ready_replicas < k8s.status.replicas)):
+            if (not k8s.status.ready_replicas or
+                    (k8s.status.ready_replicas < k8s.status.replicas)):
                 self._add_failing_resource(replicaset)
 
             self.all_resources.append(replicaset)
 
+
 class CheckK8sStatefulSetsStep(CheckK8sResourcesUsingPodsStep):
 
     __logger = logging.getLogger(__name__)
 
-    def __init__(self, pods, cleanup: bool = False,**kwargs):
+    def __init__(self, pods, cleanup: bool = False, **kwargs):
         """Init CheckK8sStatefulSetsStep."""
         super().__init__("statefulset", pods)
 
@@ -648,11 +664,12 @@ class CheckK8sStatefulSetsStep(CheckK8sResourcesUsingPodsStep):
 
             self.all_resources.append(statefulset)
 
+
 class CheckK8sDaemonSetsStep(CheckK8sResourcesUsingPodsStep):
 
     __logger = logging.getLogger(__name__)
 
-    def __init__(self, pods, cleanup: bool = False,**kwargs):
+    def __init__(self, pods, cleanup: bool = False, **kwargs):
         """Init CheckK8sDaemonSetsStep."""
         super().__init__("daemonset", pods)
 
@@ -688,6 +705,7 @@ class CheckK8sDaemonSetsStep(CheckK8sResourcesUsingPodsStep):
 
             self.all_resources.append(daemonset)
 
+
 class CheckNamespaceStatusStep(CheckK8sResourcesStep):
     """Check status of all k8s resources in the selected namespace."""
 
@@ -777,7 +795,9 @@ class CheckNamespaceStatusStep(CheckK8sResourcesStep):
         for step in self._steps:
             if step.failing:
                 self.failing = True
-                self.__logger.info("%s failing: %s", step.resource_type, len(step.failing_resources))
+                self.__logger.info("%s failing: %s",
+                                   step.resource_type,
+                                   len(step.failing_resources))
             details[step.resource_type] = {
                 'number_all': len(step.all_resources),
                 'number_failing': len(step.failing_resources),
index 97a7823..3bb08c7 100644 (file)
@@ -52,8 +52,8 @@ class ConnectServiceSubToCloudRegionStep(BaseStep):
         super().execute()
         customer: Customer = Customer.get_by_global_customer_id(
             settings.GLOBAL_CUSTOMER_ID)
-        service_subscription: ServiceSubscription = customer.get_service_subscription_by_service_type(
-            settings.SERVICE_NAME)
+        service_subscription: ServiceSubscription = \
+            customer.get_service_subscription_by_service_type(settings.SERVICE_NAME)
         cloud_region: CloudRegion = CloudRegion.get_by_id(
             cloud_owner=settings.CLOUD_REGION_CLOUD_OWNER,
             cloud_region_id=settings.CLOUD_REGION_ID,
@@ -63,4 +63,6 @@ class ConnectServiceSubToCloudRegionStep(BaseStep):
         # for which we are sure that an availability zone has been created
         tenant: Tenant = cloud_region.get_tenant(settings.TENANT_ID)
 
-        service_subscription.link_to_cloud_region_and_tenant(cloud_region=cloud_region, tenant=tenant)
+        service_subscription.link_to_cloud_region_and_tenant(
+            cloud_region=cloud_region,
+            tenant=tenant)
index 8c361e4..adcf862 100644 (file)
@@ -4,7 +4,6 @@ from jinja2 import Environment, PackageLoader, select_autoescape
 from onapsdk.configuration import settings
 from onapsdk.exceptions import APIError
 from onapsdk.k8s import ConnectivityInfo
-from onapsdk.utils.jinja import jinja_env
 from onaptests.steps.base import BaseStep
 
 
@@ -31,7 +30,7 @@ class K8SConnectivityInfoStep(BaseStep):
          - K8S_CONFIG.
         """
         super().execute()
-        ######## Create Connectivity Info #########################################
+        # Create Connectivity Info #########################################
         try:
             self._logger.info("Check if k8s connectivity information exists")
             ConnectivityInfo.get_connectivity_info_by_region_id(
index 3065461..c6b440f 100644 (file)
@@ -33,7 +33,7 @@ class RegisterCloudRegionStep(BaseStep):
         return "AAI"
 
     @BaseStep.store_state
-    def execute(self):
+    def execute(self):  # noqa
         """Register cloud region.
 
         Use settings values:
index e8d16f9..f00b60e 100644 (file)
@@ -37,6 +37,7 @@ class K8sResource():
         else:
             return False
 
+
 class K8sPodParentResource(K8sResource):
     """K8sPodParentResource class."""
 
@@ -138,9 +139,11 @@ class Job(K8sPodParentResource):
 class Deployment(K8sPodParentResource):
     """Deployment class."""
 
+
 class ReplicaSet(K8sPodParentResource):
     """ReplicaSet class."""
 
+
 class StatefulSet(K8sPodParentResource):
     """StatefulSet class."""
 
@@ -162,4 +165,4 @@ class Secret(K8sResource):
 
 
 class Ingress(K8sResource):
-    """Ingress class."""
\ No newline at end of file
+    """Ingress class."""
index f0ad61b..27831c5 100644 (file)
@@ -12,6 +12,7 @@ import onaptests.utils.exceptions as onap_test_exceptions
 from ..base import BaseStep
 from .vnf_ala_carte import YamlTemplateVnfAlaCarteInstantiateStep
 
+
 class K8SProfileStep(BaseStep):
     """CreateK8sProfileStep."""
 
@@ -116,8 +117,10 @@ class K8SProfileStep(BaseStep):
         self._logger.info("Create the k8s profile if it doesn't exist")
         super().execute()
         customer: Customer = Customer.get_by_global_customer_id(settings.GLOBAL_CUSTOMER_ID)
-        service_subscription: ServiceSubscription = customer.get_service_subscription_by_service_type(self.service_name)
-        self._service_instance: ServiceInstance = service_subscription.get_service_instance_by_name(self.service_instance_name)
+        service_subscription: ServiceSubscription = \
+            customer.get_service_subscription_by_service_type(self.service_name)
+        self._service_instance: ServiceInstance = \
+            service_subscription.get_service_instance_by_name(self.service_instance_name)
 
         for vnf_instance in self._service_instance.vnf_instances:
             # possible to have several modules for 1 VNF
@@ -140,15 +143,15 @@ class K8SProfileStep(BaseStep):
                     self._logger.error("Missing rb profile information")
                     raise onap_test_exceptions.ProfileInformationException
 
-                ######## Check profile for Definition ###################################
+                # Check profile for Definition
                 try:
                     rbdef.get_profile_by_name(k8s_profile_name)
                 except ResourceNotFound:
-                    ######## Create profile for Definition ###################################
+                    # Create profile for Definition
                     profile = rbdef.create_profile(k8s_profile_name,
                                                    k8s_profile_namespace,
                                                    settings.K8S_PROFILE_K8S_VERSION)
-                    ####### Upload artifact for created profile ##############################
+                    # Upload artifact for created profile
                     profile.upload_artifact(open(settings.K8S_PROFILE_ARTIFACT_PATH, 'rb').read())
 
     @BaseStep.store_state(cleanup=True)
@@ -172,7 +175,7 @@ class K8SProfileStep(BaseStep):
                 if k8s_profile_name == "":
                     self._logger.error("K8s profile deletion failed, missing rb profile name")
                     raise onap_test_exceptions.ProfileInformationException
-                ######## Delete profile for Definition ###################################
+                # Delete profile for Definition
                 try:
                     profile = rbdef.get_profile_by_name(k8s_profile_name)
                     profile.delete()
index a2e1812..9908b6d 100644 (file)
@@ -188,7 +188,8 @@ class YamlTemplateServiceAlaCarteInstantiateStep(YamlTemplateBaseStep):
         super().execute()
         service = Service(self.service_name)
         customer: Customer = Customer.get_by_global_customer_id(settings.GLOBAL_CUSTOMER_ID)
-        service_subscription: ServiceSubscription = customer.get_service_subscription_by_service_type(service.name)
+        service_subscription: ServiceSubscription = \
+            customer.get_service_subscription_by_service_type(service.name)
         cloud_region: CloudRegion = CloudRegion.get_by_id(
             cloud_owner=settings.CLOUD_REGION_CLOUD_OWNER,
             cloud_region_id=settings.CLOUD_REGION_ID,
@@ -210,8 +211,8 @@ class YamlTemplateServiceAlaCarteInstantiateStep(YamlTemplateBaseStep):
             distribution_completed = service.distributed
             if distribution_completed is True:
                 self._logger.info(
-                "Service Distribution for %s is sucessfully finished",
-                service.name)
+                    "Service Distribution for %s is sucessfully finished",
+                    service.name)
                 break
             self._logger.info(
                 "Service Distribution for %s ongoing, Wait for 60 s",
@@ -221,7 +222,7 @@ class YamlTemplateServiceAlaCarteInstantiateStep(YamlTemplateBaseStep):
 
         if distribution_completed is False:
             self._logger.error(
-                "Service Distribution for %s failed !!",service.name)
+                "Service Distribution for %s failed !!", service.name)
             raise onap_test_exceptions.ServiceDistributionException
 
         service_instantiation = ServiceInstantiation.instantiate_ala_carte(
@@ -243,8 +244,10 @@ class YamlTemplateServiceAlaCarteInstantiateStep(YamlTemplateBaseStep):
             self._logger.error("Service instantiation %s failed", self.service_instance_name)
             raise onap_test_exceptions.ServiceInstantiateException
         else:
-            service_subscription: ServiceSubscription = customer.get_service_subscription_by_service_type(self.service_name)
-            self._service_instance: ServiceInstance = service_subscription.get_service_instance_by_name(self.service_instance_name)
+            service_subscription: ServiceSubscription = \
+                customer.get_service_subscription_by_service_type(self.service_name)
+            self._service_instance: ServiceInstance = \
+                service_subscription.get_service_instance_by_name(self.service_instance_name)
 
     @YamlTemplateBaseStep.store_state(cleanup=True)
     def cleanup(self) -> None:
index b32d936..d97a109 100644 (file)
@@ -12,13 +12,23 @@ from onapsdk.aai.cloud_infrastructure.tenant import Tenant
 from onapsdk.configuration import settings
 from onapsdk.exceptions import ResourceNotFound
 from onapsdk.sdc.service import Service
-from onapsdk.so.instantiation import InstantiationParameter, ServiceInstantiation, VfmoduleParameters, VnfParameters, SoService
-from onaptests.steps.cloud.customer_service_subscription_create import CustomerServiceSubscriptionCreateStep
+from onapsdk.so.instantiation import (
+    InstantiationParameter,
+    ServiceInstantiation,
+    VfmoduleParameters,
+    VnfParameters,
+    SoService
+)
+from onaptests.steps.cloud.customer_service_subscription_create import (
+    CustomerServiceSubscriptionCreateStep
+)
 
 import onaptests.utils.exceptions as onap_test_exceptions
 from onaptests.steps.base import YamlTemplateBaseStep
 from onaptests.steps.onboard.service import YamlTemplateServiceOnboardStep
-from onaptests.steps.cloud.connect_service_subscription_to_cloud_region import ConnectServiceSubToCloudRegionStep
+from onaptests.steps.cloud.connect_service_subscription_to_cloud_region import (
+    ConnectServiceSubToCloudRegionStep
+)
 
 
 class YamlTemplateServiceMacroInstantiateStep(YamlTemplateBaseStep):
@@ -48,7 +58,6 @@ class YamlTemplateServiceMacroInstantiateStep(YamlTemplateBaseStep):
             else:  # only pnfs
                 self.add_step(CustomerServiceSubscriptionCreateStep(cleanup))
 
-
     @property
     def description(self) -> str:
         """Step description."""
@@ -125,7 +134,7 @@ class YamlTemplateServiceMacroInstantiateStep(YamlTemplateBaseStep):
         return self.parent.service_instance_name
 
     @YamlTemplateBaseStep.store_state
-    def execute(self):
+    def execute(self):  # noqa
         """Instantiate service.
 
         Use settings values:
@@ -143,7 +152,8 @@ class YamlTemplateServiceMacroInstantiateStep(YamlTemplateBaseStep):
         super().execute()
         service = Service(self.service_name)
         customer: Customer = Customer.get_by_global_customer_id(settings.GLOBAL_CUSTOMER_ID)
-        service_subscription: ServiceSubscription = customer.get_service_subscription_by_service_type(service.name)
+        service_subscription: ServiceSubscription = \
+            customer.get_service_subscription_by_service_type(service.name)
         if any(
                 filter(lambda x: x in self.yaml_template[self.service_name].keys(),
                        ["vnfs", "networks"])):
@@ -153,8 +163,9 @@ class YamlTemplateServiceMacroInstantiateStep(YamlTemplateBaseStep):
             )
             tenant: Tenant = cloud_region.get_tenant(settings.TENANT_ID)
         else:
-            cloud_region, tenant = None, None  # Only PNF is going to be instantiated so
-                                               # neither cloud_region nor tenant are needed
+            #  Only PNF is going to be instantiated so
+            #  neither cloud_region nor tenant are needed
+            cloud_region, tenant = None, None
         try:
             owning_entity = OwningEntity.get_by_owning_entity_name(
                 settings.OWNING_ENTITY)
@@ -171,12 +182,12 @@ class YamlTemplateServiceMacroInstantiateStep(YamlTemplateBaseStep):
             distribution_completed = service.distributed
             if distribution_completed is True:
                 self._logger.info(
-                "Service Distribution for %s is sucessfully finished",
-                service.name)
+                    "Service Distribution for %s is sucessfully finished",
+                    service.name)
                 break
             self._logger.info(
                 "Service Distribution for %s ongoing, Wait for %d s",
-                service.name,settings.SERVICE_DISTRIBUTION_SLEEP_TIME)
+                service.name, settings.SERVICE_DISTRIBUTION_SLEEP_TIME)
             time.sleep(settings.SERVICE_DISTRIBUTION_SLEEP_TIME)
             nb_try += 1
 
@@ -190,16 +201,19 @@ class YamlTemplateServiceMacroInstantiateStep(YamlTemplateBaseStep):
         if settings.MODEL_YAML_TEMPLATE:
             so_data = self.yaml_template[self.service_name]
             so_service = SoService(vnfs=so_data.get("vnfs", []),
-                                   subscription_service_type=so_data.get('subscription_service_type'))
+                                   subscription_service_type=so_data.get(
+                                       'subscription_service_type'))
         else:
             for vnf_data in self.yaml_template[self.service_name].get("vnfs", []):
                 vnf_params_list.append(VnfParameters(
                     vnf_data["vnf_name"],
-                    [InstantiationParameter(name=parameter["name"], value=parameter["value"]) for parameter in
+                    [InstantiationParameter(name=parameter["name"],
+                                            value=parameter["value"]) for parameter in
                      vnf_data.get("vnf_parameters", [])],
                     [VfmoduleParameters(vf_module_data["vf_module_name"],
-                                        [InstantiationParameter(name=parameter["name"], value=parameter["value"]) for
-                                         parameter in vf_module_data.get("parameters", [])]) \
+                                        [InstantiationParameter(name=parameter["name"],
+                                                                value=parameter["value"]) for
+                                         parameter in vf_module_data.get("parameters", [])])
                      for vf_module_data in vnf_data.get("vf_module_parameters", [])]
                 ))
 
@@ -227,8 +241,10 @@ class YamlTemplateServiceMacroInstantiateStep(YamlTemplateBaseStep):
             self._logger.error("Service instantiation %s failed", self.service_instance_name)
             raise onap_test_exceptions.ServiceInstantiateException
 
-        service_subscription: ServiceSubscription = customer.get_service_subscription_by_service_type(self.service_name)
-        self._service_instance: ServiceInstance = service_subscription.get_service_instance_by_name(self.service_instance_name)
+        service_subscription: ServiceSubscription = \
+            customer.get_service_subscription_by_service_type(self.service_name)
+        self._service_instance: ServiceInstance = \
+            service_subscription.get_service_instance_by_name(self.service_instance_name)
 
     @YamlTemplateBaseStep.store_state(cleanup=True)
     def cleanup(self) -> None:
index 38bdee9..b4a7c77 100644 (file)
@@ -12,6 +12,7 @@ from ..base import YamlTemplateBaseStep
 from .vnf_ala_carte import YamlTemplateVnfAlaCarteInstantiateStep
 from .k8s_profile_create import K8SProfileStep
 
+
 class YamlTemplateVfModuleAlaCarteInstantiateStep(YamlTemplateBaseStep):
     """Instantiate vf module a'la carte using YAML template."""
 
@@ -129,8 +130,10 @@ class YamlTemplateVfModuleAlaCarteInstantiateStep(YamlTemplateBaseStep):
         """
         super().execute()
         customer: Customer = Customer.get_by_global_customer_id(settings.GLOBAL_CUSTOMER_ID)
-        service_subscription: ServiceSubscription = customer.get_service_subscription_by_service_type(self.service_name)
-        self._service_instance: ServiceInstance = service_subscription.get_service_instance_by_name(self.service_instance_name)
+        service_subscription: ServiceSubscription = \
+            customer.get_service_subscription_by_service_type(self.service_name)
+        self._service_instance: ServiceInstance = \
+            service_subscription.get_service_instance_by_name(self.service_instance_name)
         cloud_region: CloudRegion = CloudRegion.get_by_id(
             cloud_owner=settings.CLOUD_REGION_CLOUD_OWNER,
             cloud_region_id=settings.CLOUD_REGION_ID,
index 7186c24..022a8b2 100644 (file)
@@ -101,7 +101,7 @@ class YamlTemplateVlAlaCarteInstantiateStep(YamlTemplateBaseStep):
 
         """
         # workaround, as Network name differs from model name (added " 0")
-        network_name=re.sub(r"\s\d$", r"", network_name)
+        network_name = re.sub(r"\s\d$", r"", network_name)
         for net in self.yaml_template[self.service_name]["networks"]:
             if net["vl_name"] == network_name:
                 if net['subnets'] is None:
@@ -130,11 +130,13 @@ class YamlTemplateVlAlaCarteInstantiateStep(YamlTemplateBaseStep):
         super().execute()
         service: Service = Service(self.service_name)
         customer: Customer = Customer.get_by_global_customer_id(settings.GLOBAL_CUSTOMER_ID)
-        service_subscription: ServiceSubscription = customer.get_service_subscription_by_service_type(self.service_name)
-        service_instance: ServiceInstance = service_subscription.get_service_instance_by_name(self.service_instance_name)
+        service_subscription: ServiceSubscription = \
+            customer.get_service_subscription_by_service_type(self.service_name)
+        service_instance: ServiceInstance = \
+            service_subscription.get_service_instance_by_name(self.service_instance_name)
         self._service_instance = service_instance
         for idx, network in enumerate(service.networks):
-        #for network in self.yaml_template[self.service_name]["networks"]:
+            # for network in self.yaml_template[self.service_name]["networks"]:
             net_instantiation = service_instance.add_network(
                 network,
                 settings.LINE_OF_BUSINESS,
@@ -159,7 +161,7 @@ class YamlTemplateVlAlaCarteInstantiateStep(YamlTemplateBaseStep):
         """
         if self._cleanup:
             for net_instance in self._service_instance.network_instances:
-                self._logger.info("Start network deletion %s",net_instance.name)
+                self._logger.info("Start network deletion %s", net_instance.name)
                 net_deletion = net_instance.delete(a_la_carte=True)
                 try:
                     net_deletion.wait_for_finish(settings.ORCHESTRATION_REQUEST_TIMEOUT)
index eb9896f..379e285 100644 (file)
@@ -103,8 +103,10 @@ class YamlTemplateVnfAlaCarteInstantiateStep(YamlTemplateBaseStep):
         super().execute()
         service: Service = Service(self.service_name)
         customer: Customer = Customer.get_by_global_customer_id(settings.GLOBAL_CUSTOMER_ID)
-        service_subscription: ServiceSubscription = customer.get_service_subscription_by_service_type(self.service_name)
-        self._service_instance: ServiceInstance = service_subscription.get_service_instance_by_name(self.service_instance_name)
+        service_subscription: ServiceSubscription = \
+            customer.get_service_subscription_by_service_type(self.service_name)
+        self._service_instance: ServiceInstance = \
+            service_subscription.get_service_instance_by_name(self.service_instance_name)
         cloud_region: CloudRegion = CloudRegion.get_by_id(
             cloud_owner=settings.CLOUD_REGION_CLOUD_OWNER,
             cloud_region_id=settings.CLOUD_REGION_ID,
index c614857..502a7f0 100644 (file)
@@ -80,13 +80,12 @@ class ClampStep(YamlTemplateBaseStep):
         else:
             return self.parent.service_name
 
-
     def check(self, operational_policies: list, is_template: bool = False) -> str:
         """Check CLAMP requirements to create a loop."""
         self._logger.info("Check operational policy")
         for policy in operational_policies:
             exist = Clamp.check_policies(policy_name=policy["name"],
-                                         req_policies=30)# 30 required policy
+                                         req_policies=30)  # 30 required policy
             self._logger.info("Operational policy found.")
             if not exist:
                 raise ValueError("Couldn't load the policy %s", policy)
@@ -109,14 +108,14 @@ class ClampStep(YamlTemplateBaseStep):
 
     def loop_counter(self, action: str) -> None:
         """ Count number of loop instances."""
-        if  action == "plus":
+        if action == "plus":
             self.count += 1
-        if  action == "minus":
+        if action == "minus":
             self.count -= 1
 
     @YamlTemplateBaseStep.store_state
     def execute(self):
-        super().execute() # TODO work only the 1st time, not if already onboarded
+        super().execute()  # TODO work only the 1st time, not if already onboarded
 
         # Before instantiating, be sure that the service has been distributed
         service = Service(self.service_name)
@@ -128,8 +127,8 @@ class ClampStep(YamlTemplateBaseStep):
             distribution_completed = service.distributed
             if distribution_completed is True:
                 self._logger.info(
-                "Service Distribution for %s is sucessfully finished",
-                service.name)
+                    "Service Distribution for %s is sucessfully finished",
+                    service.name)
                 break
             self._logger.info(
                 "Service Distribution for %s ongoing, Wait for 60 s",
@@ -139,7 +138,7 @@ class ClampStep(YamlTemplateBaseStep):
 
         if distribution_completed is False:
             self._logger.error(
-                "Service Distribution for %s failed !!",service.name)
+                "Service Distribution for %s failed !!", service.name)
             raise onap_test_exceptions.ServiceDistributionException
 
         # time to wait for template load in CLAMP
index 07586ac..fab70ad 100644 (file)
@@ -13,9 +13,9 @@ class InstantiateLoop():
     """class instantiating a closed loop in clamp."""
 
     def __init__(self, template: str, loop_name: str, operational_policies: list):
-        self.template=template
-        self.loop_name=loop_name
-        self.operational_policies=operational_policies
+        self.template = template
+        self.loop_name = loop_name
+        self.operational_policies = operational_policies
 
         self._logger: logging.Logger = logging.getLogger("")
         logging.config.dictConfig(settings.LOG_CONFIG)
@@ -30,7 +30,6 @@ class InstantiateLoop():
                 self._logger.error("an error occured while adding an operational policy")
             self._logger.info("ADD OPERATION SUCCESSFULY DONE")
 
-
     def configure_policies(self, loop: LoopInstance) -> None:
         """Configure all policies."""
         self._logger.info("******** UPDATE MICROSERVICE POLICY *******")
@@ -38,8 +37,8 @@ class InstantiateLoop():
         loop.update_microservice_policy()
         self._logger.info("******** UPDATE OPERATIONAL POLICIES CONFIG *******")
         for policy in self.operational_policies:
-            #loop.add_op_policy_config(loop.LoopInstance.__dict__[policy["config_function"]])
-            #possible configurations for the moment
+            # loop.add_op_policy_config(loop.LoopInstance.__dict__[policy["config_function"]])
+            # possible configurations for the moment
             if policy["name"] == "MinMax":
                 loop.add_op_policy_config(loop.add_minmax_config)
             if policy["name"] == "Drools":
@@ -53,7 +52,7 @@ class InstantiateLoop():
         self._logger.info("******** SUBMIT POLICIES TO PE *******")
         submit = loop.act_on_loop_policy(loop.submit)
         self._logger.info("******** CHECK POLICIES SUBMITION *******")
-        if submit :
+        if submit:
             self._logger.info("Policies successfully submited to PE")
 
         else:
index 72daba2..c8984ba 100644 (file)
@@ -10,6 +10,7 @@ from onapsdk.configuration import settings
 from ..base import YamlTemplateBaseStep
 from .service import YamlTemplateVfOnboardStep
 
+
 class OnboardClampStep(YamlTemplateBaseStep):
     """Onboard class to create CLAMP templates."""
 
index 3605c29..666de33 100644 (file)
@@ -69,11 +69,12 @@ class CreateProfileStep(BaseStep):
     def execute(self) -> None:
         """Create profile."""
         super().execute()
-        definition: Definition = Definition.get_definition_by_name_version(\
+        definition: Definition = Definition.get_definition_by_name_version(
             rb_name=settings.MSB_K8S_RB_NAME,
             rb_version=settings.MSB_K8S_RB_VERSION)
         with open(settings.MSB_K8S_PROFILE_ARTIFACT_FILE_PATH, "rb") as profile_file:
-            self.profile = definition.create_profile(profile_name=settings.MSB_K8S_PROFILE_NAME,
-                                                     namespace=settings.K8S_ADDITIONAL_RESOURCES_NAMESPACE,
-                                                     kubernetes_version=settings.K8S_VERSION)
+            self.profile = definition.create_profile(
+                profile_name=settings.MSB_K8S_PROFILE_NAME,
+                namespace=settings.K8S_ADDITIONAL_RESOURCES_NAMESPACE,
+                kubernetes_version=settings.K8S_VERSION)
             self.profile.upload_artifact(profile_file.read())
index b1e7d4d..c0bec0d 100644 (file)
@@ -55,7 +55,8 @@ class ServiceOnboardStep(BaseStep):
 
         """
         super().execute()
-        service: Service = Service(name=settings.SERVICE_NAME, instantiation_type=settings.SERVICE_INSTANTIATION_TYPE)
+        service: Service = Service(name=settings.SERVICE_NAME,
+                                   instantiation_type=settings.SERVICE_INSTANTIATION_TYPE)
         if not service.created():
             service.create()
             if settings.VL_NAME != "":
@@ -70,7 +71,8 @@ class ServiceOnboardStep(BaseStep):
         # If the service is already distributed, do not try to checkin/onboard (replay of tests)
         # checkin is done if needed
         # If service is replayed, no need to try to re-onboard the model
-        # Double check because of: https://gitlab.com/Orange-OpenSource/lfn/onap/python-onapsdk/-/issues/176
+        # Double check because of:
+        # https://gitlab.com/Orange-OpenSource/lfn/onap/python-onapsdk/-/issues/176
         if not service.distributed and service.status != onapsdk_const.DISTRIBUTED:
             if service.status == onapsdk_const.DRAFT:
                 try:
@@ -186,7 +188,8 @@ class YamlTemplateServiceOnboardStep(YamlTemplateBaseStep):
             # If the service is already distributed, do not try to checkin/onboard (replay of tests)
             # checkin is done if needed
             # If service is replayed, no need to try to re-onboard the model
-        # Double check because of: https://gitlab.com/Orange-OpenSource/lfn/onap/python-onapsdk/-/issues/176
+        # Double check because of:
+        # https://gitlab.com/Orange-OpenSource/lfn/onap/python-onapsdk/-/issues/176
         if not service.distributed and service.status != onapsdk_const.DISTRIBUTED:
             if service.status == onapsdk_const.DRAFT:
                 try:
index 49b766b..577b1cf 100644 (file)
@@ -41,7 +41,9 @@ class VspOnboardStep(BaseStep):
         """
         super().execute()
         vendor: Vendor = Vendor(name=settings.VENDOR_NAME)
-        vsp: Vsp = Vsp(name=settings.VSP_NAME, vendor=vendor, package=open(settings.VSP_FILE_PATH, "rb"))
+        vsp: Vsp = Vsp(name=settings.VSP_NAME,
+                       vendor=vendor,
+                       package=open(settings.VSP_FILE_PATH, "rb"))
         vsp.onboard()
 
     @BaseStep.store_state(cleanup=True)
@@ -123,8 +125,8 @@ class YamlTemplateVspOnboardStep(YamlTemplateBaseStep):
                 if "heat_files_to_upload" in pnf:
                     with open(get_resource_location(pnf["heat_files_to_upload"]), "rb") as package:
                         vsp: Vsp = Vsp(name=f"{pnf['pnf_name']}_VSP",
-                                    vendor=vendor,
-                                    package=package)
+                                       vendor=vendor,
+                                       package=package)
                         vsp.onboard()
 
     @YamlTemplateBaseStep.store_state(cleanup=True)
index be06151..99a6ef2 100644 (file)
@@ -15,6 +15,7 @@ class ReportStepStatus(Enum):
     FAIL = "FAIL"
     NOT_EXECUTED = "NOT EXECUTED"
 
+
 @dataclass
 class Report:
     """Step execution report."""
@@ -85,7 +86,8 @@ class ReportsCollection:
             details=details,
             components=components,
             log_path="./pythonsdk.debug.log").dump(
-                str(Path(settings.REPORTING_FILE_DIRECTORY).joinpath(settings.HTML_REPORTING_FILE_NAME)))
+                str(Path(settings.REPORTING_FILE_DIRECTORY).joinpath(
+                    settings.HTML_REPORTING_FILE_NAME)))
 
         report_dict = {
             'usecase': usecase,
@@ -101,5 +103,6 @@ class ReportsCollection:
                 for step_report in reversed(self.report)
             ]
         }
-        with (Path(settings.REPORTING_FILE_DIRECTORY).joinpath(settings.JSON_REPORTING_FILE_NAME)).open('w') as file:
+        with (Path(settings.REPORTING_FILE_DIRECTORY).joinpath(
+                settings.JSON_REPORTING_FILE_NAME)).open('w') as file:
             json.dump(report_dict, file, indent=4)
index 6933fa0..9fc4162 100644 (file)
@@ -47,11 +47,11 @@ class CdsMockserverCnfConfigureStep(BaseStep):
                 response = requests.put(
                     "http://portal.api.simpledemo.onap.org:30726/mockserver/expectation",
                     json={
-                        "httpRequest" : {
+                        "httpRequest": {
                             "method": expectation["method"],
                             "path": expectation["path"]
                         },
-                        "httpResponse" : {
+                        "httpResponse": {
                             "body": expectation["response"]
                         }
                     }
index 3a846e3..2a4f47e 100644 (file)
@@ -50,13 +50,13 @@ class PnfSimulatorCnfRegisterStep(BaseStep):
             config.load_incluster_config()
         else:
             config.load_kube_config(config_file=settings.K8S_CONFIG)
-        k8s_client: "CoreV1API" = client.CoreV1Api()
-        k8s_watch: "Watch" =  watch.Watch()
+        k8s_client: "client.CoreV1Api" = client.CoreV1Api()
+        k8s_watch: "watch.Watch" = watch.Watch()
         status = False
         try:
             for event in k8s_watch.stream(k8s_client.list_namespaced_pod,
-                                        namespace=settings.K8S_ADDITIONAL_RESOURCES_NAMESPACE,
-                                        timeout_seconds=timeout_seconds):
+                                          namespace=settings.K8S_ADDITIONAL_RESOURCES_NAMESPACE,
+                                          timeout_seconds=timeout_seconds):
                 if event["object"].metadata.name == "pnf-macro-test-simulator":
                     if not event["object"].status.phase in ["Pending", "Running"]:
                         # Invalid pod state
@@ -82,9 +82,10 @@ class PnfSimulatorCnfRegisterStep(BaseStep):
             config.load_incluster_config()
         else:
             config.load_kube_config(config_file=settings.K8S_CONFIG)
-        k8s_client: "CoreV1API" = client.CoreV1Api()
+        k8s_client: "client.CoreV1Api" = client.CoreV1Api()
         try:
-            for service in k8s_client.list_namespaced_service(namespace=settings.K8S_ONAP_NAMESPACE).items:
+            for service in k8s_client.list_namespaced_service(
+                    namespace=settings.K8S_ONAP_NAMESPACE).items:
                 if service.metadata.name == settings.DCAE_VES_COLLECTOR_POD_NAME:
                     proto = "http"
                     if "443" in str(service.spec.ports[0].port):
@@ -102,11 +103,13 @@ class PnfSimulatorCnfRegisterStep(BaseStep):
         status = self.is_pnf_pod_running()
         if not status:
             raise EnvironmentPreparationException("PNF simulator is not running")
-        time.sleep(settings.PNF_WAIT_TIME)  # Let's still wait for PNF simulator to make sure it's initialized
+        # Let's still wait for PNF simulator to make sure it's initialized
+        time.sleep(settings.PNF_WAIT_TIME)
         ves_proto, ves_ip, ves_port = self.get_ves_protocol_ip_and_port()
         registration_number: int = 0
         registered_successfully: bool = False
-        while registration_number < settings.PNF_REGISTRATION_NUMBER_OF_TRIES and not registered_successfully:
+        while (registration_number < settings.PNF_REGISTRATION_NUMBER_OF_TRIES and
+               not registered_successfully):
             try:
                 response = requests.post(
                     "http://portal.api.simpledemo.onap.org:30999/simulator/start",
@@ -114,7 +117,7 @@ class PnfSimulatorCnfRegisterStep(BaseStep):
                         "simulatorParams": {
                             "repeatCount": 9999,
                             "repeatInterval": 30,
-                            "vesServerUrl": f"{ves_proto}://sample1:sample1@{ves_ip}:{ves_port}/eventListener/v7"
+                            "vesServerUrl": f"{ves_proto}://sample1:sample1@{ves_ip}:{ves_port}/eventListener/v7"  # noqa
                         },
                         "templateName": "registration.json",
                         "patch": {
@@ -132,7 +135,8 @@ class PnfSimulatorCnfRegisterStep(BaseStep):
                 )
                 response.raise_for_status()
                 registered_successfully = True
-                self._logger.info(f"PNF registered with {settings.SERVICE_INSTANCE_NAME} source name")
+                self._logger.info(f"PNF registered with {settings.SERVICE_INSTANCE_NAME} "
+                                  "source name")
             except (requests.ConnectionError, requests.HTTPError) as http_error:
                 self._logger.debug(f"Can't connect with PNF simulator: {str(http_error)}")
                 registration_number = registration_number + 1
index b8a6b21..f745150 100644 (file)
@@ -4,12 +4,11 @@ from avionix import ChartBuilder, ChartDependency, ChartInfo
 from avionix.errors import HelmError
 from onaptests.steps.base import BaseStep
 from onaptests.utils.resources import get_local_dir
-from onaptests.utils.exceptions import  (
+from onaptests.utils.exceptions import (
     EnvironmentPreparationException,
     EnvironmentCleanupException)
 
 
-
 class HelmChartStep(BaseStep):
     """Basic operations on a docker container."""
 
@@ -36,7 +35,6 @@ class HelmChartStep(BaseStep):
             msg = f"{chart_info_file} not found."
             raise EnvironmentPreparationException(msg) from err
 
-
         try:
             for dependency in chart_info["dependencies"]:
                 dep = ChartDependency(
@@ -48,16 +46,16 @@ class HelmChartStep(BaseStep):
                 dependencies.append(dep)
 
             self.builder = ChartBuilder(
-                    chart_info=ChartInfo(
-                        api_version=chart_info["api_version"],
-                        name=chart_info["chart_name"],
-                        version=chart_info["version"],  # SemVer 2 version
-                        app_version=chart_info["app_version"],
-                        dependencies=dependencies
-                    ),
-                    kubernetes_objects=[],
-                    keep_chart=False
-                )
+                chart_info=ChartInfo(
+                    api_version=chart_info["api_version"],
+                    name=chart_info["chart_name"],
+                    version=chart_info["version"],  # SemVer 2 version
+                    app_version=chart_info["app_version"],
+                    dependencies=dependencies
+                ),
+                kubernetes_objects=[],
+                keep_chart=False
+            )
         except KeyError as err:
             msg = f"{chart_info_file} does not contain required keys."
             raise EnvironmentPreparationException(msg) from err
@@ -82,7 +80,6 @@ class HelmChartStep(BaseStep):
             msg = "Error during helm release installation."
             raise EnvironmentPreparationException(msg) from err
 
-
     def cleanup(self) -> None:
         """Uninstall helm release."""
         try:
index 18ba5df..8b950da 100644 (file)
@@ -4,6 +4,7 @@ import requests
 from onaptests.steps.base import BaseStep
 from onaptests.utils.exceptions import TestConfigurationException
 
+
 class SimulatorStartStep(BaseStep):
     """Basic operations on a docker container."""
 
index 51c56f6..6842714 100644 (file)
@@ -8,85 +8,96 @@
 
 __author__ = ("Morgan Richomme <morgan.richomme@orange.com>")
 
-class  OnapTestException(Exception):
+
+class OnapTestException(Exception):
     """Parent Class for all Onap Test Exceptions."""
-    error_message='Generic OnapTest exception'
+    error_message = 'Generic OnapTest exception'
+
 
 class TestConfigurationException(OnapTestException):
     """Raise when configuration of the use case is incomplete or buggy."""
-    error_message='Configuration error'
+    error_message = 'Configuration error'
+
 
 class ServiceDistributionException(OnapTestException):
     """Service not properly distributed."""
-    error_message='Service not well distributed'
+    error_message = 'Service not well distributed'
 
 
 class ServiceInstantiateException(OnapTestException):
     """Service cannot be instantiated."""
-    error_message='Service instantiation error'
+    error_message = 'Service instantiation error'
 
 
 class ServiceCleanupException(OnapTestException):
     """Service cannot be cleaned."""
-    error_message='Service not well cleaned up'
+    error_message = 'Service not well cleaned up'
 
 
 class VnfInstantiateException(OnapTestException):
     """VNF cannot be instantiated."""
-    error_message='VNF instantiation error'
+    error_message = 'VNF instantiation error'
 
 
 class VnfCleanupException(OnapTestException):
     """VNF cannot be cleaned."""
-    error_message="VNF can't be cleaned"
+    error_message = "VNF can't be cleaned"
 
 
 class VfModuleInstantiateException(OnapTestException):
     """VF Module cannot be instantiated."""
-    error_message='VF Module instantiation error'
+    error_message = 'VF Module instantiation error'
 
 
 class VfModuleCleanupException(OnapTestException):
     """VF Module cannot be cleaned."""
-    error_message="VF Module can't be cleaned"
+    error_message = "VF Module can't be cleaned"
 
 
 class NetworkInstantiateException(OnapTestException):
     """Network cannot be instantiated."""
-    error_message='Network instantiation error'
+    error_message = 'Network instantiation error'
 
 
 class NetworkCleanupException(OnapTestException):
     """Network cannot be cleaned."""
-    error_message="Network can't be cleaned"
+    error_message = "Network can't be cleaned"
+
 
 class ProfileInformationException(OnapTestException):
     """Missing k8s profile information."""
-    error_message='Missing k8s profile information'
+    error_message = 'Missing k8s profile information'
+
 
 class ProfileCleanupException(OnapTestException):
     """K8s profile cannot be cleaned."""
-    error_message="Profile can't be cleaned"
+    error_message = "Profile can't be cleaned"
+
 
 class EnvironmentPreparationException(OnapTestException):
     """Test environment preparation exception."""
-    error_message="Test can't be run properly due to preparation error"
+    error_message = "Test can't be run properly due to preparation error"
+
 
 class SubstepExecutionException(OnapTestException):
     """Exception raised if substep execution fails."""
 
+
 class EnvironmentCleanupException(OnapTestException):
     """Test environment cleanup exception."""
-    error_message="Test couldn't finish a cleanup"
+    error_message = "Test couldn't finish a cleanup"
+
 
 class PolicyException(OnapTestException):
     """Policy exception."""
-    error_message="Problem with policy module"
+    error_message = "Problem with policy module"
+
 
 class DcaeException(OnapTestException):
     """DCAE exception."""
-    error_message="Problem with DCAE module"
+    error_message = "Problem with DCAE module"
+
 
 class StatusCheckException(OnapTestException):
     """Status Check exception."""
-    error_message="Namespace status check has failed"
+    error_message = "Namespace status check has failed"
diff --git a/tox.ini b/tox.ini
index dd497ea..78bdbb8 100644 (file)
--- a/tox.ini
+++ b/tox.ini
@@ -1,6 +1,6 @@
 [tox]
 minversion = 3.2.0
-envlist = json,yaml,py,rst,md
+envlist = json,yaml,py,rst,md,pylama
 skipsdist = true
 requires = pip >= 8
 
@@ -56,3 +56,7 @@ commands_pre =
 commands =
     /bin/bash -c "coala --non-interactive --disable-caching --no-autoapply-warn md --files $(</tmp/.coalist_md) \ "
 
+[testenv:pylama]
+deps = pylama
+skip_install = True
+commands = pylama src/