[VVP] updating validation scripts in dublin
[vvp/validation-scripts.git] / ice_validator / tests / utils / nested_iterables.py
1 # -*- coding: utf8 -*-
2 # ============LICENSE_START=======================================================
3 # org.onap.vvp/validation-scripts
4 # ===================================================================
5 # Copyright © 2017 AT&T Intellectual Property. All rights reserved.
6 # ===================================================================
7 #
8 # Unless otherwise specified, all software contained herein is licensed
9 # under the Apache License, Version 2.0 (the "License");
10 # you may not use this software except in compliance with the License.
11 # You may obtain a copy of the License at
12 #
13 #             http://www.apache.org/licenses/LICENSE-2.0
14 #
15 # Unless required by applicable law or agreed to in writing, software
16 # distributed under the License is distributed on an "AS IS" BASIS,
17 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
18 # See the License for the specific language governing permissions and
19 # limitations under the License.
20 #
21 #
22 #
23 # Unless otherwise specified, all documentation contained herein is licensed
24 # under the Creative Commons License, Attribution 4.0 Intl. (the "License");
25 # you may not use this documentation except in compliance with the License.
26 # You may obtain a copy of the License at
27 #
28 #             https://creativecommons.org/licenses/by/4.0/
29 #
30 # Unless required by applicable law or agreed to in writing, documentation
31 # distributed under the License is distributed on an "AS IS" BASIS,
32 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
33 # See the License for the specific language governing permissions and
34 # limitations under the License.
35 #
36 # ============LICENSE_END============================================
37 #
38 # ECOMP is a trademark and service mark of AT&T Intellectual Property.
39 #
40
41
42 def parse_nested_dict(d, key=""):
43     """
44     parse the nested dictionary and return values of
45     given key of function parameter only
46     """
47     nested_elements = []
48     for k, v in d.items():
49         if isinstance(v, dict):
50             sub_dict = parse_nested_dict(v, key)
51             nested_elements.extend(sub_dict)
52         else:
53             if key:
54                 if k == key:
55                     nested_elements.append(v)
56             else:
57                 nested_elements.append(v)
58
59     return nested_elements
60
61
62 def find_all_get_param_in_yml(yml):
63     """
64     Recursively find all referenced parameters in a parsed yaml body
65     and return a list of parameters
66     """
67     os_pseudo_parameters = ["OS::stack_name", "OS::stack_id", "OS::project_id"]
68
69     if not hasattr(yml, "items"):
70         return []
71     params = []
72     for k, v in yml.items():
73         if k == "get_param" and v not in os_pseudo_parameters:
74             if isinstance(v, list) and not isinstance(v[0], dict):
75                 params.append(v[0])
76             elif not isinstance(v, dict) and isinstance(v, str):
77                 params.append(v)
78             for item in v if isinstance(v, list) else [v]:
79                 if isinstance(item, dict):
80                     params.extend(find_all_get_param_in_yml(item))
81             continue
82         elif k == "list_join":
83             for item in v if isinstance(v, list) else [v]:
84                 if isinstance(item, list):
85                     for d in item:
86                         params.extend(find_all_get_param_in_yml(d))
87             continue
88         if isinstance(v, dict):
89             params.extend(find_all_get_param_in_yml(v))
90         elif isinstance(v, list):
91             for d in v:
92                 params.extend(find_all_get_param_in_yml(d))
93     return params
94
95
96 def find_all_get_resource_in_yml(yml):
97     """
98     Recursively find all referenced resources
99     in a parsed yaml body and return a list of resource ids
100     """
101     if not hasattr(yml, "items"):
102         return []
103     resources = []
104     for k, v in yml.items():
105         if k == "get_resource":
106             if isinstance(v, list):
107                 resources.append(v[0])
108             else:
109                 resources.append(v)
110             continue
111         if isinstance(v, dict):
112             resources.extend(find_all_get_resource_in_yml(v))
113         elif isinstance(v, list):
114             for d in v:
115                 resources.extend(find_all_get_resource_in_yml(d))
116     return resources
117
118
119 def find_all_get_file_in_yml(yml):
120     """
121     Recursively find all get_file in a parsed yaml body
122     and return the list of referenced files/urls
123     """
124     if not hasattr(yml, "items"):
125         return []
126     resources = []
127     for k, v in yml.items():
128         if k == "get_file":
129             if isinstance(v, list):
130                 resources.append(v[0])
131             else:
132                 resources.append(v)
133             continue
134         if isinstance(v, dict):
135             resources.extend(find_all_get_file_in_yml(v))
136         elif isinstance(v, list):
137             for d in v:
138                 resources.extend(find_all_get_file_in_yml(d))
139     return resources
140
141
142 def find_all_get_resource_in_resource(resource):
143     """
144     Recursively find all referenced resources
145     in a heat resource and return a list of resource ids
146     """
147     if not hasattr(resource, "items"):
148         return []
149
150     resources = []
151     for k, v in resource.items():
152         if k == "get_resource":
153             if isinstance(v, list):
154                 resources.append(v[0])
155             else:
156                 resources.append(v)
157             continue
158         if isinstance(v, dict):
159             resources.extend(find_all_get_resource_in_resource(v))
160         elif isinstance(v, list):
161             for d in v:
162                 resources.extend(find_all_get_resource_in_resource(d))
163     return resources
164
165
166 def get_associated_resources_per_resource(resources):
167     """
168     Recursively find all referenced resources for each resource
169     in a list of resource ids
170     """
171     if not hasattr(resources, "items"):
172         return None
173
174     resources_dict = {}
175     resources_dict["resources"] = {}
176     ref_resources = []
177
178     for res_key, res_value in resources.items():
179         get_resources = []
180
181         for k, v in res_value:
182             if k == "get_resource" and isinstance(v, dict):
183                 get_resources = find_all_get_resource_in_resource(v)
184
185         # if resources found, add to dict
186         if get_resources:
187             ref_resources.extend(get_resources)
188             resources_dict["resources"][res_key] = {
189                 "res_value": res_value,
190                 "get_resources": get_resources,
191             }
192
193     resources_dict["ref_resources"] = set(ref_resources)
194
195     return resources_dict
196
197
198 def flatten(items):
199     """
200     flatten items from any nested iterable
201     """
202
203     merged_list = []
204     for item in items:
205         if isinstance(item, list):
206             sub_list = flatten(item)
207             merged_list.extend(sub_list)
208         else:
209             merged_list.append(item)
210     return merged_list