Seed policysync container code
[dcaegen2/deployments.git] / dcae-services-policy-sync / policysync / clients.py
1 # ============LICENSE_START=======================================================
2 # Copyright (c) 2021 AT&T Intellectual Property. All rights reserved.
3 # ================================================================================
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at
7 #
8 #      http://www.apache.org/licenses/LICENSE-2.0
9 #
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
15 # ============LICENSE_END=========================================================
16 """Clients for communicating with both the post dublin and pre dublin APIs"""
17 import json
18 import re
19 import base64
20 import uuid
21 import asyncio
22 import aiohttp
23 import policysync.metrics as metrics
24 from .util import get_module_logger
25
26 logger = get_module_logger(__name__)
27
28 # Websocket config
29 WS_HEARTBEAT = 60
30 WS_NOTIFICATIONS_ENDPOINT = "pdp/notifications"
31 # REST config
32 V1_DECISION_ENDPOINT = "policy/pdpx/v1/decision"
33 V0_DECISION_ENDPOINT = "pdp/api"
34
35 APPLICATION_JSON = "application/json"
36
37
38 def get_single_regex(filters, ids):
39     """given a list of filters and ids returns a single regex for matching"""
40     filters = [] if filters is None else filters
41     ids = [] if ids is None else ["{}[.][0-9]+[.]xml".format(x) for x in ids]
42     return "|".join(filters + ids) if filters is not None else ""
43
44
45 class BasePolicyClient:
46     """ Base policy client that is pluggable into inventory """
47     def __init__(self, pdp_url, headers=None):
48         self.headers = {} if headers is None else headers
49         self.session = None
50         self.pdp_url = pdp_url
51
52     def _init_rest_session(self):
53         """
54         initialize an aiohttp rest session
55         :returns: an aiohttp rest session
56         """
57         if self.session is None:
58             self.session = aiohttp.ClientSession(
59                 headers=self.headers, raise_for_status=True
60             )
61
62         return self.session
63
64     async def _run_request(self, endpoint, request_data):
65         """
66         execute a particular REST request
67         :param endpoint: str rest endpoint to query
68         :param request_data: dictionary request data
69         :returns:  dictionary response data
70         """
71         session = self._init_rest_session()
72         async with session.post(
73             "{0}/{1}".format(self.pdp_url, endpoint), json=request_data
74         ) as resp:
75             data = await resp.read()
76             return json.loads(data)
77
78     def supports_notifications(self):
79         """
80         does this particular client support real time notifictions
81         :returns: True
82         """
83         # in derived classes we may use self
84         # pylint: disable=no-self-use
85         return True
86
87     async def list_policies(self, filters=None, ids=None):
88         """
89         used to get a list of policies matching a particular ID
90         :param filters: list of regex filter strings for matching
91         :param ids: list of id strings for matching
92         :returns: List of policies matching filters or ids
93         """
94         raise NotImplementedError
95
96     async def get_config(self, filters=None, ids=None):
97         """
98         used to get a list of policies matching a particular ID
99         :returns: List of policies matching filters or ids
100         """
101         raise NotImplementedError
102
103     async def notificationhandler(self, callback, ids=None, filters=None):
104         """
105         Clients should implement this to support real time notifications
106         :param callback: func to execute when a matching notification is found
107         :param ids: list of id strings for matching
108         """
109         raise NotImplementedError
110
111     async def close(self):
112         """ close the policy client """
113         logger.info("closing websocket clients...")
114         if self.session:
115             await self.session.close()
116
117
118 class PolicyClientV0(BasePolicyClient):
119     """
120     Supports the legacy v0 policy API use prior to ONAP Dublin
121     """
122     async def close(self):
123         """ close the policy client """
124         await super().close()
125         if self.ws_session is not None:
126             await self.ws_session.close()
127
128     def __init__(
129         self,
130         headers,
131         pdp_url,
132         decision_endpoint=V0_DECISION_ENDPOINT,
133         ws_endpoint=WS_NOTIFICATIONS_ENDPOINT
134     ):
135         """
136         Initialize a v0 policy client
137         :param headers: Headers to use for policy rest api
138         :param pdp_url: URL of the PDP
139         :param decision_endpoint: root for the decison API
140         :param websocket_endpoint: root of the websocket endpoint
141         """
142         super().__init__(pdp_url, headers=headers)
143         self.ws_session = None
144         self.session = None
145         self.decision_endpoint = decision_endpoint
146         self.ws_endpoint = ws_endpoint
147         self._ws = None
148
149     def _init_ws_session(self):
150         """initialize a websocket session for notifications"""
151         if self.ws_session is None:
152             self.ws_session = aiohttp.ClientSession()
153
154         return self.ws_session
155
156     @metrics.list_policy_exceptions.count_exceptions()
157     async def list_policies(self, filters=None, ids=None):
158         """
159         used to get a list of policies matching a particular ID
160         :param filters: list of regex filter strings for matching
161         :param ids: list of id strings for matching
162         :returns: List of policies matching filters or ids
163         """
164         request_data = self._prepare_request(filters, ids)
165         policies = await self._run_request(
166             f"{self.decision_endpoint}/listPolicy", request_data
167         )
168         return set(policies)
169
170     @classmethod
171     def _prepare_request(cls, filters, ids):
172         """prepare the request body for the v0 api"""
173         regex = get_single_regex(filters, ids)
174         return {"policyName": regex}
175
176     @metrics.get_config_exceptions.count_exceptions()
177     async def get_config(self, filters=None, ids=None):
178         """
179         Used to get the actual policy configuration from PDP
180         :return: the policy objects that are currently active
181         for the given set of filters
182         """
183         request_data = self._prepare_request(filters, ids)
184         policies = await self._run_request(
185             f"{self.decision_endpoint}/getConfig", request_data)
186
187         for policy in policies:
188             try:
189                 policy["config"] = json.loads(policy["config"])
190             except json.JSONDecodeError:
191                 pass
192
193         return policies
194
195     @classmethod
196     def _needs_update(cls, update, ids=None, filters=None):
197         """
198         Expect something like this
199         {
200             "removedPolicies": [{
201                 "policyName": "xyz.45.xml",
202                 "versionNo": "45"
203             }],
204             "loadedPolicies": [{
205                 "policyName": "xyz.46.xml",
206                 "versionNo": "46",
207                 "matches": {
208                     "ONAPName": "DCAE",
209                     "ConfigName": "DCAE_HighlandPark_AgingConfig",
210                     "service": "DCAE_HighlandPark_AgingConfig",
211                     "guard": "false",
212                     "location": " Edge",
213                     "TTLDate": "NA",
214                     "uuid": "TestUUID",
215                     "RiskLevel": "5",
216                     "RiskType": "default"
217                 },
218                 "updateType": "UPDATE"
219             }],
220             "notificationType": "BOTH"
221         }
222         """
223         for policy in update.get("removedPolicies", []) + update.get(
224             "loadedPolicies", []
225         ):
226             if (
227                 re.match(get_single_regex(filters, ids), policy["policyName"])
228                 is not None
229             ):
230                 return True
231
232         return False
233
234     async def notificationhandler(self, callback, ids=None, filters=None):
235         """
236         websocket based notification handler for
237         :param callback: function to execute when
238         a matching notification is found
239         :param ids: list of id strings for matching
240         """
241
242         url = self.pdp_url.replace("https", "wss")
243
244         # The websocket we start here will periodically
245         # send heartbeat (ping frames) to policy
246         # this ensures that we are never left hanging
247         # with our communication with policy.
248         session = self._init_ws_session()
249         try:
250             websocket = await session.ws_connect(
251                 "{0}/{1}".format(url, self.ws_endpoint), heartbeat=WS_HEARTBEAT
252             )
253             logger.info("websock with policy established")
254             async for msg in websocket:
255                 # check for websocket errors
256                 #  break out of this async for loop. to attempt reconnection
257                 if msg.type in (aiohttp.WSMsgType.CLOSED, aiohttp.WSMsgType.ERROR):
258                     break
259
260                 if msg.type is (aiohttp.WSMsgType.TEXT):
261                     if self._needs_update(
262                         json.loads(msg.data),
263                         ids=ids,
264                         filters=filters
265                     ):
266                         logger.debug(
267                             "notification received from pdp websocket -> %s", msg
268                         )
269                         await callback()
270                 else:
271                     logger.warning(
272                         "unexpected websocket message type received %s", msg.type
273                     )
274         except aiohttp.ClientError:
275             logger.exception("Received connection error with websocket")
276
277
278 class PolicyClientV1(BasePolicyClient):
279     """
280     Supports the v1 policy API introduced in ONAP's dublin release
281     """
282
283     async def close(self):
284         """ close the policy client """
285         await super().close()
286         if self.dmaap_session is not None:
287             await self.dmaap_session.close()
288
289     def _init_dmaap_session(self):
290         """ initialize a dmaap session for notifications """
291         if self.dmaap_session is None:
292             self.dmaap_session = aiohttp.ClientSession(
293                     headers=self.dmaap_headers,
294                     raise_for_status=True
295                 )
296
297         return self.dmaap_session
298
299     def __init__(
300         self,
301         headers,
302         pdp_url,
303         **kwargs,
304     ):
305         super().__init__(pdp_url, headers=headers)
306         self._ws = None
307         self.audit_uuid = str(uuid.uuid4())
308         self.dmaap_url = kwargs.get('dmaap_url')
309         self.dmaap_timeout = 15000
310         self.dmaap_session = None
311         self.dmaap_headers = kwargs.get('dmaap_headers', {})
312         self.decision = kwargs.get('v1_decision', V1_DECISION_ENDPOINT)
313
314     async def list_policies(self, filters=None, ids=None):
315         """
316         ONAP has no real equivalent to this.
317         :returns: None
318         """
319         # in derived classes we may use self
320         # pylint: disable=no-self-use
321         return None
322
323     @classmethod
324     def convert_to_policy(cls, policy_body):
325         """
326         Convert raw policy to format expected by microservices
327         :param policy_body: raw dictionary output from pdp
328         :returns: data in proper formatting
329         """
330         pdp_metadata = policy_body.get("metadata", {})
331         policy_id = pdp_metadata.get("policy-id")
332         policy_version = policy_body.get("version")
333         if not policy_id or policy_version is None:
334             logger.warning("Malformed policy is missing policy-id and version")
335             return None
336
337         policy_body["policyName"] = "{}.{}.xml".format(
338             policy_id, str(policy_version.replace(".", "-"))
339         )
340         policy_body["policyVersion"] = str(policy_version)
341         if "properties" in policy_body:
342             policy_body["config"] = policy_body["properties"]
343             del policy_body["properties"]
344
345         return policy_body
346
347     @metrics.get_config_exceptions.count_exceptions()
348     async def get_config(self, filters=None, ids=None):
349         """
350         Used to get the actual policy configuration from PDP
351         :returns: the policy objects that are currently active
352         for the given set of filters
353         """
354         if ids is None:
355             ids = []
356
357         request_data = {
358                 "ONAPName": "DCAE",
359                 "ONAPComponent": "policy-sync",
360                 "ONAPInstance": self.audit_uuid,
361                 "action": "configure",
362                 "resource": {"policy-id": ids}
363         }
364
365         data = await self._run_request(self.decision, request_data)
366         out = []
367         for policy_body in data["policies"].values():
368             policy = self.convert_to_policy(policy_body)
369             if policy is not None:
370                 out.append(policy)
371
372         return out
373
374     def supports_notifications(self):
375         """
376         Does this policy client support real time notifications
377         :returns: True if the dmaap url is set else return false
378         """
379         return self.dmaap_url is not None
380
381     @classmethod
382     def _needs_update(cls, update, ids):
383         """
384         expect something like this
385         {
386             "deployed-policies": [
387                 {
388                     "policy-type": "onap.policies.monitoring.tcagen2",
389                     "policy-type-version": "1.0.0",
390                     "policy-id": "onap.scaleout.tca",
391                     "policy-version": "2.0.0",
392                     "success-count": 3,
393                     "failure-count": 0
394                 }
395             ],
396             "undeployed-policies": [
397                 {
398                     "policy-type": "onap.policies.monitoring.tcagen2",
399                     "policy-type-version": "1.0.0",
400                     "policy-id": "onap.firewall.tca",
401                     "policy-version": "6.0.0",
402                     "success-count": 3,
403                     "failure-count": 0
404                 }
405             ]
406         }
407         """
408         for policy in update.get("deployed-policies", []) + update.get(
409             "undeployed-policies", []
410         ):
411             if policy.get("policy-id") in ids:
412                 return True
413
414         return False
415
416     async def poll_dmaap(self, callback, ids=None):
417         """
418         one GET request to dmaap
419         :param callback: function to execute when a
420         matching notification is found
421         :param ids: list of id strings for matching
422         """
423         query = f"?timeout={self.dmaap_timeout}"
424         url = f"{self.dmaap_url}/{self.audit_uuid}/0{query}"
425         logger.info("polling topic: %s", url)
426         session = self._init_dmaap_session()
427         try:
428             async with session.get(url) as response:
429                 messages = await response.read()
430
431                 for msg in json.loads(messages):
432                     if self._needs_update(json.loads(msg), ids):
433                         logger.info(
434                             "notification received from dmaap -> %s", msg
435                         )
436                         await callback()
437         except aiohttp.ClientError:
438             logger.exception('received connection error from dmaap topic')
439             # wait some time
440             await asyncio.sleep(30)
441
442     async def notificationhandler(self, callback, ids=None, filters=None):
443         """
444         dmaap based notification handler for
445         :param callback: function to execute when a
446         matching notification is found
447         :param ids: list of id strings for matching
448         """
449         if filters is not None:
450             logger.warning("filters are not supported with pdp v1..ignoring")
451         while True:
452             await self.poll_dmaap(callback, ids=ids)
453
454
455 def get_client(
456     pdp_url,
457     use_v0=False,
458     **kwargs
459 ):
460     """
461     get a particular policy client
462     :param use_v0: whether this should be a v0 client or
463     :return: A policy client
464     """
465     if pdp_url is None:
466         raise ValueError("POLICY_SYNC_PDP_URL set or --pdp flag not set")
467
468     pdp_headers = {
469         "Accept": APPLICATION_JSON,
470         "Content-Type": APPLICATION_JSON
471     }
472
473     if 'pdp_user' in kwargs and 'pdp_password' in kwargs:
474         auth = base64.b64encode(
475             "{}:{}".format(
476                     kwargs.get('pdp_user'),
477                     kwargs.get('pdp_password')
478                 ).encode("utf-8")
479         )
480         pdp_headers["Authorization"] = "Basic {}".format(auth.decode("utf-8"))
481
482     dmaap_headers = {
483         "Accept": APPLICATION_JSON,
484         "Content-Type": APPLICATION_JSON
485     }
486
487     logger.info(kwargs.get('dmaap_password'))
488     if 'dmaap_user' in kwargs and 'dmaap_password' in kwargs:
489         auth = base64.b64encode(
490             "{}:{}".format(
491                     kwargs.get('dmaap_user'),
492                     kwargs.get('dmaap_password')
493                 ).encode("utf-8")
494         ).decode("utf-8")
495         dmaap_headers["Authorization"] = f"Basic {auth}"
496
497     # Create client (either v0 or v1) based on arguments)
498     if use_v0:
499         return PolicyClientV0(
500             pdp_headers,
501             pdp_url,
502             decision_endpoint=kwargs.get('v0_decision'),
503             ws_endpoint=kwargs.get('v0_notifications')
504         )
505
506     return PolicyClientV1(
507         pdp_headers,
508         pdp_url,
509         v1_decision=kwargs.get('v1_decision'),
510         dmaap_url=kwargs.get('dmaap_url'),
511         dmaap_headers=dmaap_headers
512     )