2e62b15fef98139682c9abde5e1f92bfaaa336c0
[dcaegen2/platform/plugins.git] / dcae-policy / dcaepolicyplugin / tasks.py
1 # ================================================================================
2 # Copyright (c) 2017-2019 AT&T Intellectual Property. All rights reserved.
3 # Copyright (c) 2019 Pantheon.tech. All rights reserved.
4 # ================================================================================
5 # Licensed under the Apache License, Version 2.0 (the "License");
6 # you may not use this file except in compliance with the License.
7 # You may obtain a copy of the License at
8 #
9 #      http://www.apache.org/licenses/LICENSE-2.0
10 #
11 # Unless required by applicable law or agreed to in writing, software
12 # distributed under the License is distributed on an "AS IS" BASIS,
13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
16 # ============LICENSE_END=========================================================
17 #
18 # ECOMP is a trademark and service mark of AT&T Intellectual Property.
19
20 """tasks are the cloudify operations invoked on interfaces defined in the blueprint"""
21
22 import copy
23 import json
24 import traceback
25 import uuid
26
27 import requests
28 from cloudify import ctx
29 from cloudify.context import NODE_INSTANCE
30 from cloudify.decorators import operation
31 from cloudify.exceptions import NonRecoverableError
32
33 from .discovery import discover_service_url, discover_value
34
35 DCAE_POLICY_PLUGIN = "dcaepolicyplugin"
36 POLICY_ID = 'policy_id'
37 POLICY_REQUIRED = 'policy_required'
38 POLICY_BODY = 'policy_body'
39 POLICIES_FILTERED = 'policies_filtered'
40 POLICY_FILTER = 'policy_filter'
41 LATEST_POLICIES = "latest_policies"
42
43 REQUEST_ID = "requestID"
44
45 DCAE_POLICY_TYPE = 'dcae.nodes.policy'
46 DCAE_POLICIES_TYPE = 'dcae.nodes.policies'
47 DCAE_POLICY_TYPES = [DCAE_POLICY_TYPE, DCAE_POLICIES_TYPE]
48 CONFIG_ATTRIBUTES = "configAttributes"
49
50
51 class PolicyHandler(object):
52     """talk to policy-handler"""
53     SERVICE_NAME_POLICY_HANDLER = "policy_handler"
54     X_ECOMP_REQUESTID = 'X-ECOMP-RequestID'
55     STATUS_CODE_POLICIES_NOT_FOUND = 404
56     DEFAULT_URL = "http://policy-handler"
57     _url = None
58
59     @staticmethod
60     def _lazy_init():
61         """discover policy-handler"""
62         if PolicyHandler._url:
63             return
64
65         PolicyHandler._url = discover_service_url(PolicyHandler.SERVICE_NAME_POLICY_HANDLER)
66         if PolicyHandler._url:
67             return
68
69         config = discover_value(DCAE_POLICY_PLUGIN)
70         if config and isinstance(config, dict):
71             # expected structure for the config value for dcaepolicyplugin key
72             # {
73             #     "dcaepolicyplugin" : {
74             #         "policy_handler" : {
75             #             "target_entity" : "policy_handler",
76             #             "url" : "http://policy-handler:25577"
77             #         }
78             #     }
79             # }
80             PolicyHandler._url = config.get(DCAE_POLICY_PLUGIN, {}) \
81                 .get(PolicyHandler.SERVICE_NAME_POLICY_HANDLER, {}).get("url")
82
83         if PolicyHandler._url:
84             return
85
86         PolicyHandler._url = PolicyHandler.DEFAULT_URL
87
88     @staticmethod
89     def get_latest_policy(policy_id):
90         """retrieve the latest policy for policy_id from policy-handler"""
91         PolicyHandler._lazy_init()
92
93         ph_path = "{0}/policy_latest/{1}".format(PolicyHandler._url, policy_id)
94         headers = {PolicyHandler.X_ECOMP_REQUESTID: str(uuid.uuid4())}
95
96         ctx.logger.info("getting latest policy from {0} headers={1}".format(
97             ph_path, json.dumps(headers)))
98         res = requests.get(ph_path, headers=headers, timeout=60)
99         ctx.logger.info("latest policy for policy_id({0}) status({1}) response: {2}"
100                         .format(policy_id, res.status_code, res.text))
101
102         if res.status_code == PolicyHandler.STATUS_CODE_POLICIES_NOT_FOUND:
103             return
104
105         res.raise_for_status()
106         return res.json()
107
108     @staticmethod
109     def find_latest_policies(policy_filter):
110         """retrieve the latest policies by policy filter (selection criteria) from policy-handler"""
111         PolicyHandler._lazy_init()
112
113         ph_path = "{0}/policies_latest".format(PolicyHandler._url)
114         headers = {
115             PolicyHandler.X_ECOMP_REQUESTID: policy_filter.get(REQUEST_ID, str(uuid.uuid4()))
116         }
117
118         ctx.logger.info("finding the latest polices from {0} by {1} headers={2}".format(
119             ph_path, json.dumps(policy_filter), json.dumps(headers)))
120
121         res = requests.post(ph_path, json=policy_filter, headers=headers, timeout=60)
122         ctx.logger.info("latest policies status({0}) response: {1}"
123                         .format(res.status_code, res.text))
124
125         if res.status_code == PolicyHandler.STATUS_CODE_POLICIES_NOT_FOUND:
126             return
127
128         res.raise_for_status()
129         return res.json().get(LATEST_POLICIES)
130
131
132 def _policy_get():
133     """
134     dcae.nodes.policy -
135     retrieve the latest policy_body for policy_id property
136     and save policy_body in runtime_properties
137     """
138     if DCAE_POLICY_TYPE not in ctx.node.type_hierarchy:
139         return
140
141     policy_id = ctx.node.properties.get(POLICY_ID)
142     policy_required = ctx.node.properties.get(POLICY_REQUIRED)
143     if not policy_id:
144         error = "no {0} found in ctx.node.properties".format(POLICY_ID)
145         ctx.logger.error(error)
146         raise NonRecoverableError(error)
147
148     policy = None
149     try:
150         policy = PolicyHandler.get_latest_policy(policy_id)
151     except Exception as ex:
152         error = "failed to get policy({0}): {1}".format(policy_id, str(ex))
153         ctx.logger.error("{0}: {1}".format(error, traceback.format_exc()))
154         raise NonRecoverableError(error)
155
156     if not policy:
157         error = "policy not found for policy_id {0}".format(policy_id)
158         ctx.logger.info(error)
159         if policy_required:
160             raise NonRecoverableError(error)
161         return True
162
163     ctx.logger.info("found policy {0}: {1}".format(policy_id, json.dumps(policy)))
164     if POLICY_BODY in policy:
165         ctx.instance.runtime_properties[POLICY_BODY] = policy[POLICY_BODY]
166     return True
167
168
169 def _fix_policy_filter(policy_filter):
170     if CONFIG_ATTRIBUTES in policy_filter:
171         config_attributes = policy_filter.get(CONFIG_ATTRIBUTES)
172         if isinstance(config_attributes, dict):
173             return
174         try:
175             config_attributes = json.loads(config_attributes)
176             if config_attributes and isinstance(config_attributes, dict):
177                 policy_filter[CONFIG_ATTRIBUTES] = config_attributes
178                 return
179         except (ValueError, TypeError):
180             pass
181         if config_attributes:
182             ctx.logger.warn("unexpected %s: %s", CONFIG_ATTRIBUTES, config_attributes)
183         del policy_filter[CONFIG_ATTRIBUTES]
184
185
186 def _policies_find():
187     """
188     dcae.nodes.policies -
189     retrieve the latest policies for selection criteria
190     and save found policies in runtime_properties
191     """
192     if DCAE_POLICIES_TYPE not in ctx.node.type_hierarchy:
193         return
194
195     policy_required = ctx.node.properties.get(POLICY_REQUIRED)
196
197     try:
198         policy_filter = ctx.node.properties.get(POLICY_FILTER)
199         if policy_filter:
200             policy_filter = {
201                 k: copy.deepcopy(v) for k, v in policy_filter.items()
202                 if v or isinstance(v, (int, float))
203             }
204             _fix_policy_filter(policy_filter)
205         else:
206             policy_filter = {}
207
208         if REQUEST_ID not in policy_filter:
209             policy_filter[REQUEST_ID] = str(uuid.uuid4())
210
211         policies_filtered = PolicyHandler.find_latest_policies(policy_filter)
212
213         if not policies_filtered:
214             error = "policies not found by {0}".format(json.dumps(policy_filter))
215             ctx.logger.info(error)
216             if policy_required:
217                 raise NonRecoverableError(error)
218             return True
219
220         ctx.logger.info("found policies by {0}: {1}".format(
221             json.dumps(policy_filter), json.dumps(policies_filtered)
222         ))
223         ctx.instance.runtime_properties[POLICIES_FILTERED] = policies_filtered
224
225     except Exception as ex:
226         error = "failed to find policies: {0}".format(str(ex))
227         ctx.logger.error("{0}: {1}".format(error, traceback.format_exc()))
228         raise NonRecoverableError(error)
229
230     return True
231
232
233 #########################################################
234 @operation
235 def policy_get(**kwargs):
236     """retrieve the policy or policies and save it in runtime_properties"""
237     if ctx.type != NODE_INSTANCE:
238         raise NonRecoverableError("can only invoke policy_get on node of types: {0}"
239                                   .format(DCAE_POLICY_TYPES))
240
241     if not _policy_get() and not _policies_find():
242         error = "unexpected node type {0} for policy_get - expected types: {1}" \
243                 .format(ctx.node.type_hierarchy, DCAE_POLICY_TYPES)
244         ctx.logger.error(error)
245         raise NonRecoverableError(error)
246
247     ctx.logger.info("exit policy_get")