INVENTORIED = 'Inventoried'
-def process_aai_events(mr_sub, subscription, mr_pub, app):
+def process_aai_events(mr_sub, subscription, mr_pub, app, app_conf):
"""
Processes AAI UPDATE events for each filtered xNFs where orchestration status is set to Active.
subscription (Subscription): The current subscription object
mr_pub (_MrPub): MR publisher
app (db): DB application
+ app_conf (AppConfig): the application configuration.
"""
app.app_context().push()
aai_events = mr_sub.get_from_topic('AAI-EVENT')
new_status = aai_xnf['orchestration-status']
if NetworkFunctionFilter(**subscription.nfFilter).is_nf_in_filter(xnf_name):
- _process_event(action, new_status, xnf_name, subscription, mr_pub)
+ _process_event(action, new_status, xnf_name, subscription, mr_pub, app_conf)
-def _process_event(action, new_status, xnf_name, subscription, mr_pub):
+def _process_event(action, new_status, xnf_name, subscription, mr_pub, app_conf):
if action == AAIEvent.UPDATE.value:
logger.debug(f'Update event found for network function {xnf_name}')
local_xnf = NetworkFunction.get(xnf_name)
if local_xnf is None:
logger.debug(f'Activating subscription for network function {xnf_name}')
subscription.process_subscription([NetworkFunction(
- nf_name=xnf_name, orchestration_status=new_status)], mr_pub)
+ nf_name=xnf_name, orchestration_status=new_status)], mr_pub, app_conf)
else:
logger.debug(f"Update Event for network function {xnf_name} will not be processed "
f" as it's state is set to {local_xnf.orchestration_status}.")
self.key_path = kwargs.get('key_path')
self.streams_subscribes = kwargs.get('streams_subscribes')
self.streams_publishes = kwargs.get('streams_publishes')
+ self.operational_policy_name = kwargs.get('operational_policy_name')
+ self.control_loop_name = kwargs.get('control_loop_name')
def get_mr_sub(self, sub_name):
"""
logger.debug(e)
raise
- def publish_subscription_event_data(self, subscription, xnf_name):
+ def publish_subscription_event_data(self, subscription, xnf_name, app_conf):
"""
Update the Subscription dict with xnf and policy name then publish to DMaaP MR topic.
Args:
subscription: the `Subscription` <Subscription> object.
xnf_name: the xnf to include in the event.
+ app_conf (AppConfig): the application configuration.
"""
try:
- subscription_event = subscription.prepare_subscription_event(xnf_name)
+ subscription_event = subscription.prepare_subscription_event(xnf_name, app_conf)
self.publish_to_topic(subscription_event)
except Exception as e:
logger.debug(f'pmsh_utils.publish_subscription_event_data : {e}')
"""
See :class:`Timer`.
"""
+
def run(self):
while not self.finished.wait(self.interval):
self.function(*self.args, **self.kwargs)
self.administrativeState = kwargs.get('administrativeState')
self.fileBasedGP = kwargs.get('fileBasedGP')
self.fileLocation = kwargs.get('fileLocation')
- self.nfTypeModelInvariantId = kwargs.get('nfTypeModelInvariantId')
self.nfFilter = kwargs.get('nfFilter')
self.measurementGroups = kwargs.get('measurementGroups')
- def prepare_subscription_event(self, xnf_name):
+ def prepare_subscription_event(self, xnf_name, app_conf):
"""Prepare the sub event for publishing
Args:
xnf_name: the AAI xnf name.
+ app_conf (AppConfig): the application configuration.
Returns:
dict: the Subscription event to be published.
"""
clean_sub = {k: v for k, v in self.__dict__.items() if k != 'nfFilter'}
- clean_sub.update({'nfName': xnf_name, 'policyName': f'OP-{self.subscriptionName}',
- 'changeType': 'DELETE'
- if self.administrativeState == AdministrativeState.LOCKED.value
- else 'CREATE'})
- return clean_sub
+ sub_event = {'nfName': xnf_name, 'policyName': app_conf.operational_policy_name,
+ 'changeType': 'DELETE'
+ if self.administrativeState == AdministrativeState.LOCKED.value
+ else 'CREATE', 'closedLoopControlName': app_conf.control_loop_name,
+ 'subscription': clean_sub}
+ return sub_event
def create(self):
""" Creates a subscription database entry
@retry(wait=wait_exponential(multiplier=1, min=30, max=120), stop=stop_after_attempt(3),
retry=retry_if_exception_type(Exception))
- def process_subscription(self, nfs, mr_pub):
+ def process_subscription(self, nfs, mr_pub, app_conf):
action = 'Deactivate'
sub_nf_state = SubNfState.PENDING_DELETE.value
self.update_subscription_status()
try:
for nf in nfs:
- mr_pub.publish_subscription_event_data(self, nf.nf_name)
+ mr_pub.publish_subscription_event_data(self, nf.nf_name, app_conf)
logger.debug(f'Publishing Event to {action} '
f'Sub: {self.subscriptionName} for the nf: {nf.nf_name}')
self.add_network_functions_to_subscription(nfs)
"""
app.app_context().push()
config = config_handler.get_config()
+ app_conf = AppConfig(**config['config'])
new_administrative_state = config['policy']['subscription']['administrativeState']
polling_period = 30.0
logger.debug(f'Administrative State changed from "{administrative_state}" "to '
f'"{new_administrative_state}".')
sub, nfs = aai.get_pmsh_subscription_data(config)
- sub.process_subscription(nfs, mr_pub)
- aai_event_thread = PeriodicTask(10, process_aai_events, args=(mr_aai_event_subscriber,
- sub, mr_pub, app))
+ sub.process_subscription(nfs, mr_pub, app_conf)
+ aai_event_thread = PeriodicTask(10, process_aai_events, args=(
+ mr_aai_event_subscriber, sub, mr_pub, app, app_conf))
if new_administrative_state == AdministrativeState.UNLOCKED.value:
logger.debug('Listening to AAI-EVENT topic in MR.')
"administrativeState":"UNLOCKED",
"fileBasedGP":15,
"fileLocation":"\/pm\/pm.xml",
- "nfTypeModelInvariantId":"2829292",
"nfFilter":{
"swVersions":[
"1.0.0",
}
},
"config":{
+ "control_loop_name": "pmsh-control-loop",
+ "operational_policy_name": "pmsh-operational-policy",
"aaf_password":"demo123456!",
"aaf_identity":"dcae@dcae.onap.org",
"cert_path":"/opt/app/pm-mapper/etc/certs/cert.pem",
"administrativeState": "UNLOCKED",
"fileBasedGP": 15,
"fileLocation": "c:\/\/PM",
- "nfTypeModelInvariantId": "2829292",
"nfFilter": {
"swVersions": [
"A21",
--- /dev/null
+{\r
+ "nfName":"pnf_1",\r
+ "policyName":"pmsh-operational-policy",\r
+ "changeType":"CREATE",\r
+ "closedLoopControlName":"pmsh-control-loop",\r
+ "subscription":{\r
+ "subscriptionName":"ExtraPM-All-gNB-R2B",\r
+ "administrativeState":"UNLOCKED",\r
+ "fileBasedGP":15,\r
+ "fileLocation":"/pm/pm.xml",\r
+ "measurementGroups":[\r
+ {\r
+ "measurementGroup":{\r
+ "measurementTypes":[\r
+ {\r
+ "measurementType":"countera"\r
+ },\r
+ {\r
+ "measurementType":"counterb"\r
+ }\r
+ ],\r
+ "managedObjectDNsBasic":[\r
+ {\r
+ "DN":"dna"\r
+ },\r
+ {\r
+ "DN":"dnb"\r
+ }\r
+ ]\r
+ }\r
+ },\r
+ {\r
+ "measurementGroup":{\r
+ "measurementTypes":[\r
+ {\r
+ "measurementType":"counterc"\r
+ },\r
+ {\r
+ "measurementType":"counterd"\r
+ }\r
+ ],\r
+ "managedObjectDNsBasic":[\r
+ {\r
+ "DN":"dnc"\r
+ },\r
+ {\r
+ "DN":"dnd"\r
+ }\r
+ ]\r
+ }\r
+ }\r
+ ]\r
+ }\r
+}
\ No newline at end of file
@patch('mod.aai_event_handler.NetworkFunction.delete')
@patch('mod.aai_event_handler.NetworkFunction.get')
- def test_process_aai_update_and_delete_events(self, mock_nf_get, mock_nf_delete):
+ @patch('pmsh_service_main.AppConfig')
+ def test_process_aai_update_and_delete_events(self, mock_app_conf, mock_nf_get, mock_nf_delete):
pnf_already_active = NetworkFunction(nf_name='pnf_already_active',
orchestration_status=OrchestrationStatus.ACTIVE.value)
mock_nf_get.side_effect = [None, pnf_already_active]
expected_nf_for_processing = NetworkFunction(
nf_name='pnf_newly_discovered', orchestration_status=OrchestrationStatus.ACTIVE.value)
- process_aai_events(self.mock_mr_sub, self.mock_sub, self.mock_mr_pub, self.mock_app)
+ process_aai_events(self.mock_mr_sub, self.mock_sub,
+ self.mock_mr_pub, self.mock_app, mock_app_conf)
self.mock_sub.process_subscription.assert_called_once_with([expected_nf_for_processing],
- self.mock_mr_pub)
+ self.mock_mr_pub, mock_app_conf)
mock_nf_delete.assert_called_once_with(nf_name='pnf_to_be_deleted')
@patch('threading.Timer')
@patch('mod.aai_client.get_pmsh_subscription_data')
@patch('pmsh_service_main.PeriodicTask')
- def test_subscription_processor_changed_state(self, periodic_task, mock_get_aai, mock_thread):
+ @patch('pmsh_service_main.AppConfig')
+ def test_subscription_processor_changed_state(self, mock_app_conf, periodic_task, mock_get_aai,
+ mock_thread):
self.mock_config_handler.get_config.return_value = self.cbs_data_1
mock_get_aai.return_value = self.mock_sub, self.nfs
mock_thread.start.return_value = 1
pmsh_service.subscription_processor(self.mock_config_handler, 'LOCKED',
self.mock_mr_pub, self.mock_app, self.mock_aai_sub)
- self.mock_sub.process_subscription.assert_called_with(self.nfs, self.mock_mr_pub)
+ self.mock_sub.process_subscription.assert_called_with(self.nfs, self.mock_mr_pub,
+ mock_app_conf.return_value)
@patch('threading.Timer')
@patch('mod.pmsh_logging.debug')
def test_mr_pub_publish_sub_event_data_success(self):
mr_policy_pub = self.app_conf.get_mr_pub('policy_pm_publisher')
with patch('mod.pmsh_utils._MrPub.publish_to_topic') as pub_to_topic_call:
- mr_policy_pub.publish_subscription_event_data(self.sub, 'pnf201')
+ mr_policy_pub.publish_subscription_event_data(self.sub, 'pnf201', self.app_conf)
pub_to_topic_call.assert_called_once()
@responses.activate
import mod.aai_client as aai_client
from mod import db, create_app
from mod.network_function import NetworkFunction
+from mod.pmsh_utils import AppConfig
from mod.subscription import Subscription, NetworkFunctionFilter
@patch('mod.pmsh_utils._MrSub')
@patch('mod.get_db_connection_url')
@patch.object(Session, 'put')
- def setUp(self, mock_session, mock_get_db_url, mock_mr_sub, mock_mr_pub):
+ @patch('pmsh_service_main.AppConfig')
+ def setUp(self, mock_app_config, mock_session, mock_get_db_url, mock_mr_sub, mock_mr_pub):
mock_get_db_url.return_value = 'sqlite://'
with open(os.path.join(os.path.dirname(__file__), 'data/aai_xnfs.json'), 'r') as data:
self.aai_response_data = data.read()
self.app = create_app()
self.app_context = self.app.app_context()
self.app_context.push()
+ self.mock_app_config = mock_app_config
db.create_all()
def tearDown(self):
def test_process_activate_subscription(self, mock_update_sub_status,
mock_update_sub_nf, mock_add_nfs):
self.sub_1.process_subscription.retry.stop = stop_after_attempt(1)
- self.sub_1.process_subscription([self.nf_1], self.mock_mr_pub)
+ self.sub_1.process_subscription([self.nf_1], self.mock_mr_pub, self.mock_app_config)
mock_update_sub_status.assert_called()
mock_add_nfs.assert_called()
mock_update_sub_nf):
self.sub_1.administrativeState = 'LOCKED'
self.sub_1.process_subscription.retry.stop = stop_after_attempt(1)
- self.sub_1.process_subscription([self.nf_1], self.mock_mr_pub)
+ self.sub_1.process_subscription([self.nf_1], self.mock_mr_pub, self.mock_app_config)
self.assertTrue(self.mock_mr_pub.publish_subscription_event_data.called)
mock_update_sub_nf.assert_called_with(self.sub_1.subscriptionName,
def test_process_subscription_exception(self):
self.sub_1.process_subscription.retry.stop = stop_after_attempt(1)
self.assertRaises(Exception, self.sub_1.process_subscription,
- [self.nf_1], 'not_mr_pub')
+ [self.nf_1], 'not_mr_pub', 'app_config')
+
+ def test_prepare_subscription_event(self):
+ with open(os.path.join(os.path.dirname(__file__),
+ 'data/pm_subscription_event.json'), 'r') as data:
+ expected_sub_event = json.load(data)
+ app_conf = AppConfig(**self.cbs_data_1['config'])
+ actual_sub_event = self.sub_1.prepare_subscription_event(self.nf_1.nf_name, app_conf)
+ print(actual_sub_event)
+ self.assertEqual(expected_sub_event, actual_sub_event)