* Added Resource Name (model-name) to filter (DCAEGEN2-2402)
* Added retry mechanism for DELETE_FAILED subscriptions on given NFs (DCAEGEN2-2152)
* Added func to update the subscription object on ACTIVATE/UNLOCK (DCAEGEN2-2152)
+* Added validation for schema of PMSH monitoring policy (DCAEGEN2-2152)
## [1.1.2]
### Changed
#
# SPDX-License-Identifier: Apache-2.0
# ============LICENSE_END=====================================================
+import json
+import os
import uuid
+from json import JSONDecodeError
from os import getenv
from threading import Timer
from onaplogging.mdcContext import MDC
from requests.auth import HTTPBasicAuth
from tenacity import wait_fixed, stop_after_attempt, retry, retry_if_exception_type
+from jsonschema import validate, ValidationError
-import mod.network_function
from mod import logger
from mod.subscription import Subscription
kwargs['request_id'] = request_id
kwargs['invocation_id'] = invocation_id
return function(*args, **kwargs)
+
return decorator
return type(clz.__name__, (MySingleton,), dict(clz.__dict__))
+def _load_sub_schema_from_file():
+ try:
+ with open(os.path.join(os.path.dirname(__file__), 'sub_schema.json')) as sub:
+ return json.load(sub)
+ except OSError as err:
+ logger.error(f'Failed to read sub schema file: {err}', exc_info=True)
+ except JSONDecodeError as json_err:
+ logger.error(f'sub schema file is not a valid JSON file: {json_err}', exc_info=True)
+
+
class AppConfig:
INSTANCE = None
self.streams_publishes = conf['config'].get('streams_publishes')
self.operational_policy_name = conf['config'].get('operational_policy_name')
self.control_loop_name = conf['config'].get('control_loop_name')
+ self.sub_schema = _load_sub_schema_from_file()
self.subscription = Subscription(**conf['policy']['subscription'])
- self.nf_filter = mod.network_function.NetworkFunctionFilter(**self.subscription.nfFilter)
+ self.nf_filter = None
def __new__(cls, *args, **kwargs):
if AppConfig.INSTANCE is None:
logger.error(f'Failed to get config from CBS: {e}', exc_info=True)
raise ValueError(e)
+ def validate_sub_schema(self):
+ """
+ Validates schema of PMSH subscription
+
+ Raises:
+ ValidationError: If the PMSH subscription schema is invalid
+ """
+ sub_data = self.subscription.__dict__
+ validate(instance=sub_data, schema=self.sub_schema)
+ nf_filter = sub_data["nfFilter"]
+ for filter_name in nf_filter:
+ if len(nf_filter[filter_name]) > 0:
+ break
+ else:
+ raise ValidationError("At least one filter within nfFilter must not be empty")
+ logger.debug("Subscription schema is valid.")
+
def refresh_config(self):
"""
Update the relevant attributes of the AppConfig object.
--- /dev/null
+{
+ "type":"object",
+ "properties":{
+ "subscriptionName":{
+ "type":"string"
+ },
+ "administrativeState":{
+ "allOf":[
+ {
+ "type":"string"
+ },
+ {
+ "enum":[
+ "UNLOCKED",
+ "LOCKED"
+ ]
+ }
+ ]
+ },
+ "fileBasedGP":{
+ "type":"integer"
+ },
+ "fileLocation":{
+ "type":"string"
+ },
+ "nfFilter":{
+ "type":"object",
+ "properties":{
+ "nfNames":{
+ "type":"array",
+ "items":{
+ "type":"string"
+ }
+ },
+ "modelInvariantIDs":{
+ "type":"array",
+ "items":{
+ "type":"string"
+ }
+ },
+ "modelVersionIDs":{
+ "type":"array",
+ "items":{
+ "type":"string"
+ }
+ },
+ "modelNames":{
+ "type":"array",
+ "items":{
+ "type":"string"
+ }
+ }
+ },
+ "required":[
+ "nfNames",
+ "modelInvariantIDs",
+ "modelVersionIDs",
+ "modelNames"
+ ]
+ },
+ "measurementGroups":{
+ "type":"array",
+ "minItems": 1,
+ "items":{
+ "type":"object",
+ "properties":{
+ "measurementGroup":{
+ "type":"object",
+ "properties":{
+ "measurementTypes":{
+ "type":"array",
+ "minItems": 1,
+ "items":{
+ "type":"object",
+ "properties":{
+ "measurementType":{
+ "type":"string"
+ }
+ },
+ "required":[
+ "measurementType"
+ ]
+ }
+ },
+ "managedObjectDNsBasic":{
+ "type":"array",
+ "minItems": 1,
+ "items":{
+ "type":"object",
+ "properties":{
+ "DN":{
+ "type":"string"
+ }
+ },
+ "required":[
+ "DN"
+ ]
+ }
+ }
+ },
+ "required":[
+ "measurementTypes",
+ "managedObjectDNsBasic"
+ ]
+ }
+ },
+ "required":[
+ "measurementGroup"
+ ]
+ }
+ }
+ },
+ "required":[
+ "subscriptionName",
+ "administrativeState",
+ "fileBasedGP",
+ "fileLocation",
+ "nfFilter",
+ "measurementGroups"
+ ]
+
+}
\ No newline at end of file
#
# SPDX-License-Identifier: Apache-2.0
# ============LICENSE_END=====================================================
+from jsonschema import ValidationError
from mod import logger, aai_client
from mod.aai_event_handler import process_aai_events
+from mod.network_function import NetworkFunctionFilter
from mod.pmsh_utils import PeriodicTask
from mod.subscription import AdministrativeState
self._check_for_failed_nfs()
else:
self.app_conf.refresh_config()
+ self.app_conf.validate_sub_schema()
new_administrative_state = self.app_conf.subscription.administrativeState
if local_admin_state == new_administrative_state:
logger.info(f'Administrative State did not change in the app config: '
f'{new_administrative_state}')
else:
self._check_state_change(local_admin_state, new_administrative_state)
+ except (ValidationError, TypeError) as err:
+ logger.error(f'Error occurred during validation of subscription schema {err}',
+ exc_info=True)
except Exception as err:
logger.error(f'Error occurred during the activation/deactivation process {err}',
exc_info=True)
raise Exception(f'Invalid AdministrativeState: {new_administrative_state}')
def _activate(self, new_administrative_state):
+ if not self.app_conf.nf_filter:
+ self.app_conf.nf_filter = NetworkFunctionFilter(**self.app_conf.subscription.nfFilter)
self._start_aai_event_thread()
self.app_conf.subscription.update_sub_params(new_administrative_state,
self.app_conf.subscription.fileBasedGP,
# ============LICENSE_START=======================================================
-# Copyright (C) 2019-2020 Nordix Foundation.
+# Copyright (C) 2019-2021 Nordix Foundation.
# Copyright 2020 Deutsche Telekom. All rights reserved.
# ================================================================================
# Licensed under the Apache License, Version 2.0 (the "License");
"psycopg2-binary==2.8.4",
"onap_dcae_cbs_docker_client==2.1.1",
"onappylog==1.0.9",
- "ruamel.yaml==0.16.10"]
+ "ruamel.yaml==0.16.10",
+ "jsonschema==3.2.0"]
)
from unittest.mock import patch, MagicMock
from mod import create_app, db
+from mod.network_function import NetworkFunctionFilter
from mod.pmsh_utils import AppConfig
-def get_pmsh_config():
- with open(os.path.join(os.path.dirname(__file__), 'data/cbs_data_1.json'), 'r') as data:
+def get_pmsh_config(file_path='data/cbs_data_1.json'):
+ with open(os.path.join(os.path.dirname(__file__), file_path), 'r') as data:
return json.load(data)
os.environ['AAI_SERVICE_PORT'] = '8443'
db.create_all()
self.app_conf = AppConfig()
+ self.app_conf.nf_filter = NetworkFunctionFilter(**self.app_conf.subscription.nfFilter)
def tearDown(self):
db.drop_all()
],
"modelVersionIDs": [
+ ],
+ "modelNames": [
+
]
},
"measurementGroups":[
--- /dev/null
+{
+ "policy":{
+ "subscription":{
+ "subscriptionName":"ExtraPM-All-gNB-R2B",
+ "administrativeState":"UNLOCKED",
+ "fileBasedGP":15,
+ "fileLocation":"\/pm\/pm.xml",
+ "nfFilter":{
+ "nfNames":[
+
+ ],
+ "modelInvariantIDs": [
+
+ ],
+ "modelVersionIDs": [
+
+ ],
+ "modelNames": [
+
+ ]
+ },
+ "measurementGroups":[
+ {
+ "measurementGroup":{
+ "measurementTypes":[
+ {
+ "measurementType":"countera"
+ },
+ {
+ "measurementType":"counterb"
+ }
+ ],
+ "managedObjectDNsBasic":[
+ {
+ "DN":"dna"
+ },
+ {
+ "DN":"dnb"
+ }
+ ]
+ }
+ },
+ {
+ "measurementGroup":{
+ "measurementTypes":[
+ {
+ "measurementType":"counterc"
+ },
+ {
+ "measurementType":"counterd"
+ }
+ ],
+ "managedObjectDNsBasic":[
+ {
+ "DN":"dnc"
+ },
+ {
+ "DN":"dnd"
+ }
+ ]
+ }
+ }
+ ]
+ }
+ },
+ "config":{
+ "control_loop_name": "pmsh-control-loop",
+ "operational_policy_name": "pmsh-operational-policy",
+ "aaf_password":"demo123456!",
+ "aaf_identity":"dcae@dcae.onap.org",
+ "cert_path":"/opt/app/pmsh/etc/certs/cert.pem",
+ "key_path":"/opt/app/pmsh/etc/certs/key.pem",
+ "ca_cert_path":"/opt/app/pmsh/etc/certs/cacert.pem",
+ "enable_tls":"true",
+ "streams_subscribes":{
+ "aai_subscriber":{
+ "type":"message_router",
+ "dmaap_info":{
+ "topic_url":"https://message-router:3905/events/AAI_EVENT",
+ "client_role":"org.onap.dcae.aaiSub",
+ "location":"san-francisco",
+ "client_id":"1575976809466"
+ }
+ },
+ "policy_pm_subscriber":{
+ "type":"message_router",
+ "dmaap_info":{
+ "topic_url":"https://message-router:3905/events/org.onap.dmaap.mr.PM_SUBSCRIPTIONS",
+ "client_role":"org.onap.dcae.pmSubscriber",
+ "location":"san-francisco",
+ "client_id":"1575876809456"
+ }
+ }
+ },
+ "streams_publishes":{
+ "policy_pm_publisher":{
+ "type":"message_router",
+ "dmaap_info":{
+ "topic_url":"https://message-router:3905/events/org.onap.dmaap.mr.PM_SUBSCRIPTIONS",
+ "client_role":"org.onap.dcae.pmPublisher",
+ "location":"san-francisco",
+ "client_id":"1475976809466"
+ }
+ },
+ "other_publisher":{
+ "type":"message_router",
+ "dmaap_info":{
+ "topic_url":"https://message-router:3905/events/org.onap.dmaap.mr.SOME_OTHER_TOPIC",
+ "client_role":"org.onap.dcae.pmControlPub",
+ "location":"san-francisco",
+ "client_id":"1875976809466"
+ }
+ }
+ }
+ }
+}
\ No newline at end of file
# ============LICENSE_START===================================================
-# Copyright (C) 2019-2020 Nordix Foundation.
+# Copyright (C) 2019-2021 Nordix Foundation.
# ============================================================================
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
from unittest.mock import patch, Mock
import responses
+from jsonschema import ValidationError
from requests import Session
from tenacity import RetryError
with self.assertRaises(RetryError):
self.app_conf.refresh_config()
mock_logger.assert_called_with('Failed to refresh PMSH AppConfig')
+
+ @patch('mod.logger.debug')
+ def test_utils_validate_config_subscription(self, mock_logger):
+ self.app_conf.validate_sub_schema()
+ mock_logger.assert_called_with("Subscription schema is valid.")
+
+ @patch('mod.logger.debug')
+ def test_utils_validate_config_subscription_administrativeState_locked(self, mock_logger):
+ self.app_conf.subscription.administrativeState = "LOCKED"
+ self.app_conf.validate_sub_schema()
+ mock_logger.assert_called_with("Subscription schema is valid.")
+
+ def test_utils_validate_config_subscription_administrativeState_invalid_value(self):
+ self.app_conf.subscription.administrativeState = "FAILED"
+ with self.assertRaises(ValidationError):
+ self.app_conf.validate_sub_schema()
+
+ def test_utils_validate_config_subscription_nfFilter_failed(self):
+ self.app_conf.subscription.nfFilter = {}
+ with self.assertRaises(ValidationError):
+ self.app_conf.validate_sub_schema()
+
+ def test_utils_validate_config_subscription_where_measurementTypes_is_empty(self):
+ self.app_conf.subscription.measurementGroups = [{
+ "measurementGroup": {
+ "measurementTypes": [
+ ],
+ "managedObjectDNsBasic": [
+ {
+ "DN": "dna"
+ },
+ {
+ "DN": "dnb"
+ }
+ ]
+ }
+ }]
+ with self.assertRaises(ValidationError):
+ self.app_conf.validate_sub_schema()
+
+ def test_utils_validate_config_subscription_where_managedObjectDNsBasic_is_empty(self):
+ self.app_conf.subscription.measurementGroups = [{
+ "measurementGroup": {
+ "measurementTypes": [
+ {
+ "measurementType": "countera"
+ },
+ {
+ "measurementType": "counterb"
+ }
+ ],
+ "managedObjectDNsBasic": [
+
+ ]
+ }
+ }]
+ with self.assertRaises(ValidationError):
+ self.app_conf.validate_sub_schema()
+
+ def test_utils_validate_config_subscription_where_measurementGroups_is_empty(self):
+ self.app_conf.subscription.measurementGroups = []
+ with self.assertRaises(ValidationError):
+ self.app_conf.validate_sub_schema()
self.app_conf)
sub_handler.execute()
mock_nf_del.assert_called_once()
+
+ @patch('mod.pmsh_utils.AppConfig._get_pmsh_config',
+ MagicMock(return_value=get_pmsh_config('data/cbs_invalid_data.json')))
+ @patch('mod.subscription_handler.SubscriptionHandler._check_state_change')
+ def test_execute_invalid_schema(self, mock_change_state_check):
+ sub_handler = SubscriptionHandler(self.mock_mr_pub, self.mock_mr_sub, self.app,
+ self.app_conf)
+ sub_handler.execute()
+ mock_change_state_check.assert_not_called()
+
+ @patch('mod.pmsh_utils.AppConfig._get_pmsh_config',
+ MagicMock(return_value=get_pmsh_config()))
+ @patch('mod.subscription_handler.SubscriptionHandler._check_state_change')
+ def test_execute_valid_schema(self, mock_change_state_check):
+ sub_handler = SubscriptionHandler(self.mock_mr_pub, self.mock_mr_sub, self.app,
+ self.app_conf)
+ sub_handler.execute()
+ mock_change_state_check.assert_called_once()