DCAEGEN2-1920 enhance policy plugin for error conditions
[dcaegen2/platform/plugins.git] / dcae-policy / dcaepolicyplugin / tasks.py
1 # ================================================================================
2 # Copyright (c) 2017-2019 AT&T Intellectual Property. All rights reserved.
3 # ================================================================================
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at
7 #
8 #      http://www.apache.org/licenses/LICENSE-2.0
9 #
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
15 # ============LICENSE_END=========================================================
16 #
17 # ECOMP is a trademark and service mark of AT&T Intellectual Property.
18
19 """tasks are the cloudify operations invoked on interfaces defined in the blueprint"""
20
21 import copy
22 import json
23 import traceback
24 import uuid
25
26 import requests
27 from cloudify import ctx
28 from cloudify.context import NODE_INSTANCE
29 from cloudify.decorators import operation
30 from cloudify.exceptions import NonRecoverableError
31
32 from .discovery import discover_service_url, discover_value
33
34 DCAE_POLICY_PLUGIN = "dcaepolicyplugin"
35 POLICY_ID = 'policy_id'
36 POLICY_REQUIRED = 'policy_required'
37 POLICY_BODY = 'policy_body'
38 POLICIES_FILTERED = 'policies_filtered'
39 POLICY_FILTER = 'policy_filter'
40 LATEST_POLICIES = "latest_policies"
41
42 REQUEST_ID = "requestID"
43
44 DCAE_POLICY_TYPE = 'dcae.nodes.policy'
45 DCAE_POLICIES_TYPE = 'dcae.nodes.policies'
46 DCAE_POLICY_TYPES = [DCAE_POLICY_TYPE, DCAE_POLICIES_TYPE]
47 CONFIG_ATTRIBUTES = "configAttributes"
48
49
50 class PolicyHandler(object):
51     """talk to policy-handler"""
52     SERVICE_NAME_POLICY_HANDLER = "policy_handler"
53     X_ECOMP_REQUESTID = 'X-ECOMP-RequestID'
54     STATUS_CODE_POLICIES_NOT_FOUND = 404
55     DEFAULT_URL = "http://policy-handler"
56     _url = None
57
58     @staticmethod
59     def _lazy_init():
60         """discover policy-handler"""
61         if PolicyHandler._url:
62             return
63
64         PolicyHandler._url = discover_service_url(PolicyHandler.SERVICE_NAME_POLICY_HANDLER)
65         if PolicyHandler._url:
66             return
67
68         config = discover_value(DCAE_POLICY_PLUGIN)
69         if config and isinstance(config, dict):
70             # expected structure for the config value for dcaepolicyplugin key
71             # {
72             #     "dcaepolicyplugin" : {
73             #         "policy_handler" : {
74             #             "target_entity" : "policy_handler",
75             #             "url" : "http://policy-handler:25577"
76             #         }
77             #     }
78             # }
79             PolicyHandler._url = config.get(DCAE_POLICY_PLUGIN, {}) \
80                 .get(PolicyHandler.SERVICE_NAME_POLICY_HANDLER, {}).get("url")
81
82         if PolicyHandler._url:
83             return
84
85         PolicyHandler._url = PolicyHandler.DEFAULT_URL
86
87     @staticmethod
88     def get_latest_policy(policy_id):
89         """retrieve the latest policy for policy_id from policy-handler"""
90         PolicyHandler._lazy_init()
91
92         ph_path = "{0}/policy_latest/{1}".format(PolicyHandler._url, policy_id)
93         headers = {PolicyHandler.X_ECOMP_REQUESTID: str(uuid.uuid4())}
94
95         ctx.logger.info("getting latest policy from {0} headers={1}".format(
96             ph_path, json.dumps(headers)))
97         res = requests.get(ph_path, headers=headers, timeout=60)
98         ctx.logger.info("latest policy for policy_id({0}) status({1}) response: {2}"
99                         .format(policy_id, res.status_code, res.text))
100
101         if res.status_code == PolicyHandler.STATUS_CODE_POLICIES_NOT_FOUND:
102             return
103
104         res.raise_for_status()
105         return res.json()
106
107     @staticmethod
108     def find_latest_policies(policy_filter):
109         """retrieve the latest policies by policy filter (selection criteria) from policy-handler"""
110         PolicyHandler._lazy_init()
111
112         ph_path = "{0}/policies_latest".format(PolicyHandler._url)
113         headers = {
114             PolicyHandler.X_ECOMP_REQUESTID: policy_filter.get(REQUEST_ID, str(uuid.uuid4()))
115         }
116
117         ctx.logger.info("finding the latest polices from {0} by {1} headers={2}".format(
118             ph_path, json.dumps(policy_filter), json.dumps(headers)))
119
120         res = requests.post(ph_path, json=policy_filter, headers=headers, timeout=60)
121         ctx.logger.info("latest policies status({0}) response: {1}"
122                         .format(res.status_code, res.text))
123
124         if res.status_code == PolicyHandler.STATUS_CODE_POLICIES_NOT_FOUND:
125             return
126
127         res.raise_for_status()
128         return res.json().get(LATEST_POLICIES)
129
130
131 def _policy_get():
132     """
133     dcae.nodes.policy -
134     retrieve the latest policy_body for policy_id property
135     and save policy_body in runtime_properties
136     """
137     if DCAE_POLICY_TYPE not in ctx.node.type_hierarchy:
138         return
139
140     policy_id = ctx.node.properties.get(POLICY_ID)
141     policy_required = ctx.node.properties.get(POLICY_REQUIRED)
142     if not policy_id:
143         error = "no {0} found in ctx.node.properties".format(POLICY_ID)
144         ctx.logger.error(error)
145         raise NonRecoverableError(error)
146
147     policy = None
148     try:
149         policy = PolicyHandler.get_latest_policy(policy_id)
150     except Exception as ex:
151         error = "failed to get policy({0}): {1}".format(policy_id, str(ex))
152         ctx.logger.error("{0}: {1}".format(error, traceback.format_exc()))
153         raise NonRecoverableError(error)
154
155     if not policy:
156         error = "policy not found for policy_id {0}".format(policy_id)
157         ctx.logger.info(error)
158         if policy_required:
159             raise NonRecoverableError(error)
160         return True
161
162     ctx.logger.info("found policy {0}: {1}".format(policy_id, json.dumps(policy)))
163     if POLICY_BODY in policy:
164         ctx.instance.runtime_properties[POLICY_BODY] = policy[POLICY_BODY]
165     return True
166
167
168 def _fix_policy_filter(policy_filter):
169     if CONFIG_ATTRIBUTES in policy_filter:
170         config_attributes = policy_filter.get(CONFIG_ATTRIBUTES)
171         if isinstance(config_attributes, dict):
172             return
173         try:
174             config_attributes = json.loads(config_attributes)
175             if config_attributes and isinstance(config_attributes, dict):
176                 policy_filter[CONFIG_ATTRIBUTES] = config_attributes
177                 return
178         except (ValueError, TypeError):
179             pass
180         if config_attributes:
181             ctx.logger.warn("unexpected %s: %s", CONFIG_ATTRIBUTES, config_attributes)
182         del policy_filter[CONFIG_ATTRIBUTES]
183
184
185 def _policies_find():
186     """
187     dcae.nodes.policies -
188     retrieve the latest policies for selection criteria
189     and save found policies in runtime_properties
190     """
191     if DCAE_POLICIES_TYPE not in ctx.node.type_hierarchy:
192         return
193
194     policy_required = ctx.node.properties.get(POLICY_REQUIRED)
195
196     try:
197         policy_filter = copy.deepcopy(dict(
198             (k, v) for (k, v) in dict(ctx.node.properties.get(POLICY_FILTER, {})).iteritems()
199             if v or isinstance(v, (int, float))
200         ))
201         _fix_policy_filter(policy_filter)
202
203         if REQUEST_ID not in policy_filter:
204             policy_filter[REQUEST_ID] = str(uuid.uuid4())
205
206         policies_filtered = PolicyHandler.find_latest_policies(policy_filter)
207
208         if not policies_filtered:
209             error = "policies not found by {0}".format(json.dumps(policy_filter))
210             ctx.logger.info(error)
211             if policy_required:
212                 raise NonRecoverableError(error)
213             return True
214
215         ctx.logger.info("found policies by {0}: {1}".format(
216             json.dumps(policy_filter), json.dumps(policies_filtered)
217         ))
218         ctx.instance.runtime_properties[POLICIES_FILTERED] = policies_filtered
219
220     except Exception as ex:
221         error = "failed to find policies: {0}".format(str(ex))
222         ctx.logger.error("{0}: {1}".format(error, traceback.format_exc()))
223         raise NonRecoverableError(error)
224
225     return True
226
227
228 #########################################################
229 @operation
230 def policy_get(**kwargs):
231     """retrieve the policy or policies and save it in runtime_properties"""
232     if ctx.type != NODE_INSTANCE:
233         raise NonRecoverableError("can only invoke policy_get on node of types: {0}"
234                                   .format(DCAE_POLICY_TYPES))
235
236     if not _policy_get() and not _policies_find():
237         error = "unexpected node type {0} for policy_get - expected types: {1}" \
238                 .format(ctx.node.type_hierarchy, DCAE_POLICY_TYPES)
239         ctx.logger.error(error)
240         raise NonRecoverableError(error)
241
242     ctx.logger.info("exit policy_get")