[VVP] ports.py check port type
[vvp/validation-scripts.git] / ice_validator / tests / utils / ports.py
index 4d0b4ca..89440eb 100644 (file)
@@ -6,7 +6,7 @@
 # ===================================================================
 #
 # Unless otherwise specified, all software contained herein is licensed
-# under the Apache License, Version 2.0 (the “License”);
+# under the Apache License, Version 2.0 (the "License");
 # you may not use this software except in compliance with the License.
 # You may obtain a copy of the License at
 #
@@ -21,7 +21,7 @@
 #
 #
 # Unless otherwise specified, all documentation contained herein is licensed
-# under the Creative Commons License, Attribution 4.0 Intl. (the “License”);
+# under the Creative Commons License, Attribution 4.0 Intl. (the "License");
 # you may not use this documentation except in compliance with the License.
 # You may obtain a copy of the License at
 #
 #
 # ============LICENSE_END============================================
 #
-# ECOMP is a trademark and service mark of AT&T Intellectual Property.
 #
 
-from .network_roles import get_network_role_from_port
+from .network_roles import get_network_role_and_type
 from .vm_types import get_vm_type_for_nova_server
 import re
 
 
-def is_valid_ip_address(ip_address, vm_type, network_role, port_property):
-    '''
+def is_valid_ip_address(
+    ip_address, vm_type, network_role, port_property, parameter_type, network_type
+):
+    """
     Check the ip_address to make sure it is properly formatted and
     also contains {vm_type} and {network_role}
-    '''
+    """
 
     allowed_formats = [
-                      ["allowed_address_pairs", "string", "internal",
-                       re.compile(r'(.+?)_int_(.+?)_floating_v6_ip')],
-                      ["allowed_address_pairs", "string", "internal",
-                       re.compile(r'(.+?)_int_(.+?)_floating_ip')],
-                      ["allowed_address_pairs", "string", "external",
-                       re.compile(r'(.+?)_floating_v6_ip')],
-                      ["allowed_address_pairs", "string", "external",
-                       re.compile(r'(.+?)_floating_ip')],
-                      ["allowed_address_pairs", "string", "internal",
-                       re.compile(r'(.+?)_int_(.+?)_v6_ip_\d+')],
-                      ["allowed_address_pairs", "string", "internal",
-                       re.compile(r'(.+?)_int_(.+?)_ip_\d+')],
-                      ["allowed_address_pairs", "string", "external",
-                       re.compile(r'(.+?)_v6_ip_\d+')],
-                      ["allowed_address_pairs", "string", "external",
-                       re.compile(r'(.+?)_ip_\d+')],
-                      ["allowed_address_pairs", "comma_delimited_list",
-                       "internal", re.compile(r'(.+?)_int_(.+?)_v6_ips')],
-                      ["allowed_address_pairs", "comma_delimited_list",
-                       "internal", re.compile(r'(.+?)_int_(.+?)_ips')],
-                      ["allowed_address_pairs", "comma_delimited_list",
-                       "external", re.compile(r'(.+?)_v6_ips')],
-                      ["allowed_address_pairs", "comma_delimited_list",
-                       "external", re.compile(r'(.+?)_ips')],
-                      ["fixed_ips", "string", "internal",
-                       re.compile(r'(.+?)_int_(.+?)_v6_ip_\d+')],
-                      ["fixed_ips", "string", "internal",
-                       re.compile(r'(.+?)_int_(.+?)_ip_\d+')],
-                      ["fixed_ips", "string", "external",
-                       re.compile(r'(.+?)_v6_ip_\d+')],
-                      ["fixed_ips", "string", "external",
-                       re.compile(r'(.+?)_ip_\d+')],
-                      ["fixed_ips", "comma_delimited_list", "internal",
-                       re.compile(r'(.+?)_int_(.+?)_v6_ips')],
-                      ["fixed_ips", "comma_delimited_list", "internal",
-                       re.compile(r'(.+?)_int_(.+?)_ips')],
-                      ["fixed_ips", "comma_delimited_list", "external",
-                       re.compile(r'(.+?)_v6_ips')],
-                      ["fixed_ips", "comma_delimited_list", "external",
-                       re.compile(r'(.+?)_ips')],
-                      ]
+        [
+            "allowed_address_pairs",
+            "string",
+            "internal",
+            re.compile(r"(.+?)_int_(.+?)_floating_v6_ip"),
+        ],
+        [
+            "allowed_address_pairs",
+            "string",
+            "internal",
+            re.compile(r"(.+?)_int_(.+?)_floating_ip"),
+        ],
+        [
+            "allowed_address_pairs",
+            "string",
+            "external",
+            re.compile(r"(.+?)_floating_v6_ip"),
+        ],
+        [
+            "allowed_address_pairs",
+            "string",
+            "external",
+            re.compile(r"(.+?)_floating_ip"),
+        ],
+        [
+            "allowed_address_pairs",
+            "string",
+            "internal",
+            re.compile(r"(.+?)_int_(.+?)_v6_ip_\d+"),
+        ],
+        [
+            "allowed_address_pairs",
+            "string",
+            "internal",
+            re.compile(r"(.+?)_int_(.+?)_ip_\d+"),
+        ],
+        ["allowed_address_pairs", "string", "external", re.compile(r"(.+?)_v6_ip_\d+")],
+        ["allowed_address_pairs", "string", "external", re.compile(r"(.+?)_ip_\d+")],
+        [
+            "allowed_address_pairs",
+            "comma_delimited_list",
+            "internal",
+            re.compile(r"(.+?)_int_(.+?)_v6_ips"),
+        ],
+        [
+            "allowed_address_pairs",
+            "comma_delimited_list",
+            "internal",
+            re.compile(r"(.+?)_int_(.+?)_ips"),
+        ],
+        [
+            "allowed_address_pairs",
+            "comma_delimited_list",
+            "external",
+            re.compile(r"(.+?)_v6_ips"),
+        ],
+        [
+            "allowed_address_pairs",
+            "comma_delimited_list",
+            "external",
+            re.compile(r"(.+?)_ips"),
+        ],
+        ["fixed_ips", "string", "internal", re.compile(r"(.+?)_int_(.+?)_v6_ip_\d+")],
+        ["fixed_ips", "string", "internal", re.compile(r"(.+?)_int_(.+?)_ip_\d+")],
+        ["fixed_ips", "string", "external", re.compile(r"(.+?)_v6_ip_\d+")],
+        ["fixed_ips", "string", "external", re.compile(r"(.+?)_ip_\d+")],
+        [
+            "fixed_ips",
+            "comma_delimited_list",
+            "internal",
+            re.compile(r"(.+?)_int_(.+?)_v6_ips"),
+        ],
+        [
+            "fixed_ips",
+            "comma_delimited_list",
+            "internal",
+            re.compile(r"(.+?)_int_(.+?)_ips"),
+        ],
+        ["fixed_ips", "comma_delimited_list", "external", re.compile(r"(.+?)_v6_ips")],
+        ["fixed_ips", "comma_delimited_list", "external", re.compile(r"(.+?)_ips")],
+    ]
 
     for v3 in allowed_formats:
+        if v3[1] != parameter_type:
+            continue
         if v3[0] != port_property:
             continue
+        if v3[2] != network_type:
+            continue
         # check if pattern matches
         m = v3[3].match(ip_address)
         if m:
-            if (v3[2] == "internal" and
-                    len(m.groups()) > 1):
-                    return m.group(1) == vm_type and\
-                        m.group(2) == network_role
-            elif (v3[2] == "external" and
-                  len(m.groups()) > 0):
+            if v3[2] == "internal" and len(m.groups()) > 1:
+                return m.group(1) == vm_type and m.group(2) == network_role
+            elif v3[2] == "external" and len(m.groups()) > 0:
                 return m.group(1) == vm_type + "_" + network_role
 
     return False
 
 
-def get_invalid_ip_addresses(resources, port_property):
-    '''
+def get_invalid_ip_addresses(resources, port_property, parameters):
+    """
     Get a list of valid ip addresses for a heat resources section
-    '''
+    """
     invalid_ip_addresses = []
 
     for k, v in resources.items():
         if not isinstance(v, dict):
             continue
-        if 'type' not in v:
+        if "type" not in v:
             continue
-        if v['type'] not in 'OS::Nova::Server':
+        if v["type"] not in "OS::Nova::Server":
             continue
-        if 'properties' not in v:
+        if "properties" not in v:
             continue
-        if 'networks' not in v['properties']:
+        if "networks" not in v["properties"]:
             continue
 
         port_resource = None
@@ -134,24 +176,24 @@ def get_invalid_ip_addresses(resources, port_property):
             continue
 
         # get all ports associated with the nova server
-        properties = v['properties']
-        for network in properties['networks']:
+        properties = v["properties"]
+        for network in properties["networks"]:
             for k3, v3 in network.items():
-                if k3 != 'port':
+                if k3 != "port":
                     continue
                 if not isinstance(v3, dict):
                     continue
 
-                if 'get_resource' in v3:
-                    port_id = v3['get_resource']
+                if "get_resource" in v3:
+                    port_id = v3["get_resource"]
                     if not resources[port_id]:
                         continue
                     port_resource = resources[port_id]
                 else:
                     continue
 
-                network_role = get_network_role_from_port(port_resource)
-                if not network_role:
+                network_role, network_type = get_network_role_and_type(port_resource)
+                if not network_role or not network_type:
                     continue
 
                 for k1, v1 in port_resource["properties"].items():
@@ -162,16 +204,26 @@ def get_invalid_ip_addresses(resources, port_property):
                             continue
                         if "get_param" not in v2["ip_address"]:
                             continue
-
                         ip_address = v2["ip_address"]["get_param"]
 
                         if isinstance(ip_address, list):
                             ip_address = ip_address[0]
 
-                        valid_ip_address = is_valid_ip_address(ip_address,
-                                                               vm_type,
-                                                               network_role,
-                                                               port_property)
+                        if ip_address not in parameters:
+                            continue
+
+                        parameter_type = parameters[ip_address].get("type")
+                        if not parameter_type:
+                            continue
+
+                        valid_ip_address = is_valid_ip_address(
+                            ip_address,
+                            vm_type,
+                            network_role,
+                            port_property,
+                            parameter_type,
+                            network_type,
+                        )
 
                         if not valid_ip_address:
                             invalid_ip_addresses.append(ip_address)
@@ -179,19 +231,17 @@ def get_invalid_ip_addresses(resources, port_property):
     return invalid_ip_addresses
 
 
-def is_reserved_port(port_id):
-    '''
-    Checks to see if the resource id for a port follows
-    the reserve port concept
-    '''
-    formats = [
-            ["port_id",
-             re.compile(r'reserve_port_(.+?)_floating_ip_\d+')],
-            ["port_id",
-             re.compile(r'reserve_port_(.+?)_floating_v6_ip_\d+')],
-            ]
-    for f in formats:
-        m = f[1].match(port_id.lower())
-        if m and m.group(1):
-            return True
-    return False
+def get_list_of_ports_attached_to_nova_server(nova_server):
+    networks_list = nova_server.get("properties", {}).get("networks")
+
+    port_ids = []
+    if networks_list:
+        for network in networks_list:
+            network_prop = network.get("port")
+            if network_prop:
+                pid = network_prop.get("get_param")
+                if not pid:
+                    pid = network_prop.get("get_resource")
+                port_ids.append(pid)
+
+    return port_ids