# ============LICENSE_START===================================================
-# Copyright (C) 2019-2020 Nordix Foundation.
+# Copyright (C) 2019-2021 Nordix Foundation.
# ============================================================================
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
#
# SPDX-License-Identifier: Apache-2.0
# ============LICENSE_END=====================================================
-import threading
+import json
+import os
import uuid
+from json import JSONDecodeError
from os import getenv
from threading import Timer
from onaplogging.mdcContext import MDC
from requests.auth import HTTPBasicAuth
from tenacity import wait_fixed, stop_after_attempt, retry, retry_if_exception_type
+from jsonschema import validate, ValidationError
from mod import logger
-from mod.network_function import NetworkFunctionFilter
from mod.subscription import Subscription
kwargs['request_id'] = request_id
kwargs['invocation_id'] = invocation_id
return function(*args, **kwargs)
+
return decorator
-class ThreadSafeSingleton(type):
- _instances = {}
- _singleton_lock = threading.Lock()
+class MySingleton(object):
+ instances = {}
+
+ def __new__(cls, clz=None):
+ if clz is None:
+ if cls.__name__ not in MySingleton.instances:
+ MySingleton.instances[cls.__name__] = \
+ object.__new__(cls)
+ return MySingleton.instances[cls.__name__]
+ MySingleton.instances[clz.__name__] = clz()
+ MySingleton.first = clz
+ return type(clz.__name__, (MySingleton,), dict(clz.__dict__))
- def __call__(cls, *args, **kwargs):
- # double-checked locking pattern (https://en.wikipedia.org/wiki/Double-checked_locking)
- if cls not in cls._instances:
- with cls._singleton_lock:
- if cls not in cls._instances:
- cls._instances[cls] = super(ThreadSafeSingleton, cls).__call__(*args, **kwargs)
- return cls._instances[cls]
+def _load_sub_schema_from_file():
+ try:
+ with open(os.path.join(os.path.dirname(__file__), 'sub_schema.json')) as sub:
+ return json.load(sub)
+ except OSError as err:
+ logger.error(f'Failed to read sub schema file: {err}', exc_info=True)
+ except JSONDecodeError as json_err:
+ logger.error(f'sub schema file is not a valid JSON file: {json_err}', exc_info=True)
-class AppConfig(metaclass=ThreadSafeSingleton):
+
+class AppConfig:
+ INSTANCE = None
def __init__(self):
- try:
- conf = self._get_pmsh_config()
- except Exception:
- raise
+ conf = self._get_pmsh_config()
self.aaf_creds = {'aaf_id': conf['config'].get('aaf_identity'),
'aaf_pass': conf['config'].get('aaf_password')}
+ self.enable_tls = conf['config'].get('enable_tls')
+ self.ca_cert_path = conf['config'].get('ca_cert_path')
self.cert_path = conf['config'].get('cert_path')
self.key_path = conf['config'].get('key_path')
self.streams_subscribes = conf['config'].get('streams_subscribes')
self.streams_publishes = conf['config'].get('streams_publishes')
self.operational_policy_name = conf['config'].get('operational_policy_name')
self.control_loop_name = conf['config'].get('control_loop_name')
+ self.sub_schema = _load_sub_schema_from_file()
self.subscription = Subscription(**conf['policy']['subscription'])
- self.nf_filter = NetworkFunctionFilter(**self.subscription.nfFilter)
+ self.nf_filter = None
+
+ def __new__(cls, *args, **kwargs):
+ if AppConfig.INSTANCE is None:
+ AppConfig.INSTANCE = super().__new__(cls, *args, **kwargs)
+ return AppConfig.INSTANCE
@mdc_handler
- @retry(wait=wait_fixed(2), stop=stop_after_attempt(5), retry=retry_if_exception_type(Exception))
+ @retry(wait=wait_fixed(5), stop=stop_after_attempt(5),
+ retry=retry_if_exception_type(ValueError))
def _get_pmsh_config(self, **kwargs):
""" Retrieves PMSH's configuration from Config binding service. If a non-2xx response
is received, it retries after 2 seconds for 5 times before raising an exception.
Exception: If any error occurred pulling configuration from Config binding service.
"""
try:
- logger.info('Fetching PMSH Configuration from CBS.')
+ logger.info('Attempting to fetch PMSH Configuration from CBS.')
config = get_all()
logger.info(f'Successfully fetched PMSH config from CBS: {config}')
return config
- except Exception as err:
- logger.error(f'Failed to get config from CBS: {err}')
- raise Exception
+ except Exception as e:
+ logger.error(f'Failed to get config from CBS: {e}', exc_info=True)
+ raise ValueError(e)
+
+ def validate_sub_schema(self):
+ """
+ Validates schema of PMSH subscription
+
+ Raises:
+ ValidationError: If the PMSH subscription schema is invalid
+ """
+ sub_data = self.subscription.__dict__
+ validate(instance=sub_data, schema=self.sub_schema)
+ nf_filter = sub_data["nfFilter"]
+ if not [filter_name for filter_name, val in nf_filter.items() if len(val) > 0]:
+ raise ValidationError("At least one filter within nfFilter must not be empty")
+ logger.debug("Subscription schema is valid.")
def refresh_config(self):
"""
"""
try:
app_conf = self._get_pmsh_config()
+ self.subscription = Subscription(**app_conf['policy']['subscription'])
+ logger.info("AppConfig data has been refreshed")
except Exception:
- logger.debug("Failed to refresh AppConfig data")
+ logger.error('Failed to refresh PMSH AppConfig')
raise
- self.subscription.administrativeState = \
- app_conf['policy']['subscription']['administrativeState']
- self.nf_filter.nf_names = app_conf['policy']['subscription']['nfFilter']['nfNames']
- self.nf_filter.nf_sw_version = app_conf['policy']['subscription']['nfFilter']['swVersions']
- logger.info("AppConfig data has been refreshed")
def get_mr_sub(self, sub_name):
"""
KeyError: if the sub_name is not found.
"""
try:
- return _MrSub(sub_name, self.aaf_creds, **self.streams_subscribes[sub_name])
+ return _MrSub(sub_name, self.aaf_creds, self.ca_cert_path,
+ self.enable_tls, self.cert_params, **self.streams_subscribes[sub_name])
except KeyError as e:
- logger.debug(e)
+ logger.error(f'Failed to get MrSub {sub_name}: {e}', exc_info=True)
raise
def get_mr_pub(self, pub_name):
KeyError: if the sub_name is not found.
"""
try:
- return _MrPub(pub_name, self.aaf_creds, **self.streams_publishes[pub_name])
+ return _MrPub(pub_name, self.aaf_creds, self.ca_cert_path,
+ self.enable_tls, self.cert_params, **self.streams_publishes[pub_name])
except KeyError as e:
- logger.debug(e)
+ logger.error(f'Failed to get MrPub {pub_name}: {e}', exc_info=True)
raise
@property
Returns the tls artifact paths.
Returns:
- cert_path, key_path: the path to tls cert and key.
+ cert_path, key_path (tuple): the path to tls cert and key.
"""
return self.cert_path, self.key_path
class _DmaapMrClient:
- def __init__(self, aaf_creds, **kwargs):
+ def __init__(self, aaf_creds, ca_cert_path, enable_tls, cert_params, **kwargs):
"""
A DMaaP Message Router utility class.
Sub classes should be invoked via the AppConfig.get_mr_{pub|sub} only.
Args:
- aaf_creds: a dict of aaf secure credentials.
+ aaf_creds (dict): a dict of aaf secure credentials.
+ ca_cert_path (str): path to the ca certificate.
+ enable_tls (bool): TLS if True, else False
+ cert_params (tuple): client side (cert, key) tuple.
**kwargs: a dict of streams_{subscribes|publishes} data.
"""
self.topic_url = kwargs.get('dmaap_info').get('topic_url')
self.aaf_id = aaf_creds.get('aaf_id')
self.aaf_pass = aaf_creds.get('aaf_pass')
+ self.ca_cert_path = ca_cert_path
+ self.enable_tls = enable_tls
+ self.cert_params = cert_params
class _MrPub(_DmaapMrClient):
- def __init__(self, pub_name, aaf_creds, **kwargs):
+ def __init__(self, pub_name, aaf_creds, ca_cert_path, enable_tls, cert_params, **kwargs):
self.pub_name = pub_name
- super().__init__(aaf_creds, **kwargs)
+ super().__init__(aaf_creds, ca_cert_path, enable_tls, cert_params, **kwargs)
@mdc_handler
def publish_to_topic(self, event_json, **kwargs):
Publish the event to the DMaaP Message Router topic.
Args:
- event_json: the json data to be published.
+ event_json (dict): the json data to be published.
Raises:
Exception: if post request fails.
'InvocationID': kwargs['invocation_id'],
'RequestID': kwargs['request_id']
}
- logger.info(f'Attempting to publish event to {self.topic_url}')
+ logger.info(f'Publishing event to {self.topic_url}')
response = session.post(self.topic_url, headers=headers,
auth=HTTPBasicAuth(self.aaf_id, self.aaf_pass), json=event_json,
- verify=False)
+ verify=(self.ca_cert_path if self.enable_tls else False))
response.raise_for_status()
except Exception as e:
- logger.error(f'Failed to publish message to MR topic: {e}')
- raise
+ raise e
- def publish_subscription_event_data(self, subscription, xnf_name, app_conf):
+ def publish_subscription_event_data(self, subscription, nf, app_conf):
"""
Update the Subscription dict with xnf and policy name then publish to DMaaP MR topic.
Args:
- subscription: the `Subscription` <Subscription> object.
- xnf_name: the xnf to include in the event.
+ subscription (Subscription): the `Subscription` <Subscription> object.
+ nf (NetworkFunction): the NetworkFunction to include in the event.
app_conf (AppConfig): the application configuration.
"""
try:
- subscription_event = subscription.prepare_subscription_event(xnf_name, app_conf)
+ subscription_event = subscription.prepare_subscription_event(nf, app_conf)
self.publish_to_topic(subscription_event)
except Exception as e:
- logger.debug(f'pmsh_utils.publish_subscription_event_data : {e}')
- raise
+ logger.error(f'Failed to publish to topic {self.topic_url}: {e}', exc_info=True)
+ raise e
class _MrSub(_DmaapMrClient):
- def __init__(self, sub_name, aaf_creds, **kwargs):
+ def __init__(self, sub_name, aaf_creds, ca_cert_path, enable_tls, cert_params, **kwargs):
self.sub_name = sub_name
- super().__init__(aaf_creds, **kwargs)
+ super().__init__(aaf_creds, ca_cert_path, enable_tls, cert_params, **kwargs)
@mdc_handler
def get_from_topic(self, consumer_id, consumer_group='dcae_pmsh_cg', timeout=1000, **kwargs):
Returns the json data from the MrTopic.
Args:
- consumer_id: Within your subscribers group, a name that uniquely
+ consumer_id (str): Within your subscribers group, a name that uniquely
identifies your subscribers process.
- consumer_group: A name that uniquely identifies your subscribers.
- timeout: The request timeout value in mSec.
+ consumer_group (str): A name that uniquely identifies your subscribers.
+ timeout (int): The request timeout value in mSec.
Returns:
list[str]: the json response from DMaaP Message Router topic.
headers = {'accept': 'application/json', 'content-type': 'application/json',
'InvocationID': kwargs['invocation_id'],
'RequestID': kwargs['request_id']}
- logger.debug(f'Fetching messages from MR topic: {self.topic_url}')
+ logger.info(f'Fetching messages from MR topic: {self.topic_url}')
response = session.get(f'{self.topic_url}/{consumer_group}/{consumer_id}'
f'?timeout={timeout}',
auth=HTTPBasicAuth(self.aaf_id, self.aaf_pass), headers=headers,
- verify=False)
+ verify=(self.ca_cert_path if self.enable_tls else False))
+ if response.status_code == 503:
+ logger.error(f'MR Service is unavailable at present: {response.content}')
+ pass
response.raise_for_status()
if response.ok:
return response.json()
except Exception as e:
- logger.error(f'Failed to fetch message from MR: {e}')
+ logger.error(f'Failed to fetch message from MR: {e}', exc_info=True)
raise
def run(self):
self.function(*self.args, **self.kwargs)
while not self.finished.wait(self.interval):
- self.function(*self.args, **self.kwargs)
+ try:
+ self.function(*self.args, **self.kwargs)
+ except Exception as e:
+ logger.error(f'Exception in thread: {self.name}: {e}', exc_info=True)