1 # ============LICENSE_START===================================================
2 # Copyright (C) 2019-2021 Nordix Foundation.
3 # ============================================================================
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at
8 # http://www.apache.org/licenses/LICENSE-2.0
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
16 # SPDX-License-Identifier: Apache-2.0
17 # ============LICENSE_END=====================================================
import json
import os
import uuid
from functools import wraps
from json import JSONDecodeError
from os import getenv
from threading import Timer

import requests
from jsonschema import validate, ValidationError
from onap_dcae_cbs_docker_client.client import get_all
from onaplogging.mdcContext import MDC
from requests.auth import HTTPBasicAuth
from tenacity import wait_fixed, stop_after_attempt, retry, retry_if_exception_type

from mod import logger
from mod.subscription import Subscription
def mdc_handler(func):
    """Decorator that seeds the logging MDC and injects tracing ids into ``func``.

    On every call a fresh ``request_id`` and ``invocation_id`` are generated with
    ``uuid.uuid1()``. They are stored in the MDC under ``RequestID`` and
    ``InvocationID`` (alongside ``ServiceName``, read from the ``HOSTNAME``
    environment variable) and handed to ``func`` as keyword arguments.

    Args:
        func: the callable to wrap. It must accept ``request_id`` and
            ``invocation_id`` keyword arguments (e.g. through ``**kwargs``).

    Returns:
        The wrapping callable, carrying the metadata of ``func``.
    """
    @wraps(func)
    def wrapper(*args, **kwargs):
        request_id = str(uuid.uuid1())
        invocation_id = str(uuid.uuid1())
        MDC.put('ServiceName', getenv('HOSTNAME'))
        MDC.put('RequestID', request_id)
        MDC.put('InvocationID', invocation_id)

        # Any ids supplied by the caller are overwritten with the fresh ones.
        kwargs['request_id'] = request_id
        kwargs['invocation_id'] = invocation_id
        return func(*args, **kwargs)

    return wrapper
def _load_sub_schema_from_file():
    """Load the subscription JSON schema shipped next to this module.

    The schema is read from ``sub_schema.json`` in the directory of this file.

    Returns:
        The parsed schema, or ``None`` when the file cannot be read or does not
        contain valid JSON (the failure is logged, not raised).
    """
    schema_path = os.path.join(os.path.dirname(__file__), 'sub_schema.json')
    try:
        # JSON text is UTF-8; do not depend on the locale's default encoding.
        with open(schema_path, encoding='utf-8') as sub:
            return json.load(sub)
    except OSError as err:
        logger.error(f'Failed to read sub schema file: {err}', exc_info=True)
    except JSONDecodeError as json_err:
        logger.error(f'sub schema file is not a valid JSON file: {json_err}', exc_info=True)
class AppConfig:
    """Singleton holding the PMSH configuration fetched from the Config Binding Service."""

    # The single shared instance; created on first construction by __new__.
    INSTANCE = None

    def __init__(self):
        """Fetch the configuration from CBS and populate the instance attributes.

        Because ``__new__`` always returns the shared instance, this initialiser
        runs on every ``AppConfig()`` call and re-populates that instance.
        """
        conf = self._get_pmsh_config()
        self.aaf_creds = {'aaf_id': conf['config'].get('aaf_identity'),
                          'aaf_pass': conf['config'].get('aaf_password')}
        self.enable_tls = conf['config'].get('enable_tls')
        self.ca_cert_path = conf['config'].get('ca_cert_path')
        self.cert_path = conf['config'].get('cert_path')
        self.key_path = conf['config'].get('key_path')
        self.streams_subscribes = conf['config'].get('streams_subscribes')
        self.streams_publishes = conf['config'].get('streams_publishes')
        self.operational_policy_name = conf['config'].get('operational_policy_name')
        self.control_loop_name = conf['config'].get('control_loop_name')
        self.sub_schema = _load_sub_schema_from_file()
        self.subscription = Subscription(self.control_loop_name,
                                         self.operational_policy_name,
                                         **conf['config']['pmsh_policy']['subscription'])

    def __new__(cls, *args, **kwargs):
        """Return the shared instance, creating it on first use."""
        if AppConfig.INSTANCE is None:
            # object.__new__ rejects extra arguments, so only ``cls`` is forwarded.
            AppConfig.INSTANCE = super().__new__(cls)
        return AppConfig.INSTANCE

    @mdc_handler
    @retry(wait=wait_fixed(5), stop=stop_after_attempt(5),
           retry=retry_if_exception_type(ValueError))
    def _get_pmsh_config(self, **kwargs):
        """ Retrieves PMSH's configuration from Config binding service. If the fetch
        fails, it is retried every 5 seconds, for at most 5 attempts in total.

        Args:
            **kwargs: tracing ids injected by the decorator; not used here.

        Returns:
            dict: Dictionary representation of the service configuration

        Raises:
            Exception: If any error occurred pulling configuration from Config binding
                service. Each failed attempt raises ValueError to trigger the retry;
                once attempts are exhausted tenacity surfaces the failure (by default
                as a ``tenacity.RetryError``).
        """
        try:
            logger.info('Attempting to fetch PMSH Configuration from CBS.')
            config = get_all()
            # NOTE(review): this logs the whole config, which includes 'aaf_password'.
            # Consider redacting credentials before logging.
            logger.info(f'Successfully fetched PMSH config from CBS: {config}')
            return config
        except Exception as e:
            logger.error(f'Failed to get config from CBS: {e}', exc_info=True)
            # ValueError is the exception type the retry policy reacts to.
            raise ValueError(e) from e

    def validate_sub_schema(self):
        """
        Validates schema of PMSH subscription

        Raises:
            ValidationError: If the PMSH subscription schema is invalid
        """
        sub_data = self.subscription.__dict__
        validate(instance=sub_data, schema=self.sub_schema)
        nf_filter = sub_data["nfFilter"]
        # The schema check alone does not guarantee a usable filter: at least one
        # of the nfFilter entries has to be non-empty.
        if not any(len(val) > 0 for val in nf_filter.values()):
            raise ValidationError("At least one filter within nfFilter must not be empty")
        logger.debug("Subscription schema is valid.")

    def refresh_config(self):
        """
        Update the relevant attributes of the AppConfig object.

        Only the policy name, the control loop name and the subscription are
        refreshed; the remaining attributes keep their current values.

        Raises:
            Exception: if cbs request fails.
        """
        try:
            app_conf = self._get_pmsh_config()
            self.operational_policy_name = app_conf['config'].get('operational_policy_name')
            self.control_loop_name = app_conf['config'].get('control_loop_name')
            self.subscription = Subscription(self.control_loop_name,
                                             self.operational_policy_name,
                                             **app_conf['config']['pmsh_policy']['subscription'])
            logger.info("AppConfig data has been refreshed")
        except Exception:
            logger.error('Failed to refresh PMSH AppConfig')
            raise

    def get_mr_sub(self, sub_name):
        """
        Returns the MrSub object requested.

        Args:
            sub_name: the key of the subscriber object.

        Returns:
            MrSub: an Instance of an `MrSub` <MrSub> Object.

        Raises:
            KeyError: if the sub_name is not found.
        """
        try:
            return _MrSub(sub_name, self.aaf_creds, self.ca_cert_path,
                          self.enable_tls, self.cert_params, **self.streams_subscribes[sub_name])
        except KeyError as e:
            logger.error(f'Failed to get MrSub {sub_name}: {e}', exc_info=True)
            raise

    def get_mr_pub(self, pub_name):
        """
        Returns the MrPub object requested.

        Args:
            pub_name: the key of the publisher object.

        Returns:
            MrPub: an Instance of an `MrPub` <MrPub> Object.

        Raises:
            KeyError: if the pub_name is not found.
        """
        try:
            return _MrPub(pub_name, self.aaf_creds, self.ca_cert_path,
                          self.enable_tls, self.cert_params, **self.streams_publishes[pub_name])
        except KeyError as e:
            logger.error(f'Failed to get MrPub {pub_name}: {e}', exc_info=True)
            raise

    @property
    def cert_params(self):
        """
        Returns the tls artifact paths.

        Returns:
            cert_path, key_path (tuple): the path to tls cert and key.
        """
        return self.cert_path, self.key_path
class _DmaapMrClient:
    def __init__(self, aaf_creds, ca_cert_path, enable_tls, cert_params, **kwargs):
        """
        A DMaaP Message Router utility class.
        Sub classes should be invoked via the AppConfig.get_mr_{pub|sub} only.

        Args:
            aaf_creds (dict): a dict of aaf secure credentials.
            ca_cert_path (str): path to the ca certificate.
            enable_tls (bool): TLS if True, else False
            cert_params (tuple): client side (cert, key) tuple.
            **kwargs: a dict of streams_{subscribes|publishes} data.
        """
        # 'dmaap_info' must be present in kwargs; 'topic_url' inside it is optional
        # and yields None when absent.
        self.topic_url = kwargs.get('dmaap_info').get('topic_url')
        self.aaf_id = aaf_creds.get('aaf_id')
        self.aaf_pass = aaf_creds.get('aaf_pass')
        self.ca_cert_path = ca_cert_path
        self.enable_tls = enable_tls
        self.cert_params = cert_params
class _MrPub(_DmaapMrClient):
    def __init__(self, pub_name, aaf_creds, ca_cert_path, enable_tls, cert_params, **kwargs):
        self.pub_name = pub_name
        super().__init__(aaf_creds, ca_cert_path, enable_tls, cert_params, **kwargs)

    @mdc_handler
    def publish_to_topic(self, event_json, **kwargs):
        """
        Publish the event to the DMaaP Message Router topic.

        Args:
            event_json (dict): the json data to be published.
            **kwargs: must carry 'request_id' and 'invocation_id'; both are sent
                as tracing headers.

        Raises:
            Exception: if post request fails.
        """
        headers = {'content-type': 'application/json', 'x-transactionid': kwargs['request_id'],
                   'InvocationID': kwargs['invocation_id'],
                   'RequestID': kwargs['request_id']
                   }
        # The context manager closes the session (and its pooled connections)
        # whether or not the request succeeds.
        with requests.Session() as session:
            logger.info(f'Publishing event to {self.topic_url}')
            response = session.post(self.topic_url, headers=headers,
                                    auth=HTTPBasicAuth(self.aaf_id, self.aaf_pass),
                                    json=event_json,
                                    verify=(self.ca_cert_path if self.enable_tls else False))
            response.raise_for_status()

    def publish_subscription_event_data(self, subscription, nf):
        """
        Update the Subscription dict with xnf and policy name then publish to DMaaP MR topic.

        Args:
            subscription (Subscription): the `Subscription` <Subscription> object.
            nf (NetworkFunction): the NetworkFunction to include in the event.

        Raises:
            Exception: if preparing or publishing the event fails (logged first).
        """
        try:
            subscription_event = subscription.prepare_subscription_event(nf)
            logger.debug(f'Subscription event: {subscription_event}')
            self.publish_to_topic(subscription_event)
        except Exception as e:
            logger.error(f'Failed to publish to topic {self.topic_url}: {e}', exc_info=True)
            raise
class _MrSub(_DmaapMrClient):
    def __init__(self, sub_name, aaf_creds, ca_cert_path, enable_tls, cert_params, **kwargs):
        self.sub_name = sub_name
        super().__init__(aaf_creds, ca_cert_path, enable_tls, cert_params, **kwargs)

    @mdc_handler
    def get_from_topic(self, consumer_id, consumer_group='dcae_pmsh_cg', timeout=1000, **kwargs):
        """
        Returns the json data from the MrTopic.

        Args:
            consumer_id (str): Within your subscribers group, a name that uniquely
                identifies your subscribers process.
            consumer_group (str): A name that uniquely identifies your subscribers.
            timeout (int): The request timeout value in mSec. It is sent to the
                Message Router as the ``timeout`` query parameter.
            **kwargs: must carry 'request_id' and 'invocation_id'; both are sent
                as tracing headers.

        Returns:
            list[str]: the json response from DMaaP Message Router topic.

        Raises:
            Exception: if the request fails or returns an error status (logged first).
        """
        try:
            headers = {'accept': 'application/json', 'content-type': 'application/json',
                       'InvocationID': kwargs['invocation_id'],
                       'RequestID': kwargs['request_id']}
            # The context manager closes the session (and its pooled connections)
            # whether or not the request succeeds.
            with requests.Session() as session:
                logger.info(f'Fetching messages from MR topic: {self.topic_url}')
                response = session.get(f'{self.topic_url}/{consumer_group}/{consumer_id}'
                                       f'?timeout={timeout}',
                                       auth=HTTPBasicAuth(self.aaf_id, self.aaf_pass),
                                       headers=headers,
                                       verify=(self.ca_cert_path if self.enable_tls else False))
                if response.status_code == 503:
                    # Logged separately for visibility; raise_for_status() below
                    # still turns the 503 into an exception.
                    logger.error(f'MR Service is unavailable at present: {response.content}')
                response.raise_for_status()
                return response.json()
        except Exception as e:
            logger.error(f'Failed to fetch message from MR: {e}', exc_info=True)
            raise
class PeriodicTask(Timer):
    """
    A :class:`Timer` that runs its function immediately and then again after
    every ``interval`` seconds until :meth:`cancel` is called.
    """

    def run(self):
        """Invoke the function once, then repeat it every ``interval`` seconds.

        Exceptions raised by the repeated invocations are logged and the loop
        keeps going. The very first invocation is not guarded: if it raises,
        the exception propagates and no repetition takes place.
        """
        self.function(*self.args, **self.kwargs)
        # finished.wait() returns True once cancel() has set the event, which
        # ends the loop; otherwise it times out after ``interval`` seconds.
        while not self.finished.wait(self.interval):
            try:
                self.function(*self.args, **self.kwargs)
            except Exception as e:
                logger.error(f'Exception in thread: {self.name}: {e}', exc_info=True)