#
# ============LICENSE_END============================================
import os
-import shutil
from abc import ABC, abstractmethod
from collections import OrderedDict
+from itertools import chain
+from typing import Tuple, List
-from preload.generator import yield_by_count
-from preload.environment import PreloadEnvironment
from tests.helpers import (
get_param,
get_environment_pair,
prop_iterator,
- get_output_dir,
is_base_module,
remove,
)
from tests.test_environment_file_parameters import get_preload_excluded_parameters
from tests.utils import nested_dict
from tests.utils.vm_types import get_vm_type_for_nova_server
-from config import Config, get_generator_plugins
from tests.test_environment_file_parameters import ENV_PARAMETER_SPEC
self.subnet_params = set()
def filter_output_params(self, base_outputs):
- self.subnet_params = remove(self.subnet_params, base_outputs)
+ self.subnet_params = remove(
+ self.subnet_params, base_outputs, key=lambda s: s.param_name
+ )
def __hash__(self):
return hash(self.network_role)
return hash(self) == hash(other)
+class Subnet:
+ def __init__(self, param_name: str):
+ self.param_name = param_name
+
+ @property
+ def ip_version(self):
+ return 6 if "_v6_" in self.param_name else 4
+
+ def __hash__(self):
+ return hash(self.param_name)
+
+ def __eq__(self, other):
+ return hash(self) == hash(other)
+
+
class Port(FilterBaseOutputs):
def __init__(self, vm, network):
self.vm = vm
self.network = network
self.fixed_ips = []
- self.floating_ips = []
+ self.floating_ips = set()
self.uses_dhcp = True
def add_ips(self, props):
self.uses_dhcp = False
self.fixed_ips.append(IpParam(ip_address, self))
if subnet:
- self.network.subnet_params.add(subnet)
+ self.network.subnet_params.add(Subnet(subnet))
for ip in prop_iterator(props, "allowed_address_pairs", "ip_address"):
- self.uses_dhcp = False
param = get_param(ip) if ip else ""
if param:
- self.floating_ips.append(IpParam(param, self))
+ self.floating_ips.add(IpParam(param, self))
+
+ @property
+ def ipv6_fixed_ips(self):
+ return list(
+ sorted(
+ (ip for ip in self.fixed_ips if ip.ip_version == 6),
+ key=lambda ip: ip.param,
+ )
+ )
+
+ @property
+ def ipv4_fixed_ips(self):
+ return list(
+ sorted(
+ (ip for ip in self.fixed_ips if ip.ip_version == 4),
+ key=lambda ip: ip.param,
+ )
+ )
+
+ @property
+ def fixed_ips_with_index(self) -> List[Tuple[int, IpParam]]:
+ ipv4s = enumerate(self.ipv4_fixed_ips)
+ ipv6s = enumerate(self.ipv6_fixed_ips)
+ return list(chain(ipv4s, ipv6s))
def filter_output_params(self, base_outputs):
self.fixed_ips = remove(self.fixed_ips, base_outputs, key=lambda ip: ip.param)
class Vnf:
- def __init__(self, templates):
- self.modules = [VnfModule(t, self) for t in templates]
+ def __init__(self, templates, config=None):
+ self.modules = [VnfModule(t, self, config) for t in templates]
self.uses_contrail = self._uses_contrail()
+ self.config = config
self.base_module = next(
(mod for mod in self.modules if mod.is_base_module), None
)
@property
def base_output_params(self):
- return self.base_module.heat.outputs
+ return self.base_module.heat.outputs if self.base_module else {}
def filter_base_outputs(self):
non_base_modules = (m for m in self.modules if not m.is_base_module)
:return: path to env file (assumes it is present and named correctly)
"""
base_path = os.path.splitext(heat_path)[0]
- return "{}.env".format(base_path)
+ env_path = "{}.env".format(base_path)
+ return env_path if os.path.exists(env_path) else None
class VnfModule(FilterBaseOutputs):
- def __init__(self, template_file, vnf):
+ def __init__(self, template_file, vnf, config):
self.vnf = vnf
+ self.config = config
self.vnf_name = os.path.splitext(os.path.basename(template_file))[0]
self.template_file = template_file
self.heat = Heat(filepath=template_file, envpath=env_path(template_file))
env_yaml = env_pair.get("eyml") if env_pair else {}
self.parameters = {key: "" for key in self.heat.parameters}
self.parameters.update(env_yaml.get("parameters") or {})
+ # Filter out any parameters passed from the volume module's outputs
+ self.parameters = {
+ key: value
+ for key, value in self.parameters.items()
+ if key not in self.volume_module_outputs
+ }
self.networks = []
self.virtual_machine_types = self._create_vm_types()
self._add_networks()
self.outputs_filtered = False
+ @property
+ def volume_module_outputs(self):
+ heat_dir = os.path.dirname(self.template_file)
+ heat_filename = os.path.basename(self.template_file)
+ basename, ext = os.path.splitext(heat_filename)
+ volume_template_name = "{}_volume{}".format(basename, ext)
+ volume_path = os.path.join(heat_dir, volume_template_name)
+ if os.path.exists(volume_path):
+ volume_mod = Heat(filepath=volume_path)
+ return volume_mod.outputs
+ else:
+ return {}
+
def filter_output_params(self, base_outputs):
for vm in self.virtual_machine_types:
vm.filter_output_params(base_outputs)
@property
def env_specs(self):
"""Return available Environment Spec definitions"""
- try:
- return Config().env_specs
- except FileNotFoundError:
- return [ENV_PARAMETER_SPEC]
+ return [ENV_PARAMETER_SPEC] if not self.config else self.config.env_specs
@property
def platform_provided_params(self):
params[az] = CHANGE
for network in self.networks:
params[network.name_param] = CHANGE
- for param in set(network.subnet_params):
+ for param in set(s.param_name for s in network.subnet_params):
params[param] = CHANGE
for vm in self.virtual_machine_types:
for name in set(vm.names):
:return: dict of parameters suitable for the preload
"""
excluded = get_preload_excluded_parameters(self.template_file)
+ excluded.update(self.platform_provided_params)
params = {k: v for k, v in self.parameters.items() if k not in excluded}
return params
return hash(self) == hash(other)
-def create_preloads(config, exitstatus):
+def yield_by_count(sequence):
"""
- Create preloads in every format that can be discovered by get_generator_plugins
+ Iterates through sequence and yields each item according to its __count__
+ attribute. If an item has a __count__ of it will be returned 3 times
+ before advancing to the next item in the sequence.
+
+ :param sequence: sequence of dicts (must contain __count__)
+ :returns: generator of tuple key, value pairs
"""
- if config.getoption("self_test"):
- return
- print("+===================================================================+")
- print("| Preload Template Generation |")
- print("+===================================================================+")
-
- preload_dir = os.path.join(get_output_dir(config), "preloads")
- if os.path.exists(preload_dir):
- shutil.rmtree(preload_dir)
- env_directory = config.getoption("env_dir")
- preload_env = PreloadEnvironment(env_directory) if env_directory else None
- plugins = get_generator_plugins()
- available_formats = [p.format_name() for p in plugins]
- selected_formats = config.getoption("preload_formats") or available_formats
- heat_templates = get_heat_templates(config)
- vnf = None
- for plugin_class in plugins:
- if plugin_class.format_name() not in selected_formats:
- continue
- vnf = Vnf(heat_templates)
- generator = plugin_class(vnf, preload_dir, preload_env)
- generator.generate()
- if vnf and vnf.uses_contrail:
- print(
- "\nWARNING: Preload template generation does not support Contrail\n"
- "at this time, but Contrail resources were detected. The preload \n"
- "template may be incomplete."
- )
- if exitstatus != 0:
- print(
- "\nWARNING: Heat violations detected. Preload templates may be\n"
- "incomplete."
- )
+ for key, value in sequence.items():
+ for i in range(value["__count__"]):
+ yield (key, value)