#
# ============LICENSE_END============================================
#
-# ECOMP is a trademark and service mark of AT&T Intellectual Property.
#
import re
import socket
+PARAM_FORMATS = [
+ ["network", "string", "internal", re.compile(r"int_(.+?)_net_id")],
+ ["network", "string", "internal", re.compile(r"int_(.+?)_net_name")],
+ ["network", "string", "external", re.compile(r"(.+?)_net_id")],
+ ["network", "string", "external", re.compile(r"(.+?)_net_name")],
+]
-def get_network_role_from_port(resource):
- """
- get the network role from a neutron port resource
- """
- if not isinstance(resource, dict):
- return None
- if "type" not in resource:
- return None
- if resource["type"] != "OS::Neutron::Port":
- return None
- if "properties" not in resource:
- return None
-
- formats = [
- ["network", "string", "internal", re.compile(r"int_(.+?)_net_id")],
- ["network", "string", "internal", re.compile(r"int_(.+?)_net_name")],
- ["network", "string", "external", re.compile(r"(.+?)_net_id")],
- ["network", "string", "external", re.compile(r"(.+?)_net_name")],
- ]
+RESOURCE_FORMATS = [
+ re.compile(r"int_(.+?)_network"), # OS::ContrailV2::VirtualNetwork
+ re.compile(r"int_(.+?)_RVN"), # OS::ContrailV2::VirtualNetwork
+ re.compile(r"int_(.+?)"), # OS::Neutron::Net
+]
- for k1, v1 in resource["properties"].items():
- if k1 != "network":
- continue
- # get the network id or name
- network = v1.get("get_param") or v1.get("get_resource")
- if not network:
- continue
+def get_network_role_and_type(resource):
+ """
+ Derive the network role and type (internal vs. external) from an
+ OS::Neutron::Port.
- for v2 in formats:
- m = v2[3].match(network)
+ :param resource: dict of Resource attributes
+ :return: tuple of (network_role, network_type) where network_type is
+ 'internal' or 'external'. Returns (None, None) if resource
+ is not a port or the values cannot be derived.
+ """
+ if not isinstance(resource, dict):
+ return None, None
+ if resource.get("type", "") != "OS::Neutron::Port":
+ return None, None
+
+ network_props = resource.get("properties", {}).get("network", {})
+ is_resource = "get_resource" in network_props
+ if is_resource:
+ network = network_props.get("get_resource", "")
+ else:
+ network = network_props.get("get_param", "")
+
+ if is_resource: # connecting to an network in the template
+ for format in RESOURCE_FORMATS:
+ m = format.match(network)
if m and m.group(1):
- return m.group(1)
-
- return None
+ return m.group(1), "internal"
+ else:
+ for format in PARAM_FORMATS:
+ m = format[3].match(network)
+ if m and m.group(1):
+ return m.group(1), format[2]
+ return None, None
-def get_network_type_from_port(resource):
+def get_network_role_from_port(resource):
"""
- get the network type from a neutron port resource
+ Get the network-role from a OS::Neutron::Port resource. Returns None
+ if resource is not a port or the network-role cannot be derived
"""
- if not isinstance(resource, dict):
- return None
- if "type" not in resource:
- return None
- if resource["type"] != "OS::Neutron::Port":
- return None
- if "properties" not in resource:
- return None
+ return get_network_role_and_type(resource)[0]
- formats = [
- ["network", "string", "internal", re.compile(r"int_(.+?)_net_id")],
- ["network", "string", "internal", re.compile(r"int_(.+?)_net_name")],
- ["network", "string", "external", re.compile(r"(.+?)_net_id")],
- ["network", "string", "external", re.compile(r"(.+?)_net_name")],
- ]
- for k1, v1 in resource["properties"].items():
- if k1 != "network":
- continue
- if "get_param" not in v1:
+def get_network_roles(resources, of_type=""):
+ """
+ Returns the network roles derived from the OS::Neutron::Port resources
+ in the collection of ``resources``. If ``of_type`` is not specified
+ then all network roles will be returned, or ``external`` or ``internal``
+ can be passed to select only those network roles
+
+ :param resources: collection of resource attributes (dict)
+ :param of_type: "internal" or "external"
+ :return: set of network roles discovered
+ """
+ valid_of_type = ("", "external", "internal")
+ if of_type not in ("", "external", "internal"):
+ raise RuntimeError("of_type must one of " + ", ".join(valid_of_type))
+ network_roles = set()
+ for v in resources.values():
+ nr, nt = get_network_role_and_type(v)
+ if not nr:
continue
- for v2 in formats:
- m = v2[3].match(v1["get_param"])
- if m and m.group(1):
- return v2[2]
+ if not of_type:
+ network_roles.add(nr)
+ elif of_type and of_type == nt:
+ network_roles.add(nr)
+ return network_roles
+
- return None
+def get_network_type_from_port(resource):
+ """
+ Get the network-type (internal or external) from an OS::Neutron::Port
+ resource. Returns None if the resource is not a port or the type
+ cannot be derived.
+ """
+ return get_network_role_and_type(resource)[1]
def is_valid_ip_address(ip_address, ip_type="ipv4"):
for k1, v1 in resource["properties"].items():
if k1 != property_name:
continue
- if "get_resource" in v1:
+ if isinstance(v1, dict) and "get_resource" in v1:
return True
return False