[VVP] updating validation scripts in dublin
[vvp/validation-scripts.git] / ice_validator / tests / utils / network_roles.py
index bed3a5a..ffb9870 100644 (file)
 import re
 import socket
 
-
-def get_network_role_from_port(resource):
-    '''
-    get the network role from a neutron port resource
-    '''
+PARAM_FORMATS = [
+    ["network", "string", "internal", re.compile(r"int_(.+?)_net_id")],
+    ["network", "string", "internal", re.compile(r"int_(.+?)_net_name")],
+    ["network", "string", "external", re.compile(r"(.+?)_net_id")],
+    ["network", "string", "external", re.compile(r"(.+?)_net_name")],
+]
+
+RESOURCE_FORMATS = [
+    re.compile(r"int_(.+?)_network"),  # OS::ContrailV2::VirtualNetwork
+    re.compile(r"int_(.+?)_RVN"),  # OS::ContrailV2::VirtualNetwork
+    re.compile(r"int_(.+?)"),  # OS::Neutron::Net
+]
+
+
+def get_network_role_and_type(resource):
+    """
+    Derive the network role and type (internal vs. external) from an
+    OS::Neutron::Port.
+
+    :param resource: dict of Resource attributes
+    :return: tuple of (network_role, network_type) where network_type is
+             'internal' or 'external'.  Returns (None, None) if resource
+             is not a port or the values cannot be derived.
+    """
     if not isinstance(resource, dict):
-        return None
-    if 'type' not in resource:
-        return None
-    if resource['type'] != 'OS::Neutron::Port':
-        return None
-    if 'properties' not in resource:
-        return None
-
-    formats = [
-              ["network", "string", "internal",
-               re.compile(r'int_(.+?)_net_id')],
-              ["network", "string", "internal",
-               re.compile(r'int_(.+?)_net_name')],
-              ["network", "string", "external",
-               re.compile(r'(.+?)_net_id')],
-              ["network", "string", "external",
-               re.compile(r'(.+?)_net_name')]]
-
-    for k1, v1 in resource["properties"].items():
-        if k1 != 'network':
-            continue
-
-        # get the network id or name
-        network = (
-            v1.get('get_param') or
-            v1.get('get_resource'))
-        if not network:
-            continue
-
-        for v2 in formats:
-            m = v2[3].match(network)
+        return None, None
+    if resource.get("type", "") != "OS::Neutron::Port":
+        return None, None
+
+    network_props = resource.get("properties", {}).get("network", {})
+    is_resource = "get_resource" in network_props
+    if is_resource:
+        network = network_props.get("get_resource", "")
+    else:
+        network = network_props.get("get_param", "")
+
+    if is_resource:  # connecting to an network in the template
+        for format in RESOURCE_FORMATS:
+            m = format.match(network)
             if m and m.group(1):
-                return m.group(1)
-
-    return None
+                return m.group(1), "internal"
+    else:
+        for format in PARAM_FORMATS:
+            m = format[3].match(network)
+            if m and m.group(1):
+                return m.group(1), format[2]
+    return None, None
 
 
-def get_network_roles(resources):
-    network_roles = []
+def get_network_role_from_port(resource):
+    """
+    Get the network-role from a OS::Neutron::Port resource.  Returns None
+    if resource is not a port or the network-role cannot be derived
+    """
+    return get_network_role_and_type(resource)[0]
+
+
+def get_network_roles(resources, of_type=""):
+    """
+    Returns the network roles derived from the OS::Neutron::Port resources
+    in the collection of ``resources``.  If ``of_type`` is not specified
+    then all network roles will be returned, or ``external`` or ``internal``
+    can be passed to select only those network roles
+
+    :param resources:   collection of resource attributes (dict)
+    :param of_type:     "internal" or "external"
+    :return:            set of network roles discovered
+    """
+    valid_of_type = ("", "external", "internal")
+    if of_type not in ("", "external", "internal"):
+        raise RuntimeError("of_type must one of " + ", ".join(valid_of_type))
+    network_roles = set()
     for v in resources.values():
-        nr = get_network_role_from_port(v)
-        if nr:
-            network_roles.append(nr)
-
-    return set(network_roles)
+        nr, nt = get_network_role_and_type(v)
+        if not nr:
+            continue
+        if not of_type:
+            network_roles.add(nr)
+        elif of_type and of_type == nt:
+            network_roles.add(nr)
+    return network_roles
 
 
 def get_network_type_from_port(resource):
-    '''
-    get the network type from a neutron port resource
-    '''
-    if not isinstance(resource, dict):
-        return None
-    if 'type' not in resource:
-        return None
-    if resource['type'] != 'OS::Neutron::Port':
-        return None
-    if 'properties' not in resource:
-        return None
-
-    formats = [
-              ["network", "string", "internal",
-               re.compile(r'int_(.+?)_net_id')],
-              ["network", "string", "internal",
-               re.compile(r'int_(.+?)_net_name')],
-              ["network", "string", "external",
-               re.compile(r'(.+?)_net_id')],
-              ["network", "string", "external",
-               re.compile(r'(.+?)_net_name')]]
-
-    for k1, v1 in resource["properties"].items():
-        if k1 != 'network':
-            continue
-        if "get_param" not in v1:
-            continue
-        for v2 in formats:
-            m = v2[3].match(v1["get_param"])
-            if m and m.group(1):
-                return v2[2]
-
-    return None
+    """
+    Get the network-type (internal or external) from an OS::Neutron::Port
+    resource.  Returns None if the resource is not a port or the type
+    cannot be derived.
+    """
+    return get_network_role_and_type(resource)[1]
 
 
-def is_valid_ip_address(ip_address, ip_type='ipv4'):
-    '''
+def is_valid_ip_address(ip_address, ip_type="ipv4"):
+    """
     check if an ip address is valid
-    '''
-    if ip_type == 'ipv4':
+    """
+    if ip_type == "ipv4":
         return is_valid_ipv4_address(ip_address)
-    elif ip_type == 'ipv6':
+    elif ip_type == "ipv6":
         return is_valid_ipv6_address(ip_address)
     return False
 
 
 def is_valid_ipv4_address(ip_address):
-    '''
+    """
     check if an ip address of the type ipv4
     is valid
-    '''
+    """
     try:
         socket.inet_pton(socket.AF_INET, ip_address)
     except AttributeError:
@@ -153,17 +156,17 @@ def is_valid_ipv4_address(ip_address):
             socket.inet_aton(ip_address)
         except (OSError, socket.error):
             return False
-        return ip_address.count('.') == 3
+        return ip_address.count(".") == 3
     except (OSError, socket.error):
         return False
     return True
 
 
 def is_valid_ipv6_address(ip_address):
-    '''
+    """
     check if an ip address of the type ipv6
     is valid
-    '''
+    """
     try:
         socket.inet_pton(socket.AF_INET6, ip_address)
     except (OSError, socket.error):
@@ -172,17 +175,17 @@ def is_valid_ipv6_address(ip_address):
 
 
 def property_uses_get_resource(resource, property_name):
-    '''
+    """
     returns true if a port's network property
     uses the get_resource function
-    '''
+    """
     if not isinstance(resource, dict):
         return False
-    if 'properties' not in resource:
+    if "properties" not in resource:
         return False
     for k1, v1 in resource["properties"].items():
         if k1 != property_name:
             continue
-        if "get_resource" in v1:
+        if isinstance(v1, dict) and "get_resource" in v1:
             return True
     return False