[VVP] add bug fixes and reserve port updates
[vvp/validation-scripts.git] / ice_validator / tests / utils / nested_iterables.py
1 # -*- coding: utf8 -*-
2 # ============LICENSE_START=======================================================
3 # org.onap.vvp/validation-scripts
4 # ===================================================================
5 # Copyright © 2017 AT&T Intellectual Property. All rights reserved.
6 # ===================================================================
7 #
8 # Unless otherwise specified, all software contained herein is licensed
9 # under the Apache License, Version 2.0 (the “License”);
10 # you may not use this software except in compliance with the License.
11 # You may obtain a copy of the License at
12 #
13 #             http://www.apache.org/licenses/LICENSE-2.0
14 #
15 # Unless required by applicable law or agreed to in writing, software
16 # distributed under the License is distributed on an "AS IS" BASIS,
17 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
18 # See the License for the specific language governing permissions and
19 # limitations under the License.
20 #
21 #
22 #
23 # Unless otherwise specified, all documentation contained herein is licensed
24 # under the Creative Commons License, Attribution 4.0 Intl. (the “License”);
25 # you may not use this documentation except in compliance with the License.
26 # You may obtain a copy of the License at
27 #
28 #             https://creativecommons.org/licenses/by/4.0/
29 #
30 # Unless required by applicable law or agreed to in writing, documentation
31 # distributed under the License is distributed on an "AS IS" BASIS,
32 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
33 # See the License for the specific language governing permissions and
34 # limitations under the License.
35 #
36 # ============LICENSE_END============================================
37 #
38 # ECOMP is a trademark and service mark of AT&T Intellectual Property.
39 #
40
41
42 def parse_nested_dict(d, key=""):
43     '''
44     parse the nested dictionary and return values of
45     given key of function parameter only
46     '''
47     nested_elements = []
48     for k, v in d.items():
49         if isinstance(v, dict):
50             sub_dict = parse_nested_dict(v, key)
51             nested_elements.extend(sub_dict)
52         else:
53             if key:
54                 if k == key:
55                     nested_elements.append(v)
56             else:
57                 nested_elements.append(v)
58
59     return nested_elements
60
61
62 def find_all_get_param_in_yml(yml):
63     '''
64     Recursively find all referenced parameters in a parsed yaml body
65     and return a list of parameters
66     '''
67     os_pseudo_parameters = ['OS::stack_name',
68                             'OS::stack_id',
69                             'OS::project_id']
70
71     if not hasattr(yml, 'items'):
72         return []
73     params = []
74     for k, v in yml.items():
75         if k == 'get_param' and v not in os_pseudo_parameters:
76             if isinstance(v, list) and not isinstance(v[0], dict):
77                 params.append(v[0])
78             elif not isinstance(v, dict) and isinstance(v, str):
79                 params.append(v)
80             for item in (v if isinstance(v, list) else [v]):
81                 if isinstance(item, dict):
82                     params.extend(find_all_get_param_in_yml(item))
83             continue
84         elif k == 'list_join':
85             for item in (v if isinstance(v, list) else [v]):
86                 if isinstance(item, list):
87                     for d in item:
88                         params.extend(find_all_get_param_in_yml(d))
89             continue
90         if isinstance(v, dict):
91             params.extend(find_all_get_param_in_yml(v))
92         elif isinstance(v, list):
93             for d in v:
94                 params.extend(find_all_get_param_in_yml(d))
95     return params
96
97
98 def find_all_get_resource_in_yml(yml):
99     '''
100     Recursively find all referenced resources
101     in a parsed yaml body and return a list of resource ids
102     '''
103     if not hasattr(yml, 'items'):
104         return []
105     resources = []
106     for k, v in yml.items():
107         if k == 'get_resource':
108             if isinstance(v, list):
109                 resources.append(v[0])
110             else:
111                 resources.append(v)
112             continue
113         if isinstance(v, dict):
114             resources.extend(find_all_get_resource_in_yml(v))
115         elif isinstance(v, list):
116             for d in v:
117                 resources.extend(find_all_get_resource_in_yml(d))
118     return resources
119
120
121 def find_all_get_file_in_yml(yml):
122     '''
123     Recursively find all get_file in a parsed yaml body
124     and return the list of referenced files/urls
125     '''
126     if not hasattr(yml, 'items'):
127         return []
128     resources = []
129     for k, v in yml.items():
130         if k == 'get_file':
131             if isinstance(v, list):
132                 resources.append(v[0])
133             else:
134                 resources.append(v)
135             continue
136         if isinstance(v, dict):
137             resources.extend(find_all_get_file_in_yml(v))
138         elif isinstance(v, list):
139             for d in v:
140                 resources.extend(find_all_get_file_in_yml(d))
141     return resources
142
143
144 def find_all_get_resource_in_resource(resource):
145     '''
146     Recursively find all referenced resources
147     in a heat resource and return a list of resource ids
148     '''
149     if not hasattr(resource, 'items'):
150         return []
151
152     resources = []
153     for k, v in resource.items():
154         if k == 'get_resource':
155             if isinstance(v, list):
156                 resources.append(v[0])
157             else:
158                 resources.append(v)
159             continue
160         if isinstance(v, dict):
161             resources.extend(
162                 find_all_get_resource_in_resource(v))
163         elif isinstance(v, list):
164             for d in v:
165                 resources.extend(
166                     find_all_get_resource_in_resource(d))
167     return resources
168
169
170 def get_associated_resources_per_resource(resources):
171     '''
172     Recursively find all referenced resources for each resource
173     in a list of resource ids
174     '''
175     if not hasattr(resources, 'items'):
176         return None
177
178     resources_dict = {}
179     resources_dict["resources"] = {}
180     ref_resources = []
181
182     for res_key, res_value in resources.items():
183         get_resources = []
184
185         for k, v in res_value:
186             if k == 'get_resource' and\
187                isinstance(v, dict):
188                 get_resources = find_all_get_resource_in_resource(v)
189
190         # if resources found, add to dict
191         if get_resources:
192             ref_resources.extend(get_resources)
193             resources_dict["resources"][res_key] = {
194                 "res_value": res_value,
195                 "get_resources": get_resources,
196             }
197
198     resources_dict["ref_resources"] = set(ref_resources)
199
200     return resources_dict
201
202
203 def flatten(items):
204     '''
205     flatten items from any nested iterable
206     '''
207
208     merged_list = []
209     for item in items:
210         if isinstance(item, list):
211             sub_list = flatten(item)
212             merged_list.extend(sub_list)
213         else:
214             merged_list.append(item)
215     return merged_list