1 # ============LICENSE_START=======================================================
2 # Copyright (c) 2021 AT&T Intellectual Property. All rights reserved.
3 # ================================================================================
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at
8 # http://www.apache.org/licenses/LICENSE-2.0
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
15 # ============LICENSE_END=========================================================
16 """Clients for communicating with both the post dublin and pre dublin APIs"""
23 import policysync.metrics as metrics
24 from .util import get_module_logger
26 logger = get_module_logger(__name__)
30 WS_NOTIFICATIONS_ENDPOINT = "pdp/notifications"
32 V1_DECISION_ENDPOINT = "policy/pdpx/v1/decision"
33 V0_DECISION_ENDPOINT = "pdp/api"
35 APPLICATION_JSON = "application/json"
38 def get_single_regex(filters, ids):
39 """given a list of filters and ids returns a single regex for matching"""
40 filters = [] if filters is None else filters
41 ids = [] if ids is None else ["{}[.][0-9]+[.]xml".format(x) for x in ids]
42 return "|".join(filters + ids) if filters is not None else ""
45 class BasePolicyClient:
46 """ Base policy client that is pluggable into inventory """
47 def __init__(self, pdp_url, headers=None):
48 self.headers = {} if headers is None else headers
50 self.pdp_url = pdp_url
52 def _init_rest_session(self):
54 initialize an aiohttp rest session
55 :returns: an aiohttp rest session
57 if self.session is None:
58 self.session = aiohttp.ClientSession(
59 headers=self.headers, raise_for_status=True
64 async def _run_request(self, endpoint, request_data):
66 execute a particular REST request
67 :param endpoint: str rest endpoint to query
68 :param request_data: dictionary request data
69 :returns: dictionary response data
71 session = self._init_rest_session()
72 async with session.post(
73 "{0}/{1}".format(self.pdp_url, endpoint), json=request_data
75 data = await resp.read()
76 return json.loads(data)
78 def supports_notifications(self):
80 does this particular client support real time notifictions
83 # in derived classes we may use self
84 # pylint: disable=no-self-use
87 async def list_policies(self, filters=None, ids=None):
89 used to get a list of policies matching a particular ID
90 :param filters: list of regex filter strings for matching
91 :param ids: list of id strings for matching
92 :returns: List of policies matching filters or ids
94 raise NotImplementedError
96 async def get_config(self, filters=None, ids=None):
98 used to get a list of policies matching a particular ID
99 :returns: List of policies matching filters or ids
101 raise NotImplementedError
103 async def notificationhandler(self, callback, ids=None, filters=None):
105 Clients should implement this to support real time notifications
106 :param callback: func to execute when a matching notification is found
107 :param ids: list of id strings for matching
109 raise NotImplementedError
111 async def close(self):
112 """ close the policy client """
113 logger.info("closing websocket clients...")
115 await self.session.close()
118 class PolicyClientV0(BasePolicyClient):
120 Supports the legacy v0 policy API use prior to ONAP Dublin
122 async def close(self):
123 """ close the policy client """
124 await super().close()
125 if self.ws_session is not None:
126 await self.ws_session.close()
132 decision_endpoint=V0_DECISION_ENDPOINT,
133 ws_endpoint=WS_NOTIFICATIONS_ENDPOINT
136 Initialize a v0 policy client
137 :param headers: Headers to use for policy rest api
138 :param pdp_url: URL of the PDP
139 :param decision_endpoint: root for the decison API
140 :param websocket_endpoint: root of the websocket endpoint
142 super().__init__(pdp_url, headers=headers)
143 self.ws_session = None
145 self.decision_endpoint = decision_endpoint
146 self.ws_endpoint = ws_endpoint
149 def _init_ws_session(self):
150 """initialize a websocket session for notifications"""
151 if self.ws_session is None:
152 self.ws_session = aiohttp.ClientSession()
154 return self.ws_session
156 @metrics.list_policy_exceptions.count_exceptions()
157 async def list_policies(self, filters=None, ids=None):
159 used to get a list of policies matching a particular ID
160 :param filters: list of regex filter strings for matching
161 :param ids: list of id strings for matching
162 :returns: List of policies matching filters or ids
164 request_data = self._prepare_request(filters, ids)
165 policies = await self._run_request(
166 f"{self.decision_endpoint}/listPolicy", request_data
171 def _prepare_request(cls, filters, ids):
172 """prepare the request body for the v0 api"""
173 regex = get_single_regex(filters, ids)
174 return {"policyName": regex}
176 @metrics.get_config_exceptions.count_exceptions()
177 async def get_config(self, filters=None, ids=None):
179 Used to get the actual policy configuration from PDP
180 :return: the policy objects that are currently active
181 for the given set of filters
183 request_data = self._prepare_request(filters, ids)
184 policies = await self._run_request(
185 f"{self.decision_endpoint}/getConfig", request_data)
187 for policy in policies:
189 policy["config"] = json.loads(policy["config"])
190 except json.JSONDecodeError:
196 def _needs_update(cls, update, ids=None, filters=None):
198 Expect something like this
200 "removedPolicies": [{
201 "policyName": "xyz.45.xml",
205 "policyName": "xyz.46.xml",
209 "ConfigName": "DCAE_HighlandPark_AgingConfig",
210 "service": "DCAE_HighlandPark_AgingConfig",
216 "RiskType": "default"
218 "updateType": "UPDATE"
220 "notificationType": "BOTH"
223 for policy in update.get("removedPolicies", []) + update.get(
227 re.match(get_single_regex(filters, ids), policy["policyName"])
234 async def notificationhandler(self, callback, ids=None, filters=None):
236 websocket based notification handler for
237 :param callback: function to execute when
238 a matching notification is found
239 :param ids: list of id strings for matching
242 url = self.pdp_url.replace("https", "wss")
244 # The websocket we start here will periodically
245 # send heartbeat (ping frames) to policy
246 # this ensures that we are never left hanging
247 # with our communication with policy.
248 session = self._init_ws_session()
250 websocket = await session.ws_connect(
251 "{0}/{1}".format(url, self.ws_endpoint), heartbeat=WS_HEARTBEAT
253 logger.info("websock with policy established")
254 async for msg in websocket:
255 # check for websocket errors
256 # break out of this async for loop. to attempt reconnection
257 if msg.type in (aiohttp.WSMsgType.CLOSED, aiohttp.WSMsgType.ERROR):
260 if msg.type is (aiohttp.WSMsgType.TEXT):
261 if self._needs_update(
262 json.loads(msg.data),
267 "notification received from pdp websocket -> %s", msg
272 "unexpected websocket message type received %s", msg.type
274 except aiohttp.ClientError:
275 logger.exception("Received connection error with websocket")
278 class PolicyClientV1(BasePolicyClient):
280 Supports the v1 policy API introduced in ONAP's dublin release
283 async def close(self):
284 """ close the policy client """
285 await super().close()
286 if self.dmaap_session is not None:
287 await self.dmaap_session.close()
289 def _init_dmaap_session(self):
290 """ initialize a dmaap session for notifications """
291 if self.dmaap_session is None:
292 self.dmaap_session = aiohttp.ClientSession(
293 headers=self.dmaap_headers,
294 raise_for_status=True
297 return self.dmaap_session
305 super().__init__(pdp_url, headers=headers)
307 self.audit_uuid = str(uuid.uuid4())
308 self.dmaap_url = kwargs.get('dmaap_url')
309 self.dmaap_timeout = 15000
310 self.dmaap_session = None
311 self.dmaap_headers = kwargs.get('dmaap_headers', {})
312 self.decision = kwargs.get('v1_decision', V1_DECISION_ENDPOINT)
314 async def list_policies(self, filters=None, ids=None):
316 ONAP has no real equivalent to this.
319 # in derived classes we may use self
320 # pylint: disable=no-self-use
324 def convert_to_policy(cls, policy_body):
326 Convert raw policy to format expected by microservices
327 :param policy_body: raw dictionary output from pdp
328 :returns: data in proper formatting
330 pdp_metadata = policy_body.get("metadata", {})
331 policy_id = pdp_metadata.get("policy-id")
332 policy_version = policy_body.get("version")
333 if not policy_id or policy_version is None:
334 logger.warning("Malformed policy is missing policy-id and version")
337 policy_body["policyName"] = "{}.{}.xml".format(
338 policy_id, str(policy_version.replace(".", "-"))
340 policy_body["policyVersion"] = str(policy_version)
341 if "properties" in policy_body:
342 policy_body["config"] = policy_body["properties"]
343 del policy_body["properties"]
347 @metrics.get_config_exceptions.count_exceptions()
348 async def get_config(self, filters=None, ids=None):
350 Used to get the actual policy configuration from PDP
351 :returns: the policy objects that are currently active
352 for the given set of filters
359 "ONAPComponent": "policy-sync",
360 "ONAPInstance": self.audit_uuid,
361 "action": "configure",
362 "resource": {"policy-id": ids}
365 data = await self._run_request(self.decision, request_data)
367 for policy_body in data["policies"].values():
368 policy = self.convert_to_policy(policy_body)
369 if policy is not None:
374 def supports_notifications(self):
376 Does this policy client support real time notifications
377 :returns: True if the dmaap url is set else return false
379 return self.dmaap_url is not None
382 def _needs_update(cls, update, ids):
384 expect something like this
386 "deployed-policies": [
388 "policy-type": "onap.policies.monitoring.tcagen2",
389 "policy-type-version": "1.0.0",
390 "policy-id": "onap.scaleout.tca",
391 "policy-version": "2.0.0",
396 "undeployed-policies": [
398 "policy-type": "onap.policies.monitoring.tcagen2",
399 "policy-type-version": "1.0.0",
400 "policy-id": "onap.firewall.tca",
401 "policy-version": "6.0.0",
408 for policy in update.get("deployed-policies", []) + update.get(
409 "undeployed-policies", []
411 if policy.get("policy-id") in ids:
416 async def poll_dmaap(self, callback, ids=None):
418 one GET request to dmaap
419 :param callback: function to execute when a
420 matching notification is found
421 :param ids: list of id strings for matching
423 query = f"?timeout={self.dmaap_timeout}"
424 url = f"{self.dmaap_url}/{self.audit_uuid}/0{query}"
425 logger.info("polling topic: %s", url)
426 session = self._init_dmaap_session()
428 async with session.get(url) as response:
429 messages = await response.read()
431 for msg in json.loads(messages):
432 if self._needs_update(json.loads(msg), ids):
434 "notification received from dmaap -> %s", msg
437 except aiohttp.ClientError:
438 logger.exception('received connection error from dmaap topic')
440 await asyncio.sleep(30)
442 async def notificationhandler(self, callback, ids=None, filters=None):
444 dmaap based notification handler for
445 :param callback: function to execute when a
446 matching notification is found
447 :param ids: list of id strings for matching
449 if filters is not None:
450 logger.warning("filters are not supported with pdp v1..ignoring")
452 await self.poll_dmaap(callback, ids=ids)
461 get a particular policy client
462 :param use_v0: whether this should be a v0 client or
463 :return: A policy client
466 raise ValueError("POLICY_SYNC_PDP_URL set or --pdp flag not set")
469 "Accept": APPLICATION_JSON,
470 "Content-Type": APPLICATION_JSON
473 if 'pdp_user' in kwargs and 'pdp_password' in kwargs:
474 auth = base64.b64encode(
476 kwargs.get('pdp_user'),
477 kwargs.get('pdp_password')
480 pdp_headers["Authorization"] = "Basic {}".format(auth.decode("utf-8"))
483 "Accept": APPLICATION_JSON,
484 "Content-Type": APPLICATION_JSON
487 logger.info(kwargs.get('dmaap_password'))
488 if 'dmaap_user' in kwargs and 'dmaap_password' in kwargs:
489 auth = base64.b64encode(
491 kwargs.get('dmaap_user'),
492 kwargs.get('dmaap_password')
495 dmaap_headers["Authorization"] = f"Basic {auth}"
497 # Create client (either v0 or v1) based on arguments)
499 return PolicyClientV0(
502 decision_endpoint=kwargs.get('v0_decision'),
503 ws_endpoint=kwargs.get('v0_notifications')
506 return PolicyClientV1(
509 v1_decision=kwargs.get('v1_decision'),
510 dmaap_url=kwargs.get('dmaap_url'),
511 dmaap_headers=dmaap_headers