44ca2bc0efd0670fef227f466db3303e70c20512
[vvp/validation-scripts.git] / ice_validator / tests / utils / nested_iterables.py
1 # -*- coding: utf8 -*-
2 # ============LICENSE_START=======================================================
3 # org.onap.vvp/validation-scripts
4 # ===================================================================
5 # Copyright © 2017 AT&T Intellectual Property. All rights reserved.
6 # ===================================================================
7 #
8 # Unless otherwise specified, all software contained herein is licensed
9 # under the Apache License, Version 2.0 (the "License");
10 # you may not use this software except in compliance with the License.
11 # You may obtain a copy of the License at
12 #
13 #             http://www.apache.org/licenses/LICENSE-2.0
14 #
15 # Unless required by applicable law or agreed to in writing, software
16 # distributed under the License is distributed on an "AS IS" BASIS,
17 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
18 # See the License for the specific language governing permissions and
19 # limitations under the License.
20 #
21 #
22 #
23 # Unless otherwise specified, all documentation contained herein is licensed
24 # under the Creative Commons License, Attribution 4.0 Intl. (the "License");
25 # you may not use this documentation except in compliance with the License.
26 # You may obtain a copy of the License at
27 #
28 #             https://creativecommons.org/licenses/by/4.0/
29 #
30 # Unless required by applicable law or agreed to in writing, documentation
31 # distributed under the License is distributed on an "AS IS" BASIS,
32 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
33 # See the License for the specific language governing permissions and
34 # limitations under the License.
35 #
36 # ============LICENSE_END============================================
37 #
38 #
39
40
41 def is_pseudo_param(parameter):
42     pseudo_parameters = ["OS::stack_name", "OS::stack_id", "OS::project_id"]
43     return parameter in pseudo_parameters
44
45
46 def parse_nested_dict(d, key=""):
47     """
48     parse the nested dictionary and return values of
49     given key of function parameter only
50     """
51     nested_elements = []
52     for k, v in d.items():
53         if isinstance(v, dict):
54             sub_dict = parse_nested_dict(v, key)
55             nested_elements.extend(sub_dict)
56         else:
57             if key:
58                 if k == key:
59                     nested_elements.append(v)
60             else:
61                 nested_elements.append(v)
62
63     return nested_elements
64
65
66 def find_all_get_param_in_yml(yml):
67     """
68     Recursively find all referenced parameters in a parsed yaml body
69     and return a list of parameters
70     """
71
72     if not hasattr(yml, "items"):
73         return []
74     params = []
75     for k, v in yml.items():
76         if k == "get_param" and not is_pseudo_param(v):
77             if isinstance(v, list) and not isinstance(v[0], dict):
78                 params.append(v[0])
79             elif not isinstance(v, dict) and isinstance(v, str):
80                 params.append(v)
81             for item in v if isinstance(v, list) else [v]:
82                 if isinstance(item, dict):
83                     params.extend(find_all_get_param_in_yml(item))
84             continue
85         elif k == "list_join":
86             for item in v if isinstance(v, list) else [v]:
87                 if isinstance(item, list):
88                     for d in item:
89                         params.extend(find_all_get_param_in_yml(d))
90             continue
91         if isinstance(v, dict):
92             params.extend(find_all_get_param_in_yml(v))
93         elif isinstance(v, list):
94             for d in v:
95                 params.extend(find_all_get_param_in_yml(d))
96     return params
97
98
99 def find_all_get_resource_in_yml(yml):
100     """
101     Recursively find all referenced resources
102     in a parsed yaml body and return a list of resource ids
103     """
104     if not hasattr(yml, "items"):
105         return []
106     resources = []
107     for k, v in yml.items():
108         if k == "get_resource":
109             if isinstance(v, list):
110                 resources.append(v[0])
111             else:
112                 resources.append(v)
113             continue
114         if isinstance(v, dict):
115             resources.extend(find_all_get_resource_in_yml(v))
116         elif isinstance(v, list):
117             for d in v:
118                 resources.extend(find_all_get_resource_in_yml(d))
119     return resources
120
121
122 def find_all_get_file_in_yml(yml):
123     """
124     Recursively find all get_file in a parsed yaml body
125     and return the list of referenced files/urls
126     """
127     if not hasattr(yml, "items"):
128         return []
129     resources = []
130     for k, v in yml.items():
131         if k == "get_file":
132             if isinstance(v, list):
133                 resources.append(v[0])
134             else:
135                 resources.append(v)
136             continue
137         if isinstance(v, dict):
138             resources.extend(find_all_get_file_in_yml(v))
139         elif isinstance(v, list):
140             for d in v:
141                 resources.extend(find_all_get_file_in_yml(d))
142     return resources
143
144
145 def find_all_get_resource_in_resource(resource):
146     """
147     Recursively find all referenced resources
148     in a heat resource and return a list of resource ids
149     """
150     if not hasattr(resource, "items"):
151         return []
152
153     resources = []
154     for k, v in resource.items():
155         if k == "get_resource":
156             if isinstance(v, list):
157                 resources.append(v[0])
158             else:
159                 resources.append(v)
160             continue
161         if isinstance(v, dict):
162             resources.extend(find_all_get_resource_in_resource(v))
163         elif isinstance(v, list):
164             for d in v:
165                 resources.extend(find_all_get_resource_in_resource(d))
166     return resources
167
168
169 def get_associated_resources_per_resource(resources):
170     """
171     Recursively find all referenced resources for each resource
172     in a list of resource ids
173     """
174     if not hasattr(resources, "items"):
175         return None
176
177     resources_dict = {}
178     resources_dict["resources"] = {}
179     ref_resources = []
180
181     for res_key, res_value in resources.items():
182         get_resources = []
183
184         for k, v in res_value:
185             if k == "get_resource" and isinstance(v, dict):
186                 get_resources = find_all_get_resource_in_resource(v)
187
188         # if resources found, add to dict
189         if get_resources:
190             ref_resources.extend(get_resources)
191             resources_dict["resources"][res_key] = {
192                 "res_value": res_value,
193                 "get_resources": get_resources,
194             }
195
196     resources_dict["ref_resources"] = set(ref_resources)
197
198     return resources_dict
199
200
201 def flatten(items):
202     """
203     flatten items from any nested iterable
204     """
205
206     merged_list = []
207     for item in items:
208         if isinstance(item, list):
209             sub_list = flatten(item)
210             merged_list.extend(sub_list)
211         else:
212             merged_list.append(item)
213     return merged_list