[DCAEGEN-2] Added Junit for nfFilter schema validation
[dcaegen2/services.git] / components / pm-subscription-handler / pmsh_service / mod / pmsh_utils.py
index 354d6b8..a5fc86e 100755 (executable)
@@ -1,5 +1,5 @@
 # ============LICENSE_START===================================================
-#  Copyright (C) 2019-2020 Nordix Foundation.
+#  Copyright (C) 2019-2021 Nordix Foundation.
 # ============================================================================
 # Licensed under the Apache License, Version 2.0 (the "License");
 # you may not use this file except in compliance with the License.
 #
 # SPDX-License-Identifier: Apache-2.0
 # ============LICENSE_END=====================================================
-import threading
+import json
+import os
 import uuid
+from json import JSONDecodeError
 from os import getenv
 from threading import Timer
 
@@ -25,9 +27,9 @@ from onap_dcae_cbs_docker_client.client import get_all
 from onaplogging.mdcContext import MDC
 from requests.auth import HTTPBasicAuth
 from tenacity import wait_fixed, stop_after_attempt, retry, retry_if_exception_type
+from jsonschema import validate, ValidationError
 
 from mod import logger
-from mod.network_function import NetworkFunctionFilter
 from mod.subscription import Subscription
 
 
@@ -42,42 +44,61 @@ def mdc_handler(function):
         kwargs['request_id'] = request_id
         kwargs['invocation_id'] = invocation_id
         return function(*args, **kwargs)
+
     return decorator
 
 
-class ThreadSafeSingleton(type):
-    _instances = {}
-    _singleton_lock = threading.Lock()
+class MySingleton(object):
+    instances = {}
+
+    def __new__(cls, clz=None):
+        if clz is None:
+            if cls.__name__ not in MySingleton.instances:
+                MySingleton.instances[cls.__name__] = \
+                    object.__new__(cls)
+            return MySingleton.instances[cls.__name__]
+        MySingleton.instances[clz.__name__] = clz()
+        MySingleton.first = clz
+        return type(clz.__name__, (MySingleton,), dict(clz.__dict__))
 
-    def __call__(cls, *args, **kwargs):
-        # double-checked locking pattern (https://en.wikipedia.org/wiki/Double-checked_locking)
-        if cls not in cls._instances:
-            with cls._singleton_lock:
-                if cls not in cls._instances:
-                    cls._instances[cls] = super(ThreadSafeSingleton, cls).__call__(*args, **kwargs)
-        return cls._instances[cls]
 
+def _load_sub_schema_from_file():
+    try:
+        with open(os.path.join(os.path.dirname(__file__), 'sub_schema.json')) as sub:
+            return json.load(sub)
+    except OSError as err:
+        logger.error(f'Failed to read sub schema file: {err}', exc_info=True)
+    except JSONDecodeError as json_err:
+        logger.error(f'sub schema file is not a valid JSON file: {json_err}', exc_info=True)
 
-class AppConfig(metaclass=ThreadSafeSingleton):
+
+class AppConfig:
+    INSTANCE = None
 
     def __init__(self):
-        try:
-            conf = self._get_pmsh_config()
-        except Exception:
-            raise
+        conf = self._get_pmsh_config()
         self.aaf_creds = {'aaf_id': conf['config'].get('aaf_identity'),
                           'aaf_pass': conf['config'].get('aaf_password')}
+        self.enable_tls = conf['config'].get('enable_tls')
+        self.ca_cert_path = conf['config'].get('ca_cert_path')
         self.cert_path = conf['config'].get('cert_path')
         self.key_path = conf['config'].get('key_path')
         self.streams_subscribes = conf['config'].get('streams_subscribes')
         self.streams_publishes = conf['config'].get('streams_publishes')
         self.operational_policy_name = conf['config'].get('operational_policy_name')
         self.control_loop_name = conf['config'].get('control_loop_name')
+        self.sub_schema = _load_sub_schema_from_file()
         self.subscription = Subscription(**conf['policy']['subscription'])
-        self.nf_filter = NetworkFunctionFilter(**self.subscription.nfFilter)
+        self.nf_filter = None
+
+    def __new__(cls, *args, **kwargs):
+        if AppConfig.INSTANCE is None:
+            AppConfig.INSTANCE = super().__new__(cls, *args, **kwargs)
+        return AppConfig.INSTANCE
 
     @mdc_handler
-    @retry(wait=wait_fixed(2), stop=stop_after_attempt(5), retry=retry_if_exception_type(Exception))
+    @retry(wait=wait_fixed(5), stop=stop_after_attempt(5),
+           retry=retry_if_exception_type(ValueError))
     def _get_pmsh_config(self, **kwargs):
         """ Retrieves PMSH's configuration from Config binding service. If a non-2xx response
         is received, it retries after 2 seconds for 5 times before raising an exception.
@@ -89,13 +110,27 @@ class AppConfig(metaclass=ThreadSafeSingleton):
             Exception: If any error occurred pulling configuration from Config binding service.
         """
         try:
-            logger.info('Fetching PMSH Configuration from CBS.')
+            logger.info('Attempting to fetch PMSH Configuration from CBS.')
             config = get_all()
             logger.info(f'Successfully fetched PMSH config from CBS: {config}')
             return config
-        except Exception as err:
-            logger.error(f'Failed to get config from CBS: {err}')
-            raise Exception
+        except Exception as e:
+            logger.error(f'Failed to get config from CBS: {e}', exc_info=True)
+            raise ValueError(e)
+
+    def validate_sub_schema(self):
+        """
+        Validates schema of PMSH subscription
+
+        Raises:
+            ValidationError: If the PMSH subscription schema is invalid
+        """
+        sub_data = self.subscription.__dict__
+        validate(instance=sub_data, schema=self.sub_schema)
+        nf_filter = sub_data["nfFilter"]
+        if not [filter_name for filter_name, val in nf_filter.items() if len(val) > 0]:
+            raise ValidationError("At least one filter within nfFilter must not be empty")
+        logger.debug("Subscription schema is valid.")
 
     def refresh_config(self):
         """
@@ -106,14 +141,11 @@ class AppConfig(metaclass=ThreadSafeSingleton):
         """
         try:
             app_conf = self._get_pmsh_config()
+            self.subscription = Subscription(**app_conf['policy']['subscription'])
+            logger.info("AppConfig data has been refreshed")
         except Exception:
-            logger.debug("Failed to refresh AppConfig data")
+            logger.error('Failed to refresh PMSH AppConfig')
             raise
-        self.subscription.administrativeState = \
-            app_conf['policy']['subscription']['administrativeState']
-        self.nf_filter.nf_names = app_conf['policy']['subscription']['nfFilter']['nfNames']
-        self.nf_filter.nf_sw_version = app_conf['policy']['subscription']['nfFilter']['swVersions']
-        logger.info("AppConfig data has been refreshed")
 
     def get_mr_sub(self, sub_name):
         """
@@ -129,9 +161,10 @@ class AppConfig(metaclass=ThreadSafeSingleton):
             KeyError: if the sub_name is not found.
         """
         try:
-            return _MrSub(sub_name, self.aaf_creds, **self.streams_subscribes[sub_name])
+            return _MrSub(sub_name, self.aaf_creds, self.ca_cert_path,
+                          self.enable_tls, self.cert_params, **self.streams_subscribes[sub_name])
         except KeyError as e:
-            logger.debug(e)
+            logger.error(f'Failed to get MrSub {sub_name}: {e}', exc_info=True)
             raise
 
     def get_mr_pub(self, pub_name):
@@ -148,9 +181,10 @@ class AppConfig(metaclass=ThreadSafeSingleton):
             KeyError: if the sub_name is not found.
         """
         try:
-            return _MrPub(pub_name, self.aaf_creds, **self.streams_publishes[pub_name])
+            return _MrPub(pub_name, self.aaf_creds, self.ca_cert_path,
+                          self.enable_tls, self.cert_params, **self.streams_publishes[pub_name])
         except KeyError as e:
-            logger.debug(e)
+            logger.error(f'Failed to get MrPub {pub_name}: {e}', exc_info=True)
             raise
 
     @property
@@ -159,29 +193,35 @@ class AppConfig(metaclass=ThreadSafeSingleton):
         Returns the tls artifact paths.
 
         Returns:
-            cert_path, key_path: the path to tls cert and key.
+            cert_path, key_path (tuple): the path to tls cert and key.
         """
         return self.cert_path, self.key_path
 
 
 class _DmaapMrClient:
-    def __init__(self, aaf_creds, **kwargs):
+    def __init__(self, aaf_creds, ca_cert_path, enable_tls, cert_params, **kwargs):
         """
         A DMaaP Message Router utility class.
         Sub classes should be invoked via the AppConfig.get_mr_{pub|sub} only.
         Args:
-            aaf_creds: a dict of aaf secure credentials.
+            aaf_creds (dict): a dict of aaf secure credentials.
+            ca_cert_path (str): path to the ca certificate.
+            enable_tls (bool): TLS if True, else False
+            cert_params (tuple): client side (cert, key) tuple.
             **kwargs: a dict of streams_{subscribes|publishes} data.
         """
         self.topic_url = kwargs.get('dmaap_info').get('topic_url')
         self.aaf_id = aaf_creds.get('aaf_id')
         self.aaf_pass = aaf_creds.get('aaf_pass')
+        self.ca_cert_path = ca_cert_path
+        self.enable_tls = enable_tls
+        self.cert_params = cert_params
 
 
 class _MrPub(_DmaapMrClient):
-    def __init__(self, pub_name, aaf_creds, **kwargs):
+    def __init__(self, pub_name, aaf_creds, ca_cert_path, enable_tls, cert_params, **kwargs):
         self.pub_name = pub_name
-        super().__init__(aaf_creds, **kwargs)
+        super().__init__(aaf_creds, ca_cert_path, enable_tls, cert_params, **kwargs)
 
     @mdc_handler
     def publish_to_topic(self, event_json, **kwargs):
@@ -189,7 +229,7 @@ class _MrPub(_DmaapMrClient):
         Publish the event to the DMaaP Message Router topic.
 
         Args:
-            event_json: the json data to be published.
+            event_json (dict): the json data to be published.
 
         Raises:
             Exception: if post request fails.
@@ -200,36 +240,35 @@ class _MrPub(_DmaapMrClient):
                        'InvocationID': kwargs['invocation_id'],
                        'RequestID': kwargs['request_id']
                        }
-            logger.info(f'Attempting to publish event to {self.topic_url}')
+            logger.info(f'Publishing event to {self.topic_url}')
             response = session.post(self.topic_url, headers=headers,
                                     auth=HTTPBasicAuth(self.aaf_id, self.aaf_pass), json=event_json,
-                                    verify=False)
+                                    verify=(self.ca_cert_path if self.enable_tls else False))
             response.raise_for_status()
         except Exception as e:
-            logger.error(f'Failed to publish message to MR topic: {e}')
-            raise
+            raise e
 
-    def publish_subscription_event_data(self, subscription, xnf_name, app_conf):
+    def publish_subscription_event_data(self, subscription, nf, app_conf):
         """
         Update the Subscription dict with xnf and policy name then publish to DMaaP MR topic.
 
         Args:
-            subscription: the `Subscription` <Subscription> object.
-            xnf_name: the xnf to include in the event.
+            subscription (Subscription): the `Subscription` <Subscription> object.
+            nf (NetworkFunction): the NetworkFunction to include in the event.
             app_conf (AppConfig): the application configuration.
         """
         try:
-            subscription_event = subscription.prepare_subscription_event(xnf_name, app_conf)
+            subscription_event = subscription.prepare_subscription_event(nf, app_conf)
             self.publish_to_topic(subscription_event)
         except Exception as e:
-            logger.debug(f'pmsh_utils.publish_subscription_event_data : {e}')
-            raise
+            logger.error(f'Failed to publish to topic {self.topic_url}: {e}', exc_info=True)
+            raise e
 
 
 class _MrSub(_DmaapMrClient):
-    def __init__(self, sub_name, aaf_creds, **kwargs):
+    def __init__(self, sub_name, aaf_creds, ca_cert_path, enable_tls, cert_params, **kwargs):
         self.sub_name = sub_name
-        super().__init__(aaf_creds, **kwargs)
+        super().__init__(aaf_creds, ca_cert_path, enable_tls, cert_params, **kwargs)
 
     @mdc_handler
     def get_from_topic(self, consumer_id, consumer_group='dcae_pmsh_cg', timeout=1000, **kwargs):
@@ -237,10 +276,10 @@ class _MrSub(_DmaapMrClient):
         Returns the json data from the MrTopic.
 
         Args:
-            consumer_id: Within your subscribers group, a name that uniquely
+            consumer_id (str): Within your subscribers group, a name that uniquely
             identifies your subscribers process.
-            consumer_group: A name that uniquely identifies your subscribers.
-            timeout: The request timeout value in mSec.
+            consumer_group (str): A name that uniquely identifies your subscribers.
+            timeout (int): The request timeout value in mSec.
 
         Returns:
             list[str]: the json response from DMaaP Message Router topic.
@@ -250,16 +289,19 @@ class _MrSub(_DmaapMrClient):
             headers = {'accept': 'application/json', 'content-type': 'application/json',
                        'InvocationID': kwargs['invocation_id'],
                        'RequestID': kwargs['request_id']}
-            logger.debug(f'Fetching messages from MR topic: {self.topic_url}')
+            logger.info(f'Fetching messages from MR topic: {self.topic_url}')
             response = session.get(f'{self.topic_url}/{consumer_group}/{consumer_id}'
                                    f'?timeout={timeout}',
                                    auth=HTTPBasicAuth(self.aaf_id, self.aaf_pass), headers=headers,
-                                   verify=False)
+                                   verify=(self.ca_cert_path if self.enable_tls else False))
+            if response.status_code == 503:
+                logger.error(f'MR Service is unavailable at present: {response.content}')
+                pass
             response.raise_for_status()
             if response.ok:
                 return response.json()
         except Exception as e:
-            logger.error(f'Failed to fetch message from MR: {e}')
+            logger.error(f'Failed to fetch message from MR: {e}', exc_info=True)
             raise
 
 
@@ -271,4 +313,7 @@ class PeriodicTask(Timer):
     def run(self):
         self.function(*self.args, **self.kwargs)
         while not self.finished.wait(self.interval):
-            self.function(*self.args, **self.kwargs)
+            try:
+                self.function(*self.args, **self.kwargs)
+            except Exception as e:
+                logger.error(f'Exception in thread: {self.name}: {e}', exc_info=True)