--- /dev/null
+# ============LICENSE_START====================================================
+# org.onap.vvp/validation-scripts
+# ===================================================================
+# Copyright © 2019 AT&T Intellectual Property. All rights reserved.
+# ===================================================================
+#
+# Unless otherwise specified, all software contained herein is licensed
+# under the Apache License, Version 2.0 (the "License");
+# you may not use this software except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+#
+#
+# Unless otherwise specified, all documentation contained herein is licensed
+# under the Creative Commons License, Attribution 4.0 Intl. (the "License");
+# you may not use this documentation except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# https://creativecommons.org/licenses/by/4.0/
+#
+# Unless required by applicable law or agreed to in writing, documentation
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+# ============LICENSE_END============================================
+repos:
+- repo: https://github.com/ambv/black
+ rev: stable
+ hooks:
+ - id: black
+ language_version: python3.6
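+# Suggested workflow: run `pre-commit install` once to register the hook; black
+# then reformats staged Python files on every commit. To reformat the whole
+# tree on demand, run `pre-commit run --all-files`.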
import io
import json
import os
-import subprocess #nosec
+import subprocess # nosec
import sys
import pytest
def check_app_tests_pass():
- return run_pytest("tests", "--self-test",
- msg="app_tests failed. Run pytest app_tests and fix errors.")
+    return run_pytest(
+        "app_tests",
+        msg="app_tests failed. Run pytest app_tests and fix errors.",
+    )
def check_self_test_pass():
- return run_pytest("tests", "--self-test",
- msg="self-test failed. Run pytest --self-test and fix errors.")
+ return run_pytest(
+ "tests",
+ "--self-test",
+ msg="self-test failed. Run pytest --self-test and fix errors.",
+ )
def check_testable_requirements_are_mapped():
def check_bandit_passes():
- result = subprocess.run( #nosec
- ["bandit", "-c", "bandit.yaml", "-r", ".", "-x", "./.tox/**"], #nosec
- encoding="utf-8", #nosec
- stdout=subprocess.PIPE, #nosec
- stderr=subprocess.PIPE, #nosec
- ) #nosec
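+    # The "# nosec" markers tell bandit to skip its subprocess warnings here;
+    # the command is a fixed argument list (no shell), not user-supplied input.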
+ result = subprocess.run( # nosec
+ ["bandit", "-c", "bandit.yaml", "-r", ".", "-x", "./.tox/**"], # nosec
+ encoding="utf-8", # nosec
+ stdout=subprocess.PIPE, # nosec
+ stderr=subprocess.PIPE, # nosec
+ ) # nosec
msgs = result.stdout.split("\n") if result.returncode != 0 else []
return ["bandit errors detected:"] + [f" {e}" for e in msgs] if msgs else []
base_path = THIS_DIR / "sample_env/preloads/grapi/base_incomplete.json"
data = load_json(base_path)
vnf_name = data["input"]["preload-vf-module-topology-information"][
- "vnf-topology-identifier-structure"]["vnf-name"]
+ "vnf-topology-identifier-structure"
+ ]["vnf-name"]
assert vnf_name == "VALUE FOR: vnf_name"
class PreloadEnvironment:
-
def __init__(self, env_dir, parent=None):
self.base_dir = Path(env_dir)
self.parent = parent
value.append((node_key, node_value))
- return yaml.nodes.MappingNode(u'tag:yaml.org,2002:map', value)
+ return yaml.nodes.MappingNode(u"tag:yaml.org,2002:map", value)
def get_json_template(template_dir, template_name):
self._add_parameters(preload, vnf_module)
def _add_vnf_metadata(self, preload):
- vnf_meta = preload["input"]["vnf-topology-information"]["vnf-topology-identifier"]
+ vnf_meta = preload["input"]["vnf-topology-information"][
+ "vnf-topology-identifier"
+ ]
vnf_meta["vnf-name"] = self.replace("vnf_name")
vnf_meta["generic-vnf-type"] = self.replace(
"vnf-type",
"--env-directory",
dest="env_dir",
action="store",
- help="optional directory of .env files for preload generation"
+ help="optional directory of .env files for preload generation",
)
parser.addoption(
help=(
"Preload format to create (multiple allowed). If not provided "
"then all available formats will be created: {}"
- ).format(", ".join(get_generator_plugin_names()))
+ ).format(", ".join(get_generator_plugin_names())),
)
:param zip_path: path to valid zip file
:param target_dir: directory to unzip zip_path
"""
- check(zipfile.is_zipfile(zip_path), "{} is not a valid zipfile or does not exist".format(zip_path))
+ check(
+ zipfile.is_zipfile(zip_path),
+ "{} is not a valid zipfile or does not exist".format(zip_path),
+ )
archive = zipfile.ZipFile(zip_path)
if not os.path.exists(target_dir):
os.makedirs(target_dir, exist_ok=True)
        An OS::Neutron::Port with the property binding:vnic_type set to "direct"
"""
resource_properties = nested_dict.get(resource, "properties", default={})
- if nested_dict.get(resource, "type") == cls.resource_type and resource_properties.get("binding:vnic_type", "") == "direct":
+ if (
+ nested_dict.get(resource, "type") == cls.resource_type
+ and resource_properties.get("binding:vnic_type", "") == "direct"
+ ):
return True
return False
try:
return int(count_value)
except (ValueError, TypeError):
- print((
- "WARNING: Invalid value for count parameter {}. Expected "
- "an integer, but got {}. Defaulting to 1"
- ).format(count_param, count_value))
+ print(
+ (
+ "WARNING: Invalid value for count parameter {}. Expected "
+ "an integer, but got {}. Defaulting to 1"
+ ).format(count_param, count_value)
+ )
return 1
@property
"string": {
"readable": "{network-role}_subnet_id or {network-role}_v6_subnet_id",
"machine": RE_EXTERNAL_PARAM_SID,
- },
+ }
},
"internal": {
"string": {
"readable": "int_{network-role}_subnet_id or int_{network-role}_v6_subnet_id",
"machine": RE_INTERNAL_PARAM_SID,
- },
+ }
},
"parameter_to_resource_comparisons": ["network_role"],
}
@validates("R-100000", "R-100010", "R-100030", "R-100050", "R-100070")
def test_contrail_external_instance_ip_address_parameter(yaml_file):
- check_parameter_format(yaml_file, iip_regx_dict, "external", ContrailV2InstanceIpProcessor, "instance_ip_address")
+ check_parameter_format(
+ yaml_file,
+ iip_regx_dict,
+ "external",
+ ContrailV2InstanceIpProcessor,
+ "instance_ip_address",
+ )
@validates("R-100000", "R-100090", "R-100110", "R-100130", "R-100150")
def test_contrail_internal_instance_ip_address_parameter(yaml_file):
- check_parameter_format(yaml_file, iip_regx_dict, "internal", ContrailV2InstanceIpProcessor, "instance_ip_address")
+ check_parameter_format(
+ yaml_file,
+ iip_regx_dict,
+ "internal",
+ ContrailV2InstanceIpProcessor,
+ "instance_ip_address",
+ )
@validates("R-100190", "R-100200", "R-100220")
def test_contrail_external_instance_subnet_id_parameter(yaml_file):
- check_parameter_format(yaml_file, sid_regx_dict, "external", ContrailV2InstanceIpProcessor, "subnet_uuid")
+ check_parameter_format(
+ yaml_file,
+ sid_regx_dict,
+ "external",
+ ContrailV2InstanceIpProcessor,
+ "subnet_uuid",
+ )
@validates("R-100190", "R-100240", "R-100260")
def test_contrail_internal_instance_subnet_id_parameter(yaml_file):
- check_parameter_format(yaml_file, sid_regx_dict, "internal", ContrailV2InstanceIpProcessor, "subnet_uuid")
+ check_parameter_format(
+ yaml_file,
+ sid_regx_dict,
+ "internal",
+ ContrailV2InstanceIpProcessor,
+ "subnet_uuid",
+ )
@validates("R-100240", "R-100260")
if not subnet_param:
continue
if subnet_param not in base_outputs:
- errors.append((
- "Resource ({}) is designated as an internal IP, but its "
- "subnet_uuid parameter ({}) does not refer to subnet in "
- "this template nor is it defined in the output section "
- "of the base module ({})"
- ).format(r_id, subnet_param, os.path.basename(base_path)))
+            errors.append(
+                (
+                    "Resource ({}) is designated as an internal IP, but its "
+                    "subnet_uuid parameter ({}) does not refer to a subnet in "
+                    "this template, nor is it defined in the outputs section "
+                    "of the base module ({})"
+                ).format(r_id, subnet_param, os.path.basename(base_path))
+            )
assert not errors, ". ".join(errors)
**MUST**
contain the ``{vm-type}``.
"""
- run_test(
- yaml_file, ContrailV2ServiceHealthCheckProcessor, get_vm_types, "vm_type"
- )
+ run_test(yaml_file, ContrailV2ServiceHealthCheckProcessor, get_vm_types, "vm_type")
@validates("R-16437")
for spec in specs:
if persistent_only and not spec.get("persistent"):
continue
- results.extend(get_template_parameters(yaml_file, resource_type,
- spec, all_resources))
+ results.extend(
+ get_template_parameters(yaml_file, resource_type, spec, all_resources)
+ )
return {item["param"] for item in results}
@validates("R-40971", "R-35735", "R-23503", "R-71577", "R-04697", "R-34037")
def test_external_fip_format(yaml_file):
- check_parameter_format(yaml_file, fip_regx_dict, "external", NeutronPortProcessor, "fixed_ips", "ip_address")
+ check_parameter_format(
+ yaml_file,
+ fip_regx_dict,
+ "external",
+ NeutronPortProcessor,
+ "fixed_ips",
+ "ip_address",
+ )
@validates("R-27818", "R-29765", "R-85235", "R-78380", "R-34037")
def test_internal_fip_format(yaml_file):
- check_parameter_format(yaml_file, fip_regx_dict, "internal", NeutronPortProcessor, "fixed_ips", "ip_address")
+ check_parameter_format(
+ yaml_file,
+ fip_regx_dict,
+ "internal",
+ NeutronPortProcessor,
+ "fixed_ips",
+ "ip_address",
+ )
r = Resource(resource_id=resource_id, resource=resource)
properties = r.get_nested_properties()
resources = r.get_nested_yaml(base_dir).get("resources", {})
- for nrid, nresource_dict in resources.items(): # iterate through nested file until found target r type
+        # iterate through the nested file until the target resource type is found
+        for nrid, nresource_dict in resources.items():
if (
nresource_dict.get("type")
):
continue
- for nparam in prop_iterator(nresource_dict, *nprops): # get iterator of all target parameters
- if nparam and "get_param" in nparam: # iterator yields None if parameter isn't found
+            # get iterator of all target parameters
+            for nparam in prop_iterator(nresource_dict, *nprops):
+                # iterator yields None if parameter isn't found
+                if nparam and "get_param" in nparam:
nparam = nparam.get("get_param")
- for k1, v1 in properties.items(): # found nparam, now comparing to parent template
+                    # found nparam, now comparing to parent template
+                    for k1, v1 in properties.items():
if isinstance(v1, dict) and "get_param" in v1:
parameter = v1.get("get_param")
# k1: nested resource parameter definition
if isinstance(parameter, list):
parameter = parameter[0]
- if k1 != nparam: # we only care about the parameter we found in nested template
+                            # only the parameter found in the nested template matters
+                            if k1 != nparam:
continue
if k1 != parameter:
# def test_parameter_name_doesnt_change_in_nested_template(yaml_file):
# check_nested_parameter_doesnt_change(yaml_file)
+
@validates("R-708564")
def test_server_name_parameter_name_doesnt_change_in_nested_template(heat_template):
check_nested_parameter_doesnt_change(heat_template, "OS::Nova::Server", "name")
"virtual_machine_interface_allowed_address_pairs",
"virtual_machine_interface_allowed_address_pairs_allowed_address_pair",
"virtual_machine_interface_allowed_address_pairs_allowed_address_pair_ip",
- "virtual_machine_interface_allowed_address_pairs_allowed_address_pair_ip_ip_prefix"
+ "virtual_machine_interface_allowed_address_pairs_allowed_address_pair_ip_ip_prefix",
)
def test_non_server_name_unique(heat_template):
"""Test name has unique value
"""
- list_nest = nested_files.get_list_of_nested_files(heat_template, os.path.dirname(heat_template))
+ list_nest = nested_files.get_list_of_nested_files(
+ heat_template, os.path.dirname(heat_template)
+ )
list_nest.append(heat_template)
non_servers = {}
for yaml_file in list_nest:
def is_nova_server(resource):
- return "type" in resource and "properties" in resource and resource.get("type") == "OS::Nova::Server"
+ return (
+ "type" in resource
+ and "properties" in resource
+ and resource.get("type") == "OS::Nova::Server"
+ )
def get_vm_type_for_nova_server(resource):
#
import os
import platform
-import subprocess #nosec
+import subprocess # nosec
import sys
import tempfile
from urllib import request
PREBUILT_DOWNLOAD_SITE = "https://download.lfd.uci.edu/pythonlibs/n5jyqt7p/"
PREBUILT_WIN_LIBS = [
"yappi-1.0-cp{python_version}-cp{python_version}m-{arch}.whl",
- "setproctitle-1.1.10-cp{python_version}-cp{python_version}m-{arch}.whl"
+ "setproctitle-1.1.10-cp{python_version}-cp{python_version}m-{arch}.whl",
]
def is_windows():
- return os.name == 'nt'
+ return os.name == "nt"
def python_version():
def download_url(url):
- resp = request.urlopen(url) #nosec
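+    # "# nosec" silences bandit's warning about urlopen; the only caller builds
+    # the URL from the fixed HTTPS download site defined above.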
+ resp = request.urlopen(url) # nosec
return resp.read()
return
temp_dir = tempfile.mkdtemp()
for lib in PREBUILT_WIN_LIBS:
- filename = lib.format(python_version=python_version(),
- arch=system_architecture())
+ filename = lib.format(
+ python_version=python_version(), arch=system_architecture()
+ )
url = PREBUILT_DOWNLOAD_SITE + filename
print(f"Downloading {url}")
contents = download_url(url)
file_path = os.path.join(temp_dir, filename)
write_file(contents, file_path, mode="wb")
print("Download complete. Installing dependency.")
- subprocess.call(["pip", "install", file_path]) #nosec
+ subprocess.call(["pip", "install", file_path]) # nosec
if __name__ == "__main__":
openstack-heat
cached-property>=1.5,<1.6
bandit
+black
+pre-commit
+++ /dev/null
-# Implemented Tests
-
-Reworking the documentation so removing these for now