Refactor and bug fixes 15/110915/4
authorefiacor <fiachra.corcoran@est.tech>
Wed, 5 Aug 2020 09:12:04 +0000 (10:12 +0100)
committerefiacor <fiachra.corcoran@est.tech>
Thu, 13 Aug 2020 10:22:39 +0000 (11:22 +0100)
Signed-off-by: efiacor <fiachra.corcoran@est.tech>
Change-Id: I8fe91bfdd2f1a2c8a6ca914e52d82dce04bffc0e
Issue-ID: DCAEGEN2-2146

18 files changed:
components/pm-subscription-handler/dpo/blueprints/k8s-pmsh.yaml [deleted file]
components/pm-subscription-handler/dpo/spec/pmsh-component-spec.json
components/pm-subscription-handler/log_config.yaml
components/pm-subscription-handler/pmsh_service/mod/__init__.py
components/pm-subscription-handler/pmsh_service/mod/aai_client.py
components/pm-subscription-handler/pmsh_service/mod/aai_event_handler.py
components/pm-subscription-handler/pmsh_service/mod/exit_handler.py
components/pm-subscription-handler/pmsh_service/mod/network_function.py
components/pm-subscription-handler/pmsh_service/mod/pmsh_utils.py
components/pm-subscription-handler/pmsh_service/mod/subscription.py
components/pm-subscription-handler/pmsh_service/mod/subscription_handler.py
components/pm-subscription-handler/pmsh_service/pmsh_service_main.py
components/pm-subscription-handler/tests/test_aai_event_handler.py
components/pm-subscription-handler/tests/test_controller.py
components/pm-subscription-handler/tests/test_exit_handler.py
components/pm-subscription-handler/tests/test_network_function.py
components/pm-subscription-handler/tests/test_subscription.py
components/pm-subscription-handler/tests/test_subscription_handler.py

diff --git a/components/pm-subscription-handler/dpo/blueprints/k8s-pmsh.yaml b/components/pm-subscription-handler/dpo/blueprints/k8s-pmsh.yaml
deleted file mode 100755 (executable)
index e3f7987..0000000
+++ /dev/null
@@ -1,184 +0,0 @@
-#
-# ============LICENSE_START=======================================================
-#  Copyright (C) 2020 Nordix Foundation.
-# ================================================================================
-# Licensed under the Apache License, Version 2.0 (the 'License');
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-#      http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an 'AS IS' BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-# SPDX-License-Identifier: Apache-2.0
-# ============LICENSE_END=========================================================
-#
-
-tosca_definitions_version: cloudify_dsl_1_3
-
-imports:
-  - 'http://www.getcloudify.org/spec/cloudify/4.5.5/types.yaml'
-  - 'https://nexus.onap.org/service/local/repositories/raw/content/org.onap.dcaegen2.platform.plugins/R6/k8splugin/1.7.2/k8splugin_types.yaml'
-  - 'https://nexus.onap.org/service/local/repositories/raw/content/org.onap.ccsdk.platform.plugins/type_files/dmaap/dmaap.yaml'
-  - 'https://nexus.onap.org/service/local/repositories/raw/content/org.onap.ccsdk.platform.plugins/type_files/pgaas/1.1.0/pgaas_types.yaml'
-  - 'https://nexus.onap.org/service/local/repositories/raw/content/org.onap.dcaegen2.platform.plugins/R6/clamppolicyplugin/1.1.0/clamppolicyplugin_types.yaml'
-
-inputs:
-  tag_version:
-    type: string
-    description: Docker image to be used
-    default: 'nexus3.onap.org:10001/onap/org.onap.dcaegen2.services.pmsh:latest'
-  replicas:
-    type: integer
-    description: Number of instances
-    default: 1
-  policy_model_id:
-    type: 'string'
-    default: 'onap.policies.monitoring.dcae-pm-subscription-handler'
-  policy_id:
-    type: 'string'
-    default: 'onap.policies.monitoring.dcae-pm-subscription-handler'
-  operational_policy_name:
-    type: string
-    default: 'pmsh-operational-policy'
-  control_loop_name:
-    type: string
-  pmsh_publish_topic_name:
-    type: string
-    default: 'unauthenticated.DCAE_CL_OUTPUT'
-  policy_feedback_topic_name:
-    type: string
-    default: 'PMSH_CL_INPUT'
-  aai_notification_topic_name:
-    type: string
-    default: 'AAI-EVENT'
-  publisher_client_role:
-    type: string
-    description: Client role to request secure access to topic
-    default: 'org.onap.dcae.pmPublisher'
-  subscriber_client_role:
-    type: string
-    description: Client role to request secure access to topic
-    default: 'org.onap.dcae.pmSubscriber'
-  client_id:
-    type: string
-    description: Client id for given AAF client
-    default: 'dcae@dcae.onap.org'
-  client_password:
-    type: string
-    description: Password for AAF client provided as client_id
-  dcae_location:
-    type: string
-    description: DCAE location for the subscriber, used to set up routing
-    default: 'san-francisco'
-  cpu_limit:
-    type: string
-    default: '1000m'
-  cpu_request:
-    type: string
-    default: '1000m'
-  memory_limit:
-    type: string
-    default: '1024Mi'
-  memory_request:
-    type: string
-    default: '1024Mi'
-  pgaas_cluster_name:
-    type: string
-    default: 'dcae-pg-primary.onap'
-node_templates:
-  pgaasvm:
-    type: dcae.nodes.pgaas.database
-    properties:
-      writerfqdn: { get_input: pgaas_cluster_name }
-      name: 'pmsh'
-  pm_subscribe_topic:
-    type: ccsdk.nodes.Topic
-    properties:
-      topic_name: { get_input: policy_feedback_topic_name }
-  pmsh:
-    type: dcae.nodes.ContainerizedServiceComponentUsingDmaap
-    interfaces:
-      cloudify.interfaces.lifecycle:
-        create:
-          inputs:
-            ports:
-              - '8443:0'
-            envs:
-              PMSH_PG_URL:
-                  { get_attribute: [ pgaasvm, admin, host ] }
-              PMSH_PG_PASSWORD:
-                  { get_attribute: [ pgaasvm, admin, password ] }
-              PMSH_PG_USERNAME:
-                  { get_attribute: [ pgaasvm, admin, user ] }
-              PMSH_DB_NAME:
-                  { get_attribute: [ pgaasvm, admin, database ] }
-
-    relationships:
-      - type: ccsdk.relationships.subscribe_to_events
-        target: pm_subscribe_topic
-      - type: cloudify.relationships.depends_on
-        target: pgaasvm
-      - type: cloudify.relationships.depends_on
-        target: pmsh-policy
-
-    properties:
-      service_component_type: 'dcae-pmsh'
-      service_component_name_override: 'dcae-pmsh'
-      application_config:
-        aaf_identity: { get_input: client_id }
-        aaf_password: { get_input: client_password }
-        operational_policy_name: { get_input: operational_policy_name }
-        control_loop_name: { get_input: control_loop_name }
-        cert_path: '/opt/app/pmsh/etc/certs/cert.pem'
-        key_path: '/opt/app/pmsh/etc/certs/key.pem'
-        ca_cert_path: '/opt/app/pmsh/etc/certs/cacert.pem'
-        streams_publishes:
-          policy_pm_publisher:
-            type: message_router
-            dmaap_info:
-              topic_url: {concat: ["https://message-router:3905/events/", { get_input: pmsh_publish_topic_name }]}
-        streams_subscribes:
-          policy_pm_subscriber:
-            type: message_router
-            dmaap_info: <<pm_subscribe_topic>>
-          aai_subscriber:
-            type: message_router
-            dmaap_info:
-              topic_url: {concat: ["https://message-router:3905/events/", { get_input: aai_notification_topic_name }]}
-      resource_config:
-        limits:
-          cpu: { get_input: cpu_limit }
-          memory: { get_input: memory_limit }
-        requests:
-          cpu: { get_input: cpu_request }
-          memory: { get_input: memory_request }
-      docker_config:
-        healthcheck:
-          endpoint: /healthcheck
-          interval: 15s
-          timeout: 1s
-          type: https
-      streams_subscribes:
-        - name: pm_subscribe_topic
-          location: { get_input: dcae_location }
-          client_role: { get_input: subscriber_client_role }
-          type: message-router
-      image: { get_input: tag_version }
-      replicas: { get_input: replicas }
-      log_info:
-        log_directory: '/var/log/ONAP/dcaegen2/services/pmsh'
-      tls_info:
-        cert_directory: '/opt/app/pmsh/etc/certs'
-        use_tls: true
-  pmsh-policy:
-    type: clamp.nodes.policy
-    properties:
-      policy_model_id:
-        get_input: policy_model_id
-      policy_id:
-        get_input: policy_id
\ No newline at end of file
index 4302885..69513bc 100755 (executable)
       "policy_editable": false,
       "designer_editable": false
     },
+    {
+      "name": "enable_tls",
+      "value": true,
+      "description": "Boolean to (en|dis)able TLS",
+      "sourced_at_deployment": false,
+      "policy_editable": false,
+      "designer_editable": true,
+      "type": "boolean"
+    },
+    {
+      "name": "protocol",
+      "value": "https",
+      "type": "string",
+      "description": "Protocol PMSH api will use. If enable_tls is disabled, set protocol to http",
+      "sourced_at_deployment": false,
+      "policy_editable": false,
+      "designer_editable": true
+    },
     {
       "name": "policy_model_id",
       "value": "onap.policies.monitoring.dcae-pm-initiation-handler",
index 971c188..b2d8f43 100755 (executable)
@@ -1,27 +1,27 @@
-version: 1\r
-\r
-disable_existing_loggers: true\r
-\r
-loggers:\r
-  onap_logger:\r
-    level: INFO\r
-    handlers: [onap_log_handler, stdout_handler]\r
-    propagate: false\r
-handlers:\r
-  onap_log_handler:\r
-    class: logging.handlers.RotatingFileHandler\r
-    filename: /var/log/ONAP/dcaegen2/services/pmsh/application.log\r
-    mode: a\r
-    maxBytes: 10000000\r
-    backupCount: 10\r
-    formatter: mdcFormatter\r
-  stdout_handler:\r
-    class: logging.StreamHandler\r
-    formatter: mdcFormatter\r
-formatters:\r
-  mdcFormatter:\r
-    format: '%(asctime)s | %(threadName)s | %(thread)d | %(levelname)s | %(module)s\r
-      | %(funcName)s | %(mdc)s | %(message)s'\r
-    mdcfmt: '{ServiceName} | {RequestID} | {InvocationID}'\r
-    datefmt: '%Y-%m-%dT%H:%M:%S%z'\r
-    (): onaplogging.mdcformatter.MDCFormatter\r
+version: 1
+
+disable_existing_loggers: true
+
+loggers:
+  onap_logger:
+    level: INFO
+    handlers: [onap_log_handler, stdout_handler]
+    propagate: false
+handlers:
+  onap_log_handler:
+    class: logging.handlers.RotatingFileHandler
+    filename: /var/log/ONAP/dcaegen2/services/pmsh/application.log
+    mode: a
+    maxBytes: 10000000
+    backupCount: 10
+    formatter: mdcFormatter
+  stdout_handler:
+    class: logging.StreamHandler
+    formatter: mdcFormatter
+formatters:
+  mdcFormatter:
+    format: '%(asctime)s | %(threadName)s | %(thread)d | %(levelname)s | %(module)s
+      | %(funcName)s | %(mdc)s | %(message)s'
+    mdcfmt: '{ServiceName} | {RequestID} | {InvocationID}'
+    datefmt: '%Y-%m-%dT%H:%M:%S%z'
+    (): onaplogging.mdcformatter.MDCFormatter
index 4c86ccd..58cd8b3 100644 (file)
@@ -43,9 +43,11 @@ def launch_api_server(app_config):
     connex_app = _get_app()
     connex_app.add_api('api/pmsh_swagger.yml')
     if app_config.enable_tls:
+        logger.info('Launching secure http API server')
         connex_app.run(port=os.environ.get('PMSH_API_PORT', '8443'),
                        ssl_context=app_config.cert_params)
     else:
+        logger.info('Launching unsecure http API server')
         connex_app.run(port=os.environ.get('PMSH_API_PORT', '8443'))
 
 
index bd05274..5a3c543 100755 (executable)
@@ -114,11 +114,11 @@ def _filter_nf_data(nf_data, nf_filter):
     Returns a list of filtered NetworkFunctions using the nf_filter.
 
     Args:
-        nf_data : the nf json data from AAI.
-        nf_filter: the `NetworkFunctionFilter <NetworkFunctionFilter>` to be applied.
+        nf_data(dict): the nf json data from AAI.
+        nf_filter(NetworkFunctionFilter): the NetworkFunctionFilter to be applied.
 
     Returns:
-        set: a set of filtered NetworkFunctions.
+        set(NetworkFunctions): a set of filtered NetworkFunctions.
 
     Raises:
         KeyError: if AAI data cannot be parsed.
index f1e8cf2..60b6960 100755 (executable)
@@ -72,8 +72,7 @@ def _process_event(action, new_status, xnf_name, mr_pub, app_conf):
         local_xnf = NetworkFunction.get(xnf_name)
 
         if local_xnf is None:
-            logger.info(f'Activating subscription for network function {xnf_name}')
-            app_conf.subscription.process_subscription([NetworkFunction(
+            app_conf.subscription.activate_subscription([NetworkFunction(
                 nf_name=xnf_name, orchestration_status=new_status)], mr_pub, app_conf)
         else:
             logger.debug(f"Update Event for network function {xnf_name} will not be processed "
index 3d02375..1293296 100755 (executable)
@@ -25,28 +25,28 @@ class ExitHandler:
 
     Args:
         periodic_tasks (List[PeriodicTask]): PeriodicTasks that needs to be cancelled.
+        app_conf (AppConfig): The PMSH Application Configuration.
         subscription_handler (SubscriptionHandler): The subscription handler instance.
     """
 
     shutdown_signal_received = False
 
-    def __init__(self, *, periodic_tasks, subscription_handler):
+    def __init__(self, *, periodic_tasks, app_conf, subscription_handler):
         self.periodic_tasks = periodic_tasks
+        self.app_conf = app_conf
         self.subscription_handler = subscription_handler
 
     def __call__(self, sig_num, frame):
         logger.info('Graceful shutdown of PMSH initiated.')
         logger.debug(f'ExitHandler was called with signal number: {sig_num}.')
-        current_sub = self.subscription_handler.current_sub
-        if current_sub and current_sub.administrativeState == AdministrativeState.UNLOCKED.value:
+        current_sub = self.app_conf.subscription
+        if current_sub.administrativeState == AdministrativeState.UNLOCKED.value:
             try:
+                current_sub.deactivate_subscription(self.subscription_handler.mr_pub, self.app_conf)
+                current_sub.update_subscription_status()
                 for thread in self.periodic_tasks:
                     logger.debug(f'Cancelling periodic task with thread name: {thread.name}.')
                     thread.cancel()
-                current_sub.administrativeState = AdministrativeState.LOCKED.value
-                current_sub.process_subscription(current_sub.get_network_functions(),
-                                                 self.subscription_handler.mr_pub,
-                                                 self.subscription_handler.app_conf)
             except Exception as e:
                 logger.error(f'Failed to shut down PMSH application: {e}', exc_info=True)
         ExitHandler.shutdown_signal_received = True
index 979cc77..191e951 100755 (executable)
@@ -17,7 +17,6 @@
 # ============LICENSE_END=====================================================
 
 import re
-from enum import Enum
 
 from mod import logger, db
 from mod.api.db_models import NetworkFunctionModel
@@ -55,7 +54,6 @@ class NetworkFunction:
             db.session.commit()
             logger.info(f'Network Function {new_nf.nf_name} successfully created.')
             return new_nf
-
         else:
             logger.debug(f'Network function {existing_nf.nf_name} already exists,'
                          f' returning this network function..')
@@ -109,9 +107,4 @@ class NetworkFunctionFilter:
             bool: True if matched, else False.
         """
         return self.regex_matcher.search(nf_name) and \
-            orchestration_status == OrchestrationStatus.ACTIVE.value
-
-
-class OrchestrationStatus(Enum):
-    ACTIVE = 'Active'
-    INVENTORIED = 'Inventoried'
+            orchestration_status == 'Active'
index 50eb122..872843d 100755 (executable)
@@ -15,7 +15,6 @@
 #
 # SPDX-License-Identifier: Apache-2.0
 # ============LICENSE_END=====================================================
-import threading
 import uuid
 from os import getenv
 from threading import Timer
@@ -45,20 +44,22 @@ def mdc_handler(function):
     return decorator
 
 
-class ThreadSafeSingleton(type):
-    _instances = {}
-    _singleton_lock = threading.Lock()
+class MySingleton(object):
+    instances = {}
 
-    def __call__(cls, *args, **kwargs):
-        # double-checked locking pattern (https://en.wikipedia.org/wiki/Double-checked_locking)
-        if cls not in cls._instances:
-            with cls._singleton_lock:
-                if cls not in cls._instances:
-                    cls._instances[cls] = super(ThreadSafeSingleton, cls).__call__(*args, **kwargs)
-        return cls._instances[cls]
+    def __new__(cls, clz=None):
+        if clz is None:
+            if cls.__name__ not in MySingleton.instances:
+                MySingleton.instances[cls.__name__] = \
+                    object.__new__(cls)
+            return MySingleton.instances[cls.__name__]
+        MySingleton.instances[clz.__name__] = clz()
+        MySingleton.first = clz
+        return type(clz.__name__, (MySingleton,), dict(clz.__dict__))
 
 
-class AppConfig(metaclass=ThreadSafeSingleton):
+class AppConfig:
+    INSTANCE = None
 
     def __init__(self):
         try:
@@ -78,6 +79,11 @@ class AppConfig(metaclass=ThreadSafeSingleton):
         self.subscription = Subscription(**conf['policy']['subscription'])
         self.nf_filter = NetworkFunctionFilter(**self.subscription.nfFilter)
 
+    def __new__(cls, *args, **kwargs):
+        if AppConfig.INSTANCE is None:
+            AppConfig.INSTANCE = super().__new__(cls, *args, **kwargs)
+        return AppConfig.INSTANCE
+
     @mdc_handler
     @retry(wait=wait_fixed(5), stop=stop_after_attempt(5), retry=retry_if_exception_type(Exception))
     def _get_pmsh_config(self, **kwargs):
@@ -272,7 +278,7 @@ class _MrSub(_DmaapMrClient):
             if response.ok:
                 return response.json()
         except Exception as e:
-            logger.error(f'Failed to fetch message from MR: {e}')
+            logger.error(f'Failed to fetch message from MR: {e}', exc_info=True)
             raise
 
 
index dbcd7a5..97bfc40 100755 (executable)
@@ -33,6 +33,7 @@ class SubNfState(Enum):
 class AdministrativeState(Enum):
     UNLOCKED = 'UNLOCKED'
     LOCKED = 'LOCKED'
+    PENDING = 'PENDING'
 
 
 subscription_nf_states = {
@@ -55,28 +56,7 @@ class Subscription:
         self.fileLocation = kwargs.get('fileLocation')
         self.nfFilter = kwargs.get('nfFilter')
         self.measurementGroups = kwargs.get('measurementGroups')
-
-    def prepare_subscription_event(self, xnf_name, app_conf):
-        """Prepare the sub event for publishing
-
-        Args:
-            xnf_name: the AAI xnf name.
-            app_conf (AppConfig): the application configuration.
-
-        Returns:
-            dict: the Subscription event to be published.
-        """
-        try:
-            clean_sub = {k: v for k, v in self.__dict__.items() if k != 'nfFilter'}
-            sub_event = {'nfName': xnf_name, 'policyName': app_conf.operational_policy_name,
-                         'changeType': 'DELETE'
-                         if self.administrativeState == AdministrativeState.LOCKED.value
-                         else 'CREATE', 'closedLoopControlName': app_conf.control_loop_name,
-                         'subscription': clean_sub}
-            return sub_event
-        except Exception as e:
-            logger.error(f'Failed to prep Sub event for xNF {xnf_name}: {e}', exc_info=True)
-            raise
+        self.create()
 
     def create(self):
         """ Creates a subscription database entry
@@ -89,7 +69,7 @@ class Subscription:
                 SubscriptionModel.subscription_name == self.subscriptionName).one_or_none())
             if existing_subscription is None:
                 new_subscription = SubscriptionModel(subscription_name=self.subscriptionName,
-                                                     status=self.administrativeState)
+                                                     status=AdministrativeState.PENDING.value)
                 db.session.add(new_subscription)
                 db.session.commit()
                 return new_subscription
@@ -101,54 +81,111 @@ class Subscription:
             logger.error(f'Failed to create subscription {self.subscriptionName} in the DB: {e}',
                          exc_info=True)
 
-    def add_network_function_to_subscription(self, nf):
+    def update_subscription_status(self):
+        """ Updates the status of subscription in subscription table """
+        try:
+            SubscriptionModel.query.filter(
+                SubscriptionModel.subscription_name == self.subscriptionName)\
+                .update({SubscriptionModel.status: self.administrativeState},
+                        synchronize_session='evaluate')
+
+            db.session.commit()
+        except Exception as e:
+            logger.error(f'Failed to update status of subscription: {self.subscriptionName}: {e}',
+                         exc_info=True)
+
+    def delete_subscription(self):
+        """ Deletes a subscription and all its association from the database. A network function
+        that is only associated with the subscription being removed will also be deleted."""
+        try:
+            subscription = SubscriptionModel.query.filter(
+                SubscriptionModel.subscription_name == self.subscriptionName).one_or_none()
+            if subscription:
+                for nf_relationship in subscription.nfs:
+                    other_nf_relationship = NfSubRelationalModel.query.filter(
+                        NfSubRelationalModel.subscription_name != self.subscriptionName,
+                        NfSubRelationalModel.nf_name == nf_relationship.nf_name).one_or_none()
+                    if not other_nf_relationship:
+                        db.session.delete(nf_relationship.nf)
+                db.session.delete(subscription)
+                db.session.commit()
+        except Exception as e:
+            logger.error(f'Failed to delete subscription: {self.subscriptionName} '
+                         f'and it\'s relations from the DB: {e}', exc_info=True)
+
+    def prepare_subscription_event(self, xnf_name, app_conf):
+        """Prepare the sub event for publishing
+
+        Args:
+            xnf_name: the AAI xnf name.
+            app_conf (AppConfig): the application configuration.
+
+        Returns:
+            dict: the Subscription event to be published.
+        """
+        try:
+            clean_sub = {k: v for k, v in self.__dict__.items() if k != 'nfFilter'}
+            sub_event = {'nfName': xnf_name, 'policyName': app_conf.operational_policy_name,
+                         'changeType': 'DELETE'
+                         if self.administrativeState == AdministrativeState.LOCKED.value
+                         else 'CREATE', 'closedLoopControlName': app_conf.control_loop_name,
+                         'subscription': clean_sub}
+            return sub_event
+        except Exception as e:
+            logger.error(f'Failed to prep Sub event for xNF {xnf_name}: {e}', exc_info=True)
+            raise
+
+    def add_network_function_to_subscription(self, nf, sub_model):
         """ Associates a network function to a Subscription
 
         Args:
-            nf : A NetworkFunction object.
+            sub_model(SubscriptionModel): The SubscriptionModel from the DB.
+            nf(NetworkFunction): A NetworkFunction object.
         """
-        current_sub = self.create()
         try:
             current_nf = nf.create()
-            logger.debug(f'Adding network function {nf.nf_name} to Subscription '
-                         f'{current_sub.subscription_name}')
             existing_entry = NfSubRelationalModel.query.filter(
-                NfSubRelationalModel.subscription_name == current_sub.subscription_name,
+                NfSubRelationalModel.subscription_name == self.subscriptionName,
                 NfSubRelationalModel.nf_name == current_nf.nf_name).one_or_none()
             if existing_entry is None:
-                new_nf_sub = NfSubRelationalModel(current_sub.subscription_name,
+                new_nf_sub = NfSubRelationalModel(self.subscriptionName,
                                                   nf.nf_name, SubNfState.PENDING_CREATE.value)
-                new_nf_sub.nf = current_nf
-                current_sub.nfs.append(new_nf_sub)
-                logger.debug(f'Network function {current_nf.nf_name} added to Subscription '
-                             f'{current_sub.subscription_name}')
-                db.session.add(current_sub)
+                sub_model.nfs.append(new_nf_sub)
+                db.session.add(sub_model)
                 db.session.commit()
+                logger.info(f'Network function {current_nf.nf_name} added to Subscription '
+                            f'{self.subscriptionName}')
         except Exception as e:
             logger.error(f'Failed to add nf {nf.nf_name} to subscription '
-                         f'{current_sub.subscription_name}: {e}', exc_info=True)
-            logger.debug(f'Subscription {current_sub.subscription_name} now contains these XNFs:'
-                         f'{Subscription.get_nf_names_per_sub(current_sub.subscription_name)}')
+                         f'{self.subscriptionName}: {e}', exc_info=True)
+            logger.debug(f'Subscription {self.subscriptionName} now contains these XNFs:'
+                         f'{Subscription.get_nf_names_per_sub(self.subscriptionName)}')
 
-    @staticmethod
-    def get(subscription_name):
-        """ Retrieves a subscription
-
-        Args:
-            subscription_name (str): The subscription name
+    def get(self):
+        """ Retrieves a SubscriptionModel object
 
         Returns:
-            Subscription object else None
+            SubscriptionModel object else None
         """
         return SubscriptionModel.query.filter(
-            SubscriptionModel.subscription_name == subscription_name).one_or_none()
+            SubscriptionModel.subscription_name == self.subscriptionName).one_or_none()
+
+    def get_local_sub_admin_state(self):
+        """ Retrieves the subscription admin state
+
+        Returns:
+            str: The admin state of the SubscriptionModel
+        """
+        sub_model = SubscriptionModel.query.filter(
+            SubscriptionModel.subscription_name == self.subscriptionName).one_or_none()
+        return sub_model.status
 
     @staticmethod
     def get_all():
         """ Retrieves a list of subscriptions
 
         Returns:
-            list: Subscription list else empty
+            list(SubscriptionModel): Subscriptions list else empty
         """
         return SubscriptionModel.query.all()
 
@@ -160,7 +197,7 @@ class Subscription:
             subscription_name (str): The subscription name
 
         Returns:
-            list: List of network function names
+            list(str): List of network function names
         """
         nf_sub_rel = NfSubRelationalModel.query.filter(
             NfSubRelationalModel.subscription_name == subscription_name).all()
@@ -170,65 +207,41 @@ class Subscription:
 
         return list_of_nfs
 
-    def update_subscription_status(self):
-        """ Updates the status of subscription in subscription table """
-        try:
-            SubscriptionModel.query.filter(
-                SubscriptionModel.subscription_name == self.subscriptionName)\
-                .update({SubscriptionModel.status: self.administrativeState},
-                        synchronize_session='evaluate')
-
-            db.session.commit()
-        except Exception as e:
-            logger.error(f'Failed to update status of subscription: {self.subscriptionName}: {e}',
-                         exc_info=True)
-
-    def delete_subscription(self):
-        """ Deletes a subscription and all its association from the database. A network function
-        that is only associated with the subscription being removed will also be deleted."""
-        try:
-            subscription = SubscriptionModel.query.filter(
-                SubscriptionModel.subscription_name == self.subscriptionName).one_or_none()
-            if subscription:
-                for nf_relationship in subscription.nfs:
-                    other_nf_relationship = NfSubRelationalModel.query.filter(
-                        NfSubRelationalModel.subscription_name != self.subscriptionName,
-                        NfSubRelationalModel.nf_name == nf_relationship.nf_name).one_or_none()
-                    if not other_nf_relationship:
-                        db.session.delete(nf_relationship.nf)
-                db.session.delete(subscription)
-                db.session.commit()
-        except Exception as e:
-            logger.error(f'Failed to delete subscription: {self.subscriptionName} '
-                         f'and it\'s relations from the DB: {e}', exc_info=True)
-
-    def process_subscription(self, nfs, mr_pub, app_conf):
-        action = 'Deactivate'
-        sub_nf_state = SubNfState.PENDING_DELETE.value
-        self.update_subscription_status()
-
-        if self.administrativeState == AdministrativeState.UNLOCKED.value:
-            action = 'Activate'
-            sub_nf_state = SubNfState.PENDING_CREATE.value
-            logger.info(f'{action} subscription initiated for {self.subscriptionName}.')
-
+    def activate_subscription(self, nfs, mr_pub, app_conf):
+        logger.info(f'Activate subscription initiated for {self.subscriptionName}.')
         try:
+            sub_model = self.get()
             for nf in nfs:
                 mr_pub.publish_subscription_event_data(self, nf.nf_name, app_conf)
-                logger.debug(f'Publishing Event to {action} '
-                             f'Sub: {self.subscriptionName} for the nf: {nf.nf_name}')
-                if action == 'Activate':
-                    self.add_network_function_to_subscription(nf)
-                self.update_sub_nf_status(self.subscriptionName, sub_nf_state, nf.nf_name)
+                logger.info(f'Publishing event to activate '
+                            f'Sub: {self.subscriptionName} for the nf: {nf.nf_name}')
+                self.add_network_function_to_subscription(nf, sub_model)
+                self.update_sub_nf_status(self.subscriptionName, SubNfState.PENDING_CREATE.value,
+                                          nf.nf_name)
         except Exception as err:
             raise Exception(f'Error publishing activation event to MR: {err}')
 
+    def deactivate_subscription(self, mr_pub, app_conf):
+        nfs = self.get_network_functions()
+        try:
+            if nfs:
+                logger.info(f'Deactivate subscription initiated for {self.subscriptionName}.')
+                for nf in nfs:
+                    mr_pub.publish_subscription_event_data(self, nf.nf_name, app_conf)
+                    logger.debug(f'Publishing Event to deactivate '
+                                 f'Sub: {self.subscriptionName} for the nf: {nf.nf_name}')
+                    self.update_sub_nf_status(self.subscriptionName,
+                                              SubNfState.PENDING_DELETE.value,
+                                              nf.nf_name)
+        except Exception as err:
+            raise Exception(f'Error publishing deactivation event to MR: {err}')
+
     @staticmethod
     def get_all_nfs_subscription_relations():
         """ Retrieves all network function to subscription relations
 
         Returns:
-            list: NetworkFunctions per Subscription list else empty
+            list(NfSubRelationalModel): NetworkFunctions per Subscription list else empty
         """
         nf_per_subscriptions = NfSubRelationalModel.query.all()
         return nf_per_subscriptions
index 74b6ac8..e74a173 100644 (file)
@@ -22,14 +22,12 @@ from mod.subscription import AdministrativeState
 
 
 class SubscriptionHandler:
-    def __init__(self, administrative_state, mr_pub, app, app_conf, aai_event_thread):
-        self.current_nfs = None
-        self.current_sub = None
-        self.administrative_state = administrative_state
+    def __init__(self, mr_pub, app, app_conf, aai_event_thread, policy_event_thread):
         self.mr_pub = mr_pub
         self.app = app
         self.app_conf = app_conf
         self.aai_event_thread = aai_event_thread
+        self.policy_event_thread = policy_event_thread
 
     def execute(self):
         """
@@ -37,25 +35,37 @@ class SubscriptionHandler:
         the Subscription if a change has occurred
         """
         self.app.app_context().push()
+        local_admin_state = self.app_conf.subscription.get_local_sub_admin_state()
         new_administrative_state = self.app_conf.subscription.administrativeState
         try:
-            if self.administrative_state == new_administrative_state:
+            if local_admin_state == new_administrative_state:
                 logger.info('Administrative State did not change in the Config')
             else:
-                self.current_nfs = aai.get_pmsh_nfs_from_aai(self.app_conf)
-                self.current_sub = self.app_conf.subscription
-                logger.info(f'Administrative State has changed from {self.administrative_state} '
-                            f'to {new_administrative_state}.')
-                self.administrative_state = new_administrative_state
-                self.current_sub.process_subscription(self.current_nfs, self.mr_pub, self.app_conf)
-
                 if new_administrative_state == AdministrativeState.UNLOCKED.value:
-                    logger.info('Listening to AAI-EVENT topic in MR.')
-                    self.aai_event_thread.start()
+                    self._activate(local_admin_state, new_administrative_state)
+                elif local_admin_state == AdministrativeState.PENDING.value:
+                    logger.info('Administrative State is PENDING')
                 else:
-                    logger.info('Stop listening to AAI-EVENT topic in MR.')
-                    self.aai_event_thread.cancel()
-
+                    self._deactivate(local_admin_state, new_administrative_state)
         except Exception as err:
             logger.error(f'Error occurred during the activation/deactivation process {err}',
                          exc_info=True)
+
+    def _activate(self, local_admin_state, new_administrative_state):
+        logger.info(f'Administrative State has changed from {local_admin_state} '
+                    f'to {new_administrative_state}.')
+        existing_nfs_in_aai = aai.get_pmsh_nfs_from_aai(self.app_conf)
+        self.app_conf.subscription.activate_subscription(existing_nfs_in_aai, self.mr_pub,
+                                                         self.app_conf)
+        self.app_conf.subscription.update_subscription_status()
+        logger.info('Start listening for new NFs on AAI-EVENT topic in MR.')
+        self.aai_event_thread.start()
+        self.policy_event_thread.start()
+
+    def _deactivate(self, local_admin_state, new_administrative_state):
+        logger.info(f'Administrative State has changed from {local_admin_state} '
+                    f'to {new_administrative_state}.')
+        self.aai_event_thread.cancel()
+        logger.info('Stop listening for NFs on AAI-EVENT topic in MR.')
+        self.app_conf.subscription.deactivate_subscription(self.mr_pub, self.app_conf)
+        self.app_conf.subscription.update_subscription_status()
index b3c906d..6b6b9ba 100755 (executable)
@@ -23,7 +23,6 @@ from mod.aai_event_handler import process_aai_events
 from mod.exit_handler import ExitHandler
 from mod.pmsh_utils import AppConfig, PeriodicTask
 from mod.policy_response_handler import PolicyResponseHandler
-from mod.subscription import Subscription, AdministrativeState
 from mod.subscription_handler import SubscriptionHandler
 
 
@@ -40,27 +39,27 @@ def main():
         except Exception as e:
             logger.error(f'Failed to get config and create application: {e}', exc_info=True)
             sys.exit(e)
-        subscription_in_db = Subscription.get(app_conf.subscription.subscriptionName)
-        administrative_state = subscription_in_db.status if subscription_in_db \
-            else AdministrativeState.LOCKED.value
 
         app_conf_thread = PeriodicTask(10, app_conf.refresh_config)
         app_conf_thread.start()
-        aai_event_thread = PeriodicTask(10, process_aai_events,
-                                        args=(aai_event_mr_sub, policy_mr_pub, app, app_conf))
-        subscription_handler = SubscriptionHandler(administrative_state,
-                                                   policy_mr_pub, app, app_conf, aai_event_thread)
+
         policy_response_handler = PolicyResponseHandler(policy_mr_sub, app_conf, app)
+        policy_response_handler_thread = PeriodicTask(25, policy_response_handler.poll_policy_topic)
+
+        aai_event_thread = PeriodicTask(20, process_aai_events,
+                                        args=(aai_event_mr_sub, policy_mr_pub, app, app_conf))
+
+        subscription_handler = SubscriptionHandler(policy_mr_pub, app, app_conf, aai_event_thread,
+                                                   policy_response_handler_thread)
 
         subscription_handler_thread = PeriodicTask(30, subscription_handler.execute)
-        policy_response_handler_thread = PeriodicTask(10, policy_response_handler.poll_policy_topic)
         subscription_handler_thread.start()
-        policy_response_handler_thread.start()
+
         periodic_tasks = [app_conf_thread, aai_event_thread, subscription_handler_thread,
                           policy_response_handler_thread]
 
         signal(SIGTERM, ExitHandler(periodic_tasks=periodic_tasks,
-                                    subscription_handler=subscription_handler))
+                                    app_conf=app_conf, subscription_handler=subscription_handler))
         launch_api_server(app_conf)
 
     except Exception as e:
index d366dac..9ac7647 100755 (executable)
 # SPDX-License-Identifier: Apache-2.0
 # ============LICENSE_END=====================================================
 import json
+import os
 from os import path
+from test.support import EnvironmentVarGuard
 from unittest import TestCase
 from unittest.mock import patch, Mock
 
+from mod import create_app, db
 from mod.aai_event_handler import process_aai_events
-from mod.network_function import NetworkFunction, OrchestrationStatus
+from mod.network_function import NetworkFunction
 from mod.pmsh_utils import AppConfig
 
 
 class AAIEventHandlerTest(TestCase):
 
+    @patch('mod.get_db_connection_url')
+    @patch('mod.update_logging_config')
     @patch('mod.pmsh_utils.AppConfig._get_pmsh_config')
-    def setUp(self, mock_get_pmsh_config):
+    def setUp(self, mock_get_pmsh_config, mock_update_config, mock_get_db_url):
+        mock_get_db_url.return_value = 'sqlite://'
         with open(path.join(path.dirname(__file__), 'data/cbs_data_1.json'), 'r') as data:
             self.cbs_data = json.load(data)
         mock_get_pmsh_config.return_value = self.cbs_data
         self.app_conf = AppConfig()
         with open(path.join(path.dirname(__file__), 'data/mr_aai_events.json'), 'r') as data:
             self.mr_aai_events = json.load(data)["mr_response"]
+        self.env = EnvironmentVarGuard()
+        self.env.set('LOGGER_CONFIG', os.path.join(os.path.dirname(__file__), 'log_config.yaml'))
         self.mock_mr_sub = Mock(get_from_topic=Mock(return_value=self.mr_aai_events))
         self.mock_mr_pub = Mock()
         self.mock_app = Mock()
+        self.app = create_app()
+        self.app_context = self.app.app_context()
+        self.app_context.push()
+        db.create_all()
 
-    @patch('mod.subscription.Subscription.process_subscription')
+    def tearDown(self):
+        db.session.remove()
+        db.drop_all()
+        self.app_context.pop()
+
+    @patch('mod.subscription.Subscription.activate_subscription')
     @patch('mod.aai_event_handler.NetworkFunction.delete')
     @patch('mod.aai_event_handler.NetworkFunction.get')
     def test_process_aai_update_and_delete_events(self, mock_nf_get, mock_nf_delete,
-                                                  mock_process_sub):
+                                                  mock_activate_sub):
         pnf_already_active = NetworkFunction(nf_name='pnf_already_active',
-                                             orchestration_status=OrchestrationStatus.ACTIVE.value)
+                                             orchestration_status='Active')
         mock_nf_get.side_effect = [None, pnf_already_active]
         expected_nf_for_processing = NetworkFunction(
-            nf_name='pnf_newly_discovered', orchestration_status=OrchestrationStatus.ACTIVE.value)
+            nf_name='pnf_newly_discovered', orchestration_status='Active')
 
         process_aai_events(self.mock_mr_sub, self.mock_mr_pub, self.mock_app, self.app_conf)
 
-        mock_process_sub.assert_called_once_with([expected_nf_for_processing],
-                                                 self.mock_mr_pub, self.app_conf)
+        mock_activate_sub.assert_called_once_with([expected_nf_for_processing],
+                                                  self.mock_mr_pub, self.app_conf)
         mock_nf_delete.assert_called_once_with(nf_name='pnf_to_be_deleted')
index d324a07..4fcecc3 100755 (executable)
@@ -47,27 +47,26 @@ class ControllerTestCase(unittest.TestCase):
         with open(os.path.join(os.path.dirname(__file__), 'data/cbs_data_1.json'), 'r') as data:
             self.cbs_data = json.load(data)
         mock_get_pmsh_config.return_value = self.cbs_data
-        self.app_conf = AppConfig()
-        self.xnfs = aai_client.get_pmsh_nfs_from_aai(self.app_conf)
         self.nf_1 = NetworkFunction(nf_name='pnf_1', orchestration_status='Inventoried')
         self.nf_2 = NetworkFunction(nf_name='pnf_2', orchestration_status='Active')
         self.app = create_app()
         self.app_context = self.app.app_context()
         self.app_context.push()
         db.create_all()
+        self.app_conf = AppConfig()
+        self.xnfs = aai_client.get_pmsh_nfs_from_aai(self.app_conf)
 
     def tearDown(self):
         db.session.remove()
         db.drop_all()
-        self.app_context.pop()
 
     def test_status_response_healthy(self):
         self.assertEqual(status()['status'], 'healthy')
 
     def test_get_all_sub_to_nf_relations(self):
-        self.app_conf.subscription.create()
+        sub_model = self.app_conf.subscription.get()
         for nf in [self.nf_1, self.nf_2]:
-            self.app_conf.subscription.add_network_function_to_subscription(nf)
+            self.app_conf.subscription.add_network_function_to_subscription(nf, sub_model)
         all_subs = get_all_sub_to_nf_relations()
         self.assertEqual(len(all_subs[0]['network_functions']), 2)
         self.assertEqual(all_subs[0]['subscription_name'], 'ExtraPM-All-gNB-R2B')
index ac1e15c..d41bd03 100755 (executable)
 #
 # SPDX-License-Identifier: Apache-2.0
 # ============LICENSE_END=====================================================
+import json
 import os
-import signal
-import threading
-import time
+from signal import SIGTERM, signal
+from test.support import EnvironmentVarGuard
 from unittest import TestCase
-from unittest.mock import patch, Mock, MagicMock
+from unittest.mock import patch, Mock
 
-import pmsh_service_main
+from mod.api.db_models import NetworkFunctionModel
 from mod.exit_handler import ExitHandler
-from mod.pmsh_utils import PeriodicTask
-from mod.subscription import AdministrativeState
+from mod.pmsh_utils import AppConfig
+from mod.subscription import Subscription
 
 
 class ExitHandlerTests(TestCase):
-
-    @patch('pmsh_service_main.create_app')
-    @patch('pmsh_service_main.db')
-    @patch('pmsh_service_main.AppConfig')
-    @patch('pmsh_service_main.Subscription')
-    @patch('pmsh_service_main.launch_api_server')
-    @patch('pmsh_service_main.SubscriptionHandler')
-    @patch.object(PeriodicTask, 'start')
-    @patch.object(PeriodicTask, 'cancel')
-    def test_terminate_signal_success(self, mock_task_cancel, mock_task_start, mock_sub_handler,
-                                      mock_launch_api_server, mock_sub, mock_app_conf,
-                                      mock_db, mock_app):
-        pid = os.getpid()
-        mock_db.get_app.return_value = Mock()
-
-        mock_sub.administrativeState = AdministrativeState.UNLOCKED.value
-        mock_sub.process_subscription = Mock()
-        mock_sub_handler_instance = MagicMock(execute=Mock(), current_sub=mock_sub)
-        mock_sub_handler.side_effect = [mock_sub_handler_instance]
-
-        def mock_api_server_run(param):
-            while mock_sub.administrativeState == AdministrativeState.UNLOCKED.value:
-                time.sleep(1)
-
-        mock_launch_api_server.side_effect = mock_api_server_run
-
-        def trigger_signal():
-            time.sleep(1)
-            os.kill(pid, signal.SIGTERM)
-
-        thread = threading.Thread(target=trigger_signal)
-        thread.start()
-
-        pmsh_service_main.main()
-
-        self.assertEqual(4, mock_task_cancel.call_count)
+    @patch('mod.subscription.Subscription.create')
+    @patch('mod.pmsh_utils.AppConfig._get_pmsh_config')
+    @patch('mod.pmsh_utils.PeriodicTask')
+    def setUp(self, mock_periodic_task, mock_get_pmsh_config, mock_sub_create):
+        self.env = EnvironmentVarGuard()
+        self.env.set('LOGGER_CONFIG', os.path.join(os.path.dirname(__file__), 'log_config.yaml'))
+        with open(os.path.join(os.path.dirname(__file__), 'data/cbs_data_1.json'), 'r') as data:
+            self.cbs_data = json.load(data)
+        mock_get_pmsh_config.return_value = self.cbs_data
+        self.mock_aai_event_thread = mock_periodic_task
+        self.app_conf = AppConfig()
+        self.sub = self.app_conf.subscription
+
+    @patch('mod.logger.debug')
+    @patch.object(Subscription, 'update_sub_nf_status')
+    @patch.object(Subscription, 'update_subscription_status')
+    @patch.object(Subscription, '_get_nf_models',
+                  return_value=[NetworkFunctionModel('pnf1', 'ACTIVE')])
+    def test_terminate_signal_successful(self, mock_sub_get_nf_models, mock_upd_sub_status,
+                                         mock_upd_subnf_status, mock_logger):
+        handler = ExitHandler(periodic_tasks=[self.mock_aai_event_thread],
+                              app_conf=self.app_conf,
+                              subscription_handler=Mock())
+        signal(SIGTERM, handler)
+        os.kill(os.getpid(), SIGTERM)
         self.assertTrue(ExitHandler.shutdown_signal_received)
-        self.assertEqual(1, mock_sub.process_subscription.call_count)
-        self.assertEqual(mock_sub.administrativeState, AdministrativeState.LOCKED.value)
index 86baef8..cdfb1eb 100755 (executable)
@@ -15,6 +15,7 @@
 #
 # SPDX-License-Identifier: Apache-2.0
 # ============LICENSE_END=====================================================
+import json
 import os
 from test.support import EnvironmentVarGuard
 from unittest import TestCase
@@ -22,23 +23,29 @@ from unittest.mock import patch
 
 from mod import db, create_app
 from mod.network_function import NetworkFunction
+from mod.pmsh_utils import AppConfig
 from mod.subscription import Subscription
 
 
 class NetworkFunctionTests(TestCase):
 
+    @patch('mod.pmsh_utils.AppConfig._get_pmsh_config')
     @patch('mod.update_logging_config')
     @patch('mod.get_db_connection_url')
-    def setUp(self, mock_get_db_url, mock_update_config):
+    def setUp(self, mock_get_db_url, mock_update_config, mock_get_pmsh_config):
         mock_get_db_url.return_value = 'sqlite://'
         self.nf_1 = NetworkFunction(nf_name='pnf_1', orchestration_status='Inventoried')
         self.nf_2 = NetworkFunction(nf_name='pnf_2', orchestration_status='Active')
+        with open(os.path.join(os.path.dirname(__file__), 'data/cbs_data_1.json'), 'r') as data:
+            self.cbs_data = json.load(data)
+        mock_get_pmsh_config.return_value = self.cbs_data
         self.env = EnvironmentVarGuard()
         self.env.set('LOGGER_CONFIG', os.path.join(os.path.dirname(__file__), 'log_config.yaml'))
         self.app = create_app()
         self.app_context = self.app.app_context()
         self.app_context.push()
         db.create_all()
+        self.app_conf = AppConfig()
 
     def tearDown(self):
         db.session.remove()
@@ -74,7 +81,7 @@ class NetworkFunctionTests(TestCase):
         self.nf_2.create()
         sub = Subscription(**{"subscriptionName": "sub"})
         for nf in [self.nf_1, self.nf_2]:
-            sub.add_network_function_to_subscription(nf)
+            sub.add_network_function_to_subscription(nf, self.app_conf.subscription.get())
 
         NetworkFunction.delete(nf_name=self.nf_1.nf_name)
 
index 74593a4..27a189c 100755 (executable)
@@ -26,7 +26,7 @@ from requests import Session
 import mod.aai_client as aai_client
 from mod import db, create_app
 from mod.api.db_models import NetworkFunctionModel
-from mod.network_function import NetworkFunction, OrchestrationStatus
+from mod.network_function import NetworkFunction
 from mod.pmsh_utils import AppConfig
 from mod.subscription import Subscription
 
@@ -51,27 +51,28 @@ class SubscriptionTest(TestCase):
         with open(os.path.join(os.path.dirname(__file__), 'data/cbs_data_1.json'), 'r') as data:
             self.cbs_data = json.load(data)
         mock_get_pmsh_config.return_value = self.cbs_data
-        self.app_conf = AppConfig()
-        self.xnfs = aai_client.get_pmsh_nfs_from_aai(self.app_conf)
         self.mock_mr_sub = mock_mr_sub
         self.mock_mr_pub = mock_mr_pub
         self.app = create_app()
         self.app_context = self.app.app_context()
         self.app_context.push()
         db.create_all()
+        self.app_conf = AppConfig()
+        self.xnfs = aai_client.get_pmsh_nfs_from_aai(self.app_conf)
+        self.sub_model = self.app_conf.subscription.get()
 
     def tearDown(self):
-        db.session.remove()
         db.drop_all()
+        db.session.remove()
         self.app_context.pop()
 
     def test_xnf_filter_true(self):
         self.assertTrue(self.app_conf.nf_filter.is_nf_in_filter('pnf1',
-                                                                OrchestrationStatus.ACTIVE.value))
+                                                                'Active'))
 
     def test_xnf_filter_false(self):
         self.assertFalse(self.app_conf.nf_filter.is_nf_in_filter('PNF-33',
-                                                                 OrchestrationStatus.ACTIVE.value))
+                                                                 'Active'))
 
     def test_sub_measurement_group(self):
         self.assertEqual(len(self.app_conf.subscription.measurementGroups), 2)
@@ -82,18 +83,15 @@ class SubscriptionTest(TestCase):
     def test_get_subscription(self):
         sub_name = 'ExtraPM-All-gNB-R2B'
         self.app_conf.subscription.create()
-        new_sub = Subscription.get(sub_name)
+        new_sub = self.app_conf.subscription.get()
         self.assertEqual(sub_name, new_sub.subscription_name)
 
-    def test_get_subscription_no_match(self):
-        sub_name = 'sub2_does_not_exist'
-        sub = Subscription.get(sub_name)
-        self.assertEqual(sub, None)
-
     def test_get_nf_names_per_sub(self):
         self.app_conf.subscription.create()
-        self.app_conf.subscription.add_network_function_to_subscription(list(self.xnfs)[0])
-        self.app_conf.subscription.add_network_function_to_subscription(list(self.xnfs)[1])
+        self.app_conf.subscription.add_network_function_to_subscription(list(self.xnfs)[0],
+                                                                        self.sub_model)
+        self.app_conf.subscription.add_network_function_to_subscription(list(self.xnfs)[1],
+                                                                        self.sub_model)
         nfs = Subscription.get_nf_names_per_sub(self.app_conf.subscription.subscriptionName)
         self.assertEqual(2, len(nfs))
 
@@ -105,37 +103,38 @@ class SubscriptionTest(TestCase):
 
     def test_add_network_functions_per_subscription(self):
         for nf in self.xnfs:
-            self.app_conf.subscription.add_network_function_to_subscription(nf)
+            self.app_conf.subscription.add_network_function_to_subscription(nf, self.sub_model)
         nfs_for_sub_1 = Subscription.get_all_nfs_subscription_relations()
         self.assertEqual(3, len(nfs_for_sub_1))
         new_nf = NetworkFunction(nf_name='vnf_3', orchestration_status='Active')
-        self.app_conf.subscription.add_network_function_to_subscription(new_nf)
+        self.app_conf.subscription.add_network_function_to_subscription(new_nf, self.sub_model)
         nf_subs = Subscription.get_all_nfs_subscription_relations()
         self.assertEqual(4, len(nf_subs))
 
     def test_add_duplicate_network_functions_per_subscription(self):
-        self.app_conf.subscription.add_network_function_to_subscription(list(self.xnfs)[0])
+        self.app_conf.subscription.add_network_function_to_subscription(list(self.xnfs)[0],
+                                                                        self.sub_model)
         nf_subs = Subscription.get_all_nfs_subscription_relations()
         self.assertEqual(1, len(nf_subs))
-        self.app_conf.subscription.add_network_function_to_subscription(list(self.xnfs)[0])
+        self.app_conf.subscription.add_network_function_to_subscription(list(self.xnfs)[0],
+                                                                        self.sub_model)
         nf_subs = Subscription.get_all_nfs_subscription_relations()
         self.assertEqual(1, len(nf_subs))
 
     def test_update_subscription_status(self):
-        sub_name = 'ExtraPM-All-gNB-R2B'
         self.app_conf.subscription.create()
         self.app_conf.subscription.administrativeState = 'new_status'
         self.app_conf.subscription.update_subscription_status()
-        sub = Subscription.get(sub_name)
+        sub = self.app_conf.subscription.get()
 
         self.assertEqual('new_status', sub.status)
 
     def test_delete_subscription(self):
         for nf in self.xnfs:
-            self.app_conf.subscription.add_network_function_to_subscription(nf)
+            self.app_conf.subscription.add_network_function_to_subscription(nf, self.sub_model)
         self.app_conf.subscription.delete_subscription()
         self.assertEqual(0, len(Subscription.get_all()))
-        self.assertEqual(None, Subscription.get(self.app_conf.subscription.subscriptionName))
+        self.assertEqual(None, self.app_conf.subscription.get())
         self.assertEqual(0, len(Subscription.get_all_nfs_subscription_relations()))
         self.assertEqual(0, len(NetworkFunction.get_all()))
         self.assertEqual(None, NetworkFunction.get(nf_name=list(self.xnfs)[0].nf_name))
@@ -143,7 +142,7 @@ class SubscriptionTest(TestCase):
     def test_update_sub_nf_status(self):
         sub_name = 'ExtraPM-All-gNB-R2B'
         for nf in self.xnfs:
-            self.app_conf.subscription.add_network_function_to_subscription(nf)
+            self.app_conf.subscription.add_network_function_to_subscription(nf, self.sub_model)
         sub_nfs = Subscription.get_all_nfs_subscription_relations()
         self.assertEqual('PENDING_CREATE', sub_nfs[0].nf_sub_status)
 
@@ -154,33 +153,27 @@ class SubscriptionTest(TestCase):
 
     @patch('mod.subscription.Subscription.add_network_function_to_subscription')
     @patch('mod.subscription.Subscription.update_sub_nf_status')
-    @patch('mod.subscription.Subscription.update_subscription_status')
-    def test_process_activate_subscription(self, mock_update_sub_status,
-                                           mock_update_sub_nf, mock_add_nfs):
-        self.app_conf.subscription.process_subscription([list(self.xnfs)[0]], self.mock_mr_pub,
-                                                        self.app_conf)
+    def test_process_activate_subscription(self, mock_update_sub_nf, mock_add_nfs):
+        self.app_conf.subscription.activate_subscription([list(self.xnfs)[0]], self.mock_mr_pub,
+                                                         self.app_conf)
 
-        mock_update_sub_status.assert_called()
         mock_add_nfs.assert_called()
         self.assertTrue(self.mock_mr_pub.publish_subscription_event_data.called)
         mock_update_sub_nf.assert_called_with(self.app_conf.subscription.subscriptionName,
                                               'PENDING_CREATE', list(self.xnfs)[0].nf_name)
 
+    @patch('mod.subscription.Subscription.get_network_functions')
     @patch('mod.subscription.Subscription.update_sub_nf_status')
-    @patch('mod.subscription.Subscription.update_subscription_status')
-    def test_process_deactivate_subscription(self, mock_update_sub_status,
-                                             mock_update_sub_nf):
+    def test_process_deactivate_subscription(self, mock_update_sub_nf, mock_get_nfs):
         self.app_conf.subscription.administrativeState = 'LOCKED'
-        self.app_conf.subscription.process_subscription([list(self.xnfs)[0]], self.mock_mr_pub,
-                                                        self.app_conf)
-
+        mock_get_nfs.return_value = [list(self.xnfs)[0]]
+        self.app_conf.subscription.deactivate_subscription(self.mock_mr_pub, self.app_conf)
         self.assertTrue(self.mock_mr_pub.publish_subscription_event_data.called)
         mock_update_sub_nf.assert_called_with(self.app_conf.subscription.subscriptionName,
                                               'PENDING_DELETE', list(self.xnfs)[0].nf_name)
-        mock_update_sub_status.assert_called()
 
-    def test_process_subscription_exception(self):
-        self.assertRaises(Exception, self.app_conf.subscription.process_subscription,
+    def test_activate_subscription_exception(self):
+        self.assertRaises(Exception, self.app_conf.subscription.activate_subscription,
                           [list(self.xnfs)[0]], 'not_mr_pub', 'app_config')
 
     def test_prepare_subscription_event(self):
@@ -193,7 +186,7 @@ class SubscriptionTest(TestCase):
 
     def test_get_nf_models(self):
         for nf in self.xnfs:
-            self.app_conf.subscription.add_network_function_to_subscription(nf)
+            self.app_conf.subscription.add_network_function_to_subscription(nf, self.sub_model)
         nf_models = self.app_conf.subscription._get_nf_models()
 
         self.assertEqual(3, len(nf_models))
@@ -201,7 +194,7 @@ class SubscriptionTest(TestCase):
 
     def test_get_network_functions(self):
         for nf in self.xnfs:
-            self.app_conf.subscription.add_network_function_to_subscription(nf)
+            self.app_conf.subscription.add_network_function_to_subscription(nf, self.sub_model)
         nfs = self.app_conf.subscription.get_network_functions()
 
         self.assertEqual(3, len(nfs))
index 65a7c2c..a661166 100644 (file)
 # ============LICENSE_END=====================================================
 import json
 import os
+from test.support import EnvironmentVarGuard
 from unittest import TestCase
 from unittest.mock import patch
 
+from mod import create_app, db
 from mod.network_function import NetworkFunction
 from mod.pmsh_utils import AppConfig
 from mod.subscription import AdministrativeState
@@ -28,72 +30,95 @@ from mod.subscription_handler import SubscriptionHandler
 
 class SubscriptionHandlerTest(TestCase):
 
+    @classmethod
+    def setUpClass(cls):
+        cls.env = EnvironmentVarGuard()
+        cls.env.set('AAI_SERVICE_PORT', '8443')
+        cls.env.set('LOGGER_CONFIG', os.path.join(os.path.dirname(__file__), 'log_config.yaml'))
+        cls.nfs = [NetworkFunction(nf_name='pnf_1'), NetworkFunction(nf_name='pnf_2')]
+
+    @patch('mod.get_db_connection_url')
+    @patch('mod.update_logging_config')
     @patch('mod.pmsh_utils.AppConfig._get_pmsh_config')
-    @patch('mod.create_app')
     @patch('mod.pmsh_utils._MrPub')
     @patch('mod.pmsh_utils.PeriodicTask')
-    def setUp(self, mock_aai_event_thread, mock_mr_pub, mock_app, mock_get_pmsh_config):
+    def setUp(self, mock_periodic_task, mock_mr_pub, mock_get_pmsh_config, mock_update_config,
+              mock_get_db_url):
+        mock_get_db_url.return_value = 'sqlite://'
         with open(os.path.join(os.path.dirname(__file__), 'data/cbs_data_1.json'), 'r') as data:
             self.cbs_data = json.load(data)
         mock_get_pmsh_config.return_value = self.cbs_data
-        self.app_conf = AppConfig()
-        self.mock_app = mock_app
         self.mock_mr_pub = mock_mr_pub
-        self.mock_aai_event_thread = mock_aai_event_thread
-        self.nf_1 = NetworkFunction(nf_name='pnf_1')
-        self.nf_2 = NetworkFunction(nf_name='pnf_2')
-        self.nfs = [self.nf_1, self.nf_2]
+        self.mock_aai_event_thread = mock_periodic_task
+        self.mock_policy_event_thread = mock_periodic_task
+        self.app = create_app()
+        self.app.app_context().push()
+        db.create_all()
+        self.app_conf = AppConfig()
 
     def tearDown(self):
-        pass
+        db.drop_all()
+        db.session.remove()
 
+    @patch('mod.subscription.Subscription.get_local_sub_admin_state')
     @patch('mod.logger.info')
     @patch('mod.aai_client.get_pmsh_nfs_from_aai')
-    def test_execute_no_change_of_state(self, mock_get_aai, mock_logger):
+    def test_execute_no_change_of_state(self, mock_get_aai, mock_logger, mock_get_sub_status):
+        mock_get_sub_status.return_value = AdministrativeState.UNLOCKED.value
         mock_get_aai.return_value = self.nfs
-        sub_handler = SubscriptionHandler(AdministrativeState.UNLOCKED.value, self.mock_mr_pub,
-                                          self.mock_app, self.app_conf,
-                                          self.mock_aai_event_thread)
+        sub_handler = SubscriptionHandler(self.mock_mr_pub,
+                                          self.app, self.app_conf,
+                                          self.mock_aai_event_thread,
+                                          self.mock_policy_event_thread)
         sub_handler.execute()
         mock_logger.assert_called_with('Administrative State did not change in the Config')
 
-    @patch('mod.subscription.Subscription.process_subscription')
+    @patch('mod.subscription.Subscription.get_local_sub_admin_state')
+    @patch('mod.subscription.Subscription.activate_subscription')
     @patch('mod.aai_client.get_pmsh_nfs_from_aai')
-    def test_execute_change_of_state_unlocked(self, mock_get_aai, mock_process_sub):
+    def test_execute_change_of_state_to_unlocked(self, mock_get_aai, mock_activate_sub,
+                                                 mock_get_sub_status):
         mock_get_aai.return_value = self.nfs
+        mock_get_sub_status.return_value = AdministrativeState.LOCKED.value
         self.mock_aai_event_thread.return_value.start.return_value = 'start_method'
-        sub_handler = SubscriptionHandler(AdministrativeState.LOCKED.value, self.mock_mr_pub,
-                                          self.mock_app, self.app_conf,
-                                          self.mock_aai_event_thread.return_value)
+        sub_handler = SubscriptionHandler(self.mock_mr_pub,
+                                          self.app, self.app_conf,
+                                          self.mock_aai_event_thread.return_value,
+                                          self.mock_policy_event_thread)
         sub_handler.execute()
-        self.assertEqual(AdministrativeState.UNLOCKED.value, sub_handler.administrative_state)
-        mock_process_sub.assert_called_with(self.nfs, self.mock_mr_pub, self.app_conf)
+        self.assertEqual(AdministrativeState.UNLOCKED.value,
+                         self.app_conf.subscription.administrativeState)
+        mock_activate_sub.assert_called_with(self.nfs, self.mock_mr_pub, self.app_conf)
         self.mock_aai_event_thread.return_value.start.assert_called()
 
-    @patch('mod.subscription.Subscription.process_subscription')
+    @patch('mod.subscription.Subscription.get_local_sub_admin_state')
+    @patch('mod.subscription.Subscription.deactivate_subscription')
     @patch('mod.aai_client.get_pmsh_nfs_from_aai')
-    def test_execute_change_of_state_locked(self, mock_get_aai, mock_process_sub):
+    def test_execute_change_of_state_to_locked(self, mock_get_aai, mock_deactivate_sub,
+                                               mock_get_sub_status):
+        mock_get_sub_status.return_value = AdministrativeState.UNLOCKED.value
+        self.app_conf.subscription.administrativeState = AdministrativeState.LOCKED.value
+        self.app_conf.subscription.update_subscription_status()
         mock_get_aai.return_value = self.nfs
         self.mock_aai_event_thread.return_value.cancel.return_value = 'cancel_method'
-        self.app_conf.subscription.administrativeState = AdministrativeState.LOCKED.value
-        sub_handler = SubscriptionHandler(AdministrativeState.UNLOCKED.value, self.mock_mr_pub,
-                                          self.mock_app, self.app_conf,
-                                          self.mock_aai_event_thread.return_value)
+        sub_handler = SubscriptionHandler(self.mock_mr_pub,
+                                          self.app, self.app_conf,
+                                          self.mock_aai_event_thread.return_value,
+                                          self.mock_policy_event_thread)
         sub_handler.execute()
-        self.assertEqual(AdministrativeState.LOCKED.value, sub_handler.administrative_state)
-        mock_process_sub.assert_called_with(self.nfs, self.mock_mr_pub, self.app_conf)
+        mock_deactivate_sub.assert_called_with(self.mock_mr_pub, self.app_conf)
         self.mock_aai_event_thread.return_value.cancel.assert_called()
-        self.app_conf.subscription.administrativeState = AdministrativeState.UNLOCKED.value
 
-    @patch('mod.subscription.Subscription.process_subscription')
+    @patch('mod.subscription.Subscription.activate_subscription')
     @patch('mod.logger.error')
     @patch('mod.aai_client.get_pmsh_nfs_from_aai')
-    def test_execute_exception(self, mock_get_aai, mock_logger, mock_process_sub):
+    def test_execute_exception(self, mock_get_aai, mock_logger, mock_activate_sub):
         mock_get_aai.return_value = self.nfs
-        mock_process_sub.side_effect = Exception
-        sub_handler = SubscriptionHandler(AdministrativeState.LOCKED.value, self.mock_mr_pub,
-                                          self.mock_app, self.app_conf,
-                                          self.mock_aai_event_thread)
+        mock_activate_sub.side_effect = Exception
+        sub_handler = SubscriptionHandler(self.mock_mr_pub,
+                                          self.app, self.app_conf,
+                                          self.mock_aai_event_thread,
+                                          self.mock_policy_event_thread)
         sub_handler.execute()
         mock_logger.assert_called_with('Error occurred during the activation/deactivation process ',
                                        exc_info=True)