[VVP] Fixed issue in unused parameter detection
[vvp/validation-scripts.git] / ice_validator / tests / utils / nested_iterables.py
index 44ca2bc..a868e5a 100644 (file)
@@ -36,6 +36,7 @@
 # ============LICENSE_END============================================
 #
 #
+from tests.helpers import traverse
 
 
 def is_pseudo_param(parameter):
@@ -43,24 +44,15 @@ def is_pseudo_param(parameter):
     return parameter in pseudo_parameters
 
 
-def parse_nested_dict(d, key=""):
-    """
-    parse the nested dictionary and return values of
-    given key of function parameter only
-    """
-    nested_elements = []
-    for k, v in d.items():
-        if isinstance(v, dict):
-            sub_dict = parse_nested_dict(v, key)
-            nested_elements.extend(sub_dict)
-        else:
-            if key:
-                if k == key:
-                    nested_elements.append(v)
-            else:
-                nested_elements.append(v)
+class ParameterCollector:
+    def __init__(self):
+        self.params = set()
 
-    return nested_elements
+    def __call__(self, _, value):
+        if isinstance(value, str):
+            self.params.add(value)
+        elif isinstance(value, list) and len(value) >= 1:
+            self.params.add(value[0])
 
 
 def find_all_get_param_in_yml(yml):
@@ -68,32 +60,9 @@ def find_all_get_param_in_yml(yml):
     Recursively find all referenced parameters in a parsed yaml body
     and return a list of parameters
     """
-
-    if not hasattr(yml, "items"):
-        return []
-    params = []
-    for k, v in yml.items():
-        if k == "get_param" and not is_pseudo_param(v):
-            if isinstance(v, list) and not isinstance(v[0], dict):
-                params.append(v[0])
-            elif not isinstance(v, dict) and isinstance(v, str):
-                params.append(v)
-            for item in v if isinstance(v, list) else [v]:
-                if isinstance(item, dict):
-                    params.extend(find_all_get_param_in_yml(item))
-            continue
-        elif k == "list_join":
-            for item in v if isinstance(v, list) else [v]:
-                if isinstance(item, list):
-                    for d in item:
-                        params.extend(find_all_get_param_in_yml(d))
-            continue
-        if isinstance(v, dict):
-            params.extend(find_all_get_param_in_yml(v))
-        elif isinstance(v, list):
-            for d in v:
-                params.extend(find_all_get_param_in_yml(d))
-    return params
+    collector = ParameterCollector()
+    traverse(yml, "get_param", collector)
+    return {p for p in collector.params if not is_pseudo_param(p)}
 
 
 def find_all_get_resource_in_yml(yml):
@@ -101,22 +70,9 @@ def find_all_get_resource_in_yml(yml):
     Recursively find all referenced resources
     in a parsed yaml body and return a list of resource ids
     """
-    if not hasattr(yml, "items"):
-        return []
-    resources = []
-    for k, v in yml.items():
-        if k == "get_resource":
-            if isinstance(v, list):
-                resources.append(v[0])
-            else:
-                resources.append(v)
-            continue
-        if isinstance(v, dict):
-            resources.extend(find_all_get_resource_in_yml(v))
-        elif isinstance(v, list):
-            for d in v:
-                resources.extend(find_all_get_resource_in_yml(d))
-    return resources
+    collector = ParameterCollector()
+    traverse(yml, "get_resource", collector)
+    return collector.params
 
 
 def find_all_get_file_in_yml(yml):
@@ -124,90 +80,6 @@ def find_all_get_file_in_yml(yml):
     Recursively find all get_file in a parsed yaml body
     and return the list of referenced files/urls
     """
-    if not hasattr(yml, "items"):
-        return []
-    resources = []
-    for k, v in yml.items():
-        if k == "get_file":
-            if isinstance(v, list):
-                resources.append(v[0])
-            else:
-                resources.append(v)
-            continue
-        if isinstance(v, dict):
-            resources.extend(find_all_get_file_in_yml(v))
-        elif isinstance(v, list):
-            for d in v:
-                resources.extend(find_all_get_file_in_yml(d))
-    return resources
-
-
-def find_all_get_resource_in_resource(resource):
-    """
-    Recursively find all referenced resources
-    in a heat resource and return a list of resource ids
-    """
-    if not hasattr(resource, "items"):
-        return []
-
-    resources = []
-    for k, v in resource.items():
-        if k == "get_resource":
-            if isinstance(v, list):
-                resources.append(v[0])
-            else:
-                resources.append(v)
-            continue
-        if isinstance(v, dict):
-            resources.extend(find_all_get_resource_in_resource(v))
-        elif isinstance(v, list):
-            for d in v:
-                resources.extend(find_all_get_resource_in_resource(d))
-    return resources
-
-
-def get_associated_resources_per_resource(resources):
-    """
-    Recursively find all referenced resources for each resource
-    in a list of resource ids
-    """
-    if not hasattr(resources, "items"):
-        return None
-
-    resources_dict = {}
-    resources_dict["resources"] = {}
-    ref_resources = []
-
-    for res_key, res_value in resources.items():
-        get_resources = []
-
-        for k, v in res_value:
-            if k == "get_resource" and isinstance(v, dict):
-                get_resources = find_all_get_resource_in_resource(v)
-
-        # if resources found, add to dict
-        if get_resources:
-            ref_resources.extend(get_resources)
-            resources_dict["resources"][res_key] = {
-                "res_value": res_value,
-                "get_resources": get_resources,
-            }
-
-    resources_dict["ref_resources"] = set(ref_resources)
-
-    return resources_dict
-
-
-def flatten(items):
-    """
-    flatten items from any nested iterable
-    """
-
-    merged_list = []
-    for item in items:
-        if isinstance(item, list):
-            sub_list = flatten(item)
-            merged_list.extend(sub_list)
-        else:
-            merged_list.append(item)
-    return merged_list
+    collector = ParameterCollector()
+    traverse(yml, "get_file", collector)
+    return collector.params