bbf3ec19fe492ac7fc6de76fa86f772fe732ff75
[dcaegen2/platform/plugins.git] / dcae-policy / dcaepolicyplugin / tasks.py
1 # ================================================================================
2 # Copyright (c) 2017-2018 AT&T Intellectual Property. All rights reserved.
3 # ================================================================================
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at
7 #
8 #      http://www.apache.org/licenses/LICENSE-2.0
9 #
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
15 # ============LICENSE_END=========================================================
16 #
17 # ECOMP is a trademark and service mark of AT&T Intellectual Property.
18
19 """tasks are the cloudify operations invoked on interfaces defined in the blueprint"""
20
21 import copy
22 import json
23 import traceback
24 import uuid
25
26 import requests
27 from cloudify import ctx
28 from cloudify.context import NODE_INSTANCE
29 from cloudify.decorators import operation
30 from cloudify.exceptions import NonRecoverableError
31
32 from .discovery import discover_service_url, discover_value
33
34 DCAE_POLICY_PLUGIN = "dcaepolicyplugin"
35 POLICY_ID = 'policy_id'
36 POLICY_REQUIRED = 'policy_required'
37 POLICY_BODY = 'policy_body'
38 POLICIES_FILTERED = 'policies_filtered'
39 POLICY_FILTER = 'policy_filter'
40 LATEST_POLICIES = "latest_policies"
41
42 REQUEST_ID = "requestID"
43
44 DCAE_POLICY_TYPE = 'dcae.nodes.policy'
45 DCAE_POLICIES_TYPE = 'dcae.nodes.policies'
46 DCAE_POLICY_TYPES = [DCAE_POLICY_TYPE, DCAE_POLICIES_TYPE]
47 CONFIG_ATTRIBUTES = "configAttributes"
48
49
50 class PolicyHandler(object):
51     """talk to policy-handler"""
52     SERVICE_NAME_POLICY_HANDLER = "policy_handler"
53     X_ECOMP_REQUESTID = 'X-ECOMP-RequestID'
54     STATUS_CODE_POLICIES_NOT_FOUND = 404
55     DEFAULT_URL = "http://policy-handler"
56     _url = None
57
58     @staticmethod
59     def _lazy_init():
60         """discover policy-handler"""
61         if PolicyHandler._url:
62             return
63
64         PolicyHandler._url = discover_service_url(PolicyHandler.SERVICE_NAME_POLICY_HANDLER)
65         if PolicyHandler._url:
66             return
67
68         config = discover_value(DCAE_POLICY_PLUGIN)
69         if config and isinstance(config, dict):
70             # expected structure for the config value for dcaepolicyplugin key
71             # {
72             #     "dcaepolicyplugin" : {
73             #         "policy_handler" : {
74             #             "target_entity" : "policy_handler",
75             #             "url" : "http://policy-handler:25577"
76             #         }
77             #     }
78             # }
79             PolicyHandler._url = config.get(DCAE_POLICY_PLUGIN, {}) \
80                 .get(PolicyHandler.SERVICE_NAME_POLICY_HANDLER, {}).get("url")
81
82         if PolicyHandler._url:
83             return
84
85         PolicyHandler._url = PolicyHandler.DEFAULT_URL
86
87     @staticmethod
88     def get_latest_policy(policy_id):
89         """retrieve the latest policy for policy_id from policy-handler"""
90         PolicyHandler._lazy_init()
91
92         ph_path = "{0}/policy_latest/{1}".format(PolicyHandler._url, policy_id)
93         headers = {PolicyHandler.X_ECOMP_REQUESTID: str(uuid.uuid4())}
94
95         ctx.logger.info("getting latest policy from {0} headers={1}".format(
96             ph_path, json.dumps(headers)))
97         res = requests.get(ph_path, headers=headers)
98         ctx.logger.info("latest policy for policy_id({0}) status({1}) response: {2}"
99                         .format(policy_id, res.status_code, res.text))
100
101         if res.status_code == PolicyHandler.STATUS_CODE_POLICIES_NOT_FOUND:
102             return
103
104         res.raise_for_status()
105         return res.json()
106
107     @staticmethod
108     def find_latest_policies(policy_filter):
109         """retrieve the latest policies by policy filter (selection criteria) from policy-handler"""
110         PolicyHandler._lazy_init()
111
112         ph_path = "{0}/policies_latest".format(PolicyHandler._url)
113         headers = {
114             PolicyHandler.X_ECOMP_REQUESTID: policy_filter.get(REQUEST_ID, str(uuid.uuid4()))
115         }
116
117         ctx.logger.info("finding the latest polices from {0} by {1} headers={2}".format(
118             ph_path, json.dumps(policy_filter), json.dumps(headers)))
119
120         res = requests.post(ph_path, json=policy_filter, headers=headers)
121         ctx.logger.info("latest policies status({0}) response: {1}"
122                         .format(res.status_code, res.text))
123
124         if res.status_code == PolicyHandler.STATUS_CODE_POLICIES_NOT_FOUND:
125             return
126
127         res.raise_for_status()
128         return res.json().get(LATEST_POLICIES)
129
130
131 def _policy_get():
132     """
133     dcae.nodes.policy -
134     retrieve the latest policy_body for policy_id property
135     and save policy_body in runtime_properties
136     """
137     if DCAE_POLICY_TYPE not in ctx.node.type_hierarchy:
138         return
139
140     policy_id = ctx.node.properties.get(POLICY_ID)
141     policy_required = ctx.node.properties.get(POLICY_REQUIRED)
142     if not policy_id:
143         error = "no {0} found in ctx.node.properties".format(POLICY_ID)
144         ctx.logger.error(error)
145         raise NonRecoverableError(error)
146
147     policy = None
148     try:
149         policy = PolicyHandler.get_latest_policy(policy_id)
150     except Exception as ex:
151         error = "failed to get policy({0}): {1}".format(policy_id, str(ex))
152         ctx.logger.error("{0}: {1}".format(error, traceback.format_exc()))
153         if policy_required:
154             raise NonRecoverableError(error)
155
156     if not policy:
157         error = "policy not found for policy_id {0}".format(policy_id)
158         ctx.logger.info(error)
159         if policy_required:
160             raise NonRecoverableError(error)
161         return True
162
163     ctx.logger.info("found policy {0}: {1}".format(policy_id, json.dumps(policy)))
164     if POLICY_BODY in policy:
165         ctx.instance.runtime_properties[POLICY_BODY] = policy[POLICY_BODY]
166     return True
167
168
169 def _fix_policy_filter(policy_filter):
170     if CONFIG_ATTRIBUTES in policy_filter:
171         config_attributes = policy_filter.get(CONFIG_ATTRIBUTES)
172         if isinstance(config_attributes, dict):
173             return
174         try:
175             config_attributes = json.loads(config_attributes)
176             if config_attributes and isinstance(config_attributes, dict):
177                 policy_filter[CONFIG_ATTRIBUTES] = config_attributes
178                 return
179         except (ValueError, TypeError):
180             pass
181         if config_attributes:
182             ctx.logger.warn("unexpected %s: %s", CONFIG_ATTRIBUTES, config_attributes)
183         del policy_filter[CONFIG_ATTRIBUTES]
184
185
186 def _policies_find():
187     """
188     dcae.nodes.policies -
189     retrieve the latest policies for selection criteria
190     and save found policies in runtime_properties
191     """
192     if DCAE_POLICIES_TYPE not in ctx.node.type_hierarchy:
193         return
194
195     try:
196         policy_filter = copy.deepcopy(dict(
197             (k, v) for (k, v) in dict(ctx.node.properties.get(POLICY_FILTER, {})).iteritems()
198             if v or isinstance(v, (int, float))
199         ))
200         _fix_policy_filter(policy_filter)
201
202         if REQUEST_ID not in policy_filter:
203             policy_filter[REQUEST_ID] = str(uuid.uuid4())
204
205         policies_filtered = PolicyHandler.find_latest_policies(policy_filter)
206
207         if not policies_filtered:
208             ctx.logger.info("policies not found by {0}".format(json.dumps(policy_filter)))
209             return True
210
211         ctx.logger.info("found policies by {0}: {1}".format(
212             json.dumps(policy_filter), json.dumps(policies_filtered)
213         ))
214         ctx.instance.runtime_properties[POLICIES_FILTERED] = policies_filtered
215
216     except Exception as ex:
217         error = "failed to find policies: {0}".format(str(ex))
218         ctx.logger.error("{0}: {1}".format(error, traceback.format_exc()))
219
220     return True
221
222
223 #########################################################
224 @operation
225 def policy_get(**kwargs):
226     """retrieve the policy or policies and save it in runtime_properties"""
227     if ctx.type != NODE_INSTANCE:
228         raise NonRecoverableError("can only invoke policy_get on node of types: {0}"
229                                   .format(DCAE_POLICY_TYPES))
230
231     if not _policy_get() and not _policies_find():
232         error = "unexpected node type {0} for policy_get - expected types: {1}" \
233                 .format(ctx.node.type_hierarchy, DCAE_POLICY_TYPES)
234         ctx.logger.error(error)
235         raise NonRecoverableError(error)
236
237     ctx.logger.info("exit policy_get")