[PMSH] Update Filter API
[dcaegen2/services.git] / components / pm-subscription-handler / pmsh_service / mod / pmsh_utils.py
1 # ============LICENSE_START===================================================
2 #  Copyright (C) 2019-2021 Nordix Foundation.
3 # ============================================================================
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at
7 #
8 #      http://www.apache.org/licenses/LICENSE-2.0
9 #
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
15 #
16 # SPDX-License-Identifier: Apache-2.0
17 # ============LICENSE_END=====================================================
18 import json
19 import os
20 import uuid
21 from functools import wraps
22 from json import JSONDecodeError
23 from os import getenv
24 from threading import Timer
25
26 import requests
27 from jsonschema import validate, ValidationError
28 from onap_dcae_cbs_docker_client.client import get_all
29 from onaplogging.mdcContext import MDC
30 from requests.auth import HTTPBasicAuth
31 from tenacity import wait_fixed, stop_after_attempt, retry, retry_if_exception_type
32
33 from mod import logger
34 from mod.subscription import Subscription
35
36
def mdc_handler(func):
    """Decorator that seeds the logging MDC and passes the generated ids to ``func``.

    On every call two fresh ``uuid1`` strings are generated. They are stored in the MDC
    under ``RequestID`` and ``InvocationID`` (alongside ``ServiceName``, taken from the
    ``HOSTNAME`` environment variable) and are also injected into the wrapped call as the
    ``request_id`` and ``invocation_id`` keyword arguments, overwriting any values the
    caller supplied under those names.

    Args:
        func: the callable to wrap; it must accept ``request_id`` and ``invocation_id``
            keyword arguments (e.g. through ``**kwargs``).

    Returns:
        The wrapping callable, carrying ``func``'s metadata via ``functools.wraps``.
    """

    @wraps(func)
    def wrapper(*args, **kwargs):
        req_id = str(uuid.uuid1())
        inv_id = str(uuid.uuid1())

        mdc_entries = (
            ('ServiceName', getenv('HOSTNAME')),
            ('RequestID', req_id),
            ('InvocationID', inv_id),
        )
        for mdc_key, mdc_value in mdc_entries:
            MDC.put(mdc_key, mdc_value)

        kwargs.update(request_id=req_id, invocation_id=inv_id)
        return func(*args, **kwargs)

    return wrapper
51
52
def _load_sub_schema_from_file():
    """Load the subscription JSON schema that sits next to this module.

    The file ``sub_schema.json`` is looked up in the directory of this source file.
    Failures are logged rather than raised.

    Returns:
        The parsed schema, or ``None`` when the file cannot be read or is not valid JSON.
    """
    schema_path = os.path.join(os.path.dirname(__file__), 'sub_schema.json')
    try:
        # Decode explicitly as UTF-8 instead of relying on the locale's default encoding.
        with open(schema_path, encoding='utf-8') as sub:
            return json.load(sub)
    except OSError as err:
        logger.error(f'Failed to read sub schema file: {err}', exc_info=True)
    except JSONDecodeError as json_err:
        logger.error(f'sub schema file is not a valid JSON file: {json_err}', exc_info=True)
    # Both error paths fall through to here; make the "no schema" result explicit.
    return None
61
62
class AppConfig:
    """Holder for the PMSH configuration fetched from the Config Binding Service (CBS).

    ``__new__`` returns the same object on every construction (kept in ``INSTANCE``).
    ``__init__`` still runs on each ``AppConfig()`` call, so the configuration is fetched
    again and ``nf_filter`` is reset to ``None`` every time the class is instantiated.
    """
    INSTANCE = None

    def __init__(self):
        conf = self._get_pmsh_config()
        app_conf = conf['config']
        self.aaf_creds = {'aaf_id': app_conf.get('aaf_identity'),
                          'aaf_pass': app_conf.get('aaf_password')}
        self.enable_tls = app_conf.get('enable_tls')
        self.ca_cert_path = app_conf.get('ca_cert_path')
        self.cert_path = app_conf.get('cert_path')
        self.key_path = app_conf.get('key_path')
        self.streams_subscribes = app_conf.get('streams_subscribes')
        self.streams_publishes = app_conf.get('streams_publishes')
        self.operational_policy_name = app_conf.get('operational_policy_name')
        self.control_loop_name = app_conf.get('control_loop_name')
        # May be None: the loader logs and returns nothing when the schema file is unusable.
        self.sub_schema = _load_sub_schema_from_file()
        self.subscription = Subscription(self.control_loop_name,
                                         self.operational_policy_name,
                                         **app_conf['pmsh_policy']['subscription'])
        self.nf_filter = None

    def __new__(cls, *args, **kwargs):
        # Arguments are accepted for signature compatibility but not forwarded:
        # object.__new__ rejects extra arguments when __new__ is overridden.
        if AppConfig.INSTANCE is None:
            AppConfig.INSTANCE = super().__new__(cls)
        return AppConfig.INSTANCE

    @mdc_handler
    @retry(wait=wait_fixed(5), stop=stop_after_attempt(5),
           retry=retry_if_exception_type(ValueError))
    def _get_pmsh_config(self, **kwargs):
        """ Retrieves PMSH's configuration from Config binding service. Any error raised
        while fetching is converted to a ValueError, which triggers a retry: up to 5
        attempts are made, waiting 5 seconds between them.

        Args:
            **kwargs: receives the request_id / invocation_id injected by mdc_handler.

        Returns:
            dict: Dictionary representation of the the service configuration

        Raises:
            Exception: If any error occurred pulling configuration from Config binding service.
                NOTE(review): ``reraise`` is not set on the retry decorator, so the final
                failure presumably surfaces as tenacity's ``RetryError`` rather than the
                ``ValueError`` itself - confirm against the tenacity documentation.
        """
        try:
            logger.info('Attempting to fetch PMSH Configuration from CBS.')
            config = get_all()
            # NOTE(review): this logs the whole config at INFO level, including the
            # aaf_password value read in __init__ - consider redacting.
            logger.info(f'Successfully fetched PMSH config from CBS: {config}')
            return config
        except Exception as e:
            logger.error(f'Failed to get config from CBS: {e}', exc_info=True)
            # ValueError is the type the retry predicate watches for; keep the cause chained.
            raise ValueError(e) from e

    def validate_sub_schema(self):
        """
        Validates schema of PMSH subscription

        The subscription's attribute dict is checked against ``self.sub_schema`` and then
        its ``nfFilter`` entry must contain at least one non-empty filter.

        Raises:
            ValidationError: If the PMSH subscription schema is invalid
        """
        sub_data = self.subscription.__dict__
        validate(instance=sub_data, schema=self.sub_schema)
        nf_filter = sub_data["nfFilter"]
        if not any(len(val) > 0 for val in nf_filter.values()):
            raise ValidationError("At least one filter within nfFilter must not be empty")
        logger.debug("Subscription schema is valid.")

    def refresh_config(self):
        """
        Update the relevant attributes of the AppConfig object.

        Only the operational policy name, the control loop name and the subscription are
        re-read; the other attributes set in ``__init__`` are left untouched.

        Raises:
            Exception: if cbs request fails.
        """
        try:
            app_conf = self._get_pmsh_config()
            self.operational_policy_name = app_conf['config'].get('operational_policy_name')
            self.control_loop_name = app_conf['config'].get('control_loop_name')
            self.subscription = Subscription(self.control_loop_name,
                                             self.operational_policy_name,
                                             **app_conf['config']['pmsh_policy']['subscription'])
            logger.info("AppConfig data has been refreshed")
        except Exception:
            # Include the traceback, matching the other error handlers in this module.
            logger.error('Failed to refresh PMSH AppConfig', exc_info=True)
            raise

    def get_mr_sub(self, sub_name):
        """
        Returns the MrSub object requested.

        Args:
            sub_name: the key of the subscriber object.

        Returns:
            MrSub: an Instance of an `MrSub` <MrSub> Object.

        Raises:
            KeyError: if the sub_name is not found.
        """
        try:
            return _MrSub(sub_name, self.aaf_creds, self.ca_cert_path,
                          self.enable_tls, self.cert_params, **self.streams_subscribes[sub_name])
        except KeyError as e:
            logger.error(f'Failed to get MrSub {sub_name}: {e}', exc_info=True)
            raise

    def get_mr_pub(self, pub_name):
        """
        Returns the MrPub object requested.

        Args:
            pub_name: the key of the publisher object.

        Returns:
            MrPub: an Instance of an `MrPub` <MrPub> Object.

        Raises:
            KeyError: if the pub_name is not found.
        """
        try:
            return _MrPub(pub_name, self.aaf_creds, self.ca_cert_path,
                          self.enable_tls, self.cert_params, **self.streams_publishes[pub_name])
        except KeyError as e:
            logger.error(f'Failed to get MrPub {pub_name}: {e}', exc_info=True)
            raise

    @property
    def cert_params(self):
        """
        Returns the tls artifact paths.

        Returns:
            cert_path, key_path (tuple): the path to tls cert and key.
        """
        return self.cert_path, self.key_path
193
194
195 class _DmaapMrClient:
196     def __init__(self, aaf_creds, ca_cert_path, enable_tls, cert_params, **kwargs):
197         """
198         A DMaaP Message Router utility class.
199         Sub classes should be invoked via the AppConfig.get_mr_{pub|sub} only.
200         Args:
201             aaf_creds (dict): a dict of aaf secure credentials.
202             ca_cert_path (str): path to the ca certificate.
203             enable_tls (bool): TLS if True, else False
204             cert_params (tuple): client side (cert, key) tuple.
205             **kwargs: a dict of streams_{subscribes|publishes} data.
206         """
207         self.topic_url = kwargs.get('dmaap_info').get('topic_url')
208         self.aaf_id = aaf_creds.get('aaf_id')
209         self.aaf_pass = aaf_creds.get('aaf_pass')
210         self.ca_cert_path = ca_cert_path
211         self.enable_tls = enable_tls
212         self.cert_params = cert_params
213
214
class _MrPub(_DmaapMrClient):
    """Publishing side of the DMaaP Message Router client."""

    def __init__(self, pub_name, aaf_creds, ca_cert_path, enable_tls, cert_params, **kwargs):
        self.pub_name = pub_name
        super().__init__(aaf_creds, ca_cert_path, enable_tls, cert_params, **kwargs)

    @mdc_handler
    def publish_to_topic(self, event_json, **kwargs):
        """
        Publish the event to the DMaaP Message Router topic.

        Args:
            event_json (dict): the json data to be published.
            **kwargs: receives the request_id / invocation_id injected by mdc_handler;
                both are forwarded as request headers.

        Raises:
            Exception: if post request fails.
        """
        headers = {'content-type': 'application/json', 'x-transactionid': kwargs['request_id'],
                   'InvocationID': kwargs['invocation_id'],
                   'RequestID': kwargs['request_id']
                   }
        logger.info(f'Publishing event to {self.topic_url}')
        # The session is a context manager so it is closed even when the post fails.
        # NOTE(review): no request timeout is passed - confirm whether an unbounded wait
        # on the broker is acceptable here.
        with requests.Session() as session:
            response = session.post(self.topic_url, headers=headers,
                                    auth=HTTPBasicAuth(self.aaf_id, self.aaf_pass),
                                    json=event_json,
                                    verify=(self.ca_cert_path if self.enable_tls else False))
            response.raise_for_status()

    def publish_subscription_event_data(self, subscription, nf):
        """
        Build the subscription event for the given nf and publish it to the DMaaP MR topic.

        Args:
            subscription (Subscription): the `Subscription` <Subscription> object.
            nf (NetworkFunction): the NetworkFunction to include in the event.

        Raises:
            Exception: re-raised, after logging, if preparing or publishing the event fails.
        """
        try:
            subscription_event = subscription.prepare_subscription_event(nf)
            logger.debug(f'Subscription event: {subscription_event}')
            self.publish_to_topic(subscription_event)
        except Exception as e:
            logger.error(f'Failed to publish to topic {self.topic_url}: {e}', exc_info=True)
            raise
260
261
class _MrSub(_DmaapMrClient):
    """Subscribing side of the DMaaP Message Router client."""

    def __init__(self, sub_name, aaf_creds, ca_cert_path, enable_tls, cert_params, **kwargs):
        self.sub_name = sub_name
        super().__init__(aaf_creds, ca_cert_path, enable_tls, cert_params, **kwargs)

    @mdc_handler
    def get_from_topic(self, consumer_id, consumer_group='dcae_pmsh_cg', timeout=1000, **kwargs):
        """
        Returns the json data from the MrTopic.

        Args:
            consumer_id (str): Within your subscribers group, a name that uniquely
            identifies your subscribers process.
            consumer_group (str): A name that uniquely identifies your subscribers.
            timeout (int): The request timeout value in mSec; it is sent to the Message
            Router as the ``timeout`` query parameter.
            **kwargs: receives the request_id / invocation_id injected by mdc_handler;
            both are forwarded as request headers.

        Returns:
            list[str]: the json response from DMaaP Message Router topic.

        Raises:
            Exception: re-raised, after logging, if the request fails or a 4xx/5xx status
            is returned.
        """
        try:
            headers = {'accept': 'application/json', 'content-type': 'application/json',
                       'InvocationID': kwargs['invocation_id'],
                       'RequestID': kwargs['request_id']}
            logger.info(f'Fetching messages from MR topic: {self.topic_url}')
            # The session is a context manager so it is closed on every exit path.
            with requests.Session() as session:
                response = session.get(f'{self.topic_url}/{consumer_group}/{consumer_id}'
                                       f'?timeout={timeout}',
                                       auth=HTTPBasicAuth(self.aaf_id, self.aaf_pass),
                                       headers=headers,
                                       verify=(self.ca_cert_path if self.enable_tls else False))
                if response.status_code == 503:
                    # Extra, more specific log line; the 503 is still raised just below.
                    logger.error(f'MR Service is unavailable at present: {response.content}')
                response.raise_for_status()
                # Reaching this line means the status was not an error, so no further
                # ``ok`` check is needed before decoding the body.
                return response.json()
        except Exception as e:
            logger.error(f'Failed to fetch message from MR: {e}', exc_info=True)
            raise
300
301
class PeriodicTask(Timer):
    """
    A :class:`Timer` that calls its function immediately and then again after every
    ``interval``, until the timer's ``finished`` event is set.
    """

    def run(self):
        # The first invocation is not wrapped by the guard used in the loop below, so an
        # exception raised here propagates out of run().
        self.function(*self.args, **self.kwargs)
        while True:
            # wait() returns True once ``finished`` has been set, which ends the loop.
            if self.finished.wait(self.interval):
                break
            try:
                self.function(*self.args, **self.kwargs)
            except Exception as exc:
                # Repeat invocations only log their failure, so the schedule carries on.
                logger.error(f'Exception in thread: {self.name}: {exc}', exc_info=True)