+
+ @patch.object(AppConfig, 'publish_to_topic')
+ def test_update_admin_status_to_locking(self, mock_mr):
+ super().setUpAppConf()
+ sub = create_subscription_data('sub')
+ nf_list = create_multiple_network_function_data(['pnf_101', 'pnf_102'])
+ db.session.add(sub)
+ for nf in nf_list:
+ nf_service.save_nf(nf)
+ measurement_group_service. \
+ apply_nf_status_to_measurement_group(nf.nf_name, sub.measurement_groups[0].
+ measurement_group_name,
+ SubNfState.CREATED.value)
+ db.session.commit()
+ measurement_group_service.update_admin_status(sub.measurement_groups[0], 'LOCKED')
+ meas_grp = measurement_group_service.query_meas_group_by_name('sub', 'MG1')
+ self.assertEqual(meas_grp.subscription_name, 'sub')
+ self.assertEqual(meas_grp.measurement_group_name, 'MG1')
+ self.assertEqual(meas_grp.administrative_state, 'LOCKING')
+ meas_group_nfs = db.session.query(NfMeasureGroupRelationalModel).filter(
+ NfMeasureGroupRelationalModel.measurement_grp_name == meas_grp.measurement_group_name)\
+ .all()
+ for nf in meas_group_nfs:
+ self.assertEqual(nf.nf_measure_grp_status, SubNfState.PENDING_DELETE.value)
+
+ @patch.object(AppConfig, 'publish_to_topic')
+ def test_update_admin_status_to_locked(self, mock_mr):
+ super().setUpAppConf()
+ sub = create_subscription_data('sub')
+ db.session.add(sub)
+ measurement_group_service.update_admin_status(sub.measurement_groups[0], 'LOCKED')
+ meas_grp = measurement_group_service.query_meas_group_by_name('sub', 'MG1')
+ self.assertEqual(meas_grp.subscription_name, 'sub')
+ self.assertEqual(meas_grp.measurement_group_name, 'MG1')
+ self.assertEqual(meas_grp.administrative_state, 'LOCKED')
+
+ @patch.object(AppConfig, 'publish_to_topic')
+ @patch.object(aai_client, '_get_all_aai_nf_data')
+ @patch.object(aai_client, 'get_aai_model_data')
+ @patch.object(NetworkFunctionFilter, 'get_network_function_filter')
+ def test_update_admin_status_to_unlocked_with_no_nfs(self, mock_filter_call,
+ mock_model_aai, mock_aai, mock_mr):
+ mock_aai.return_value = json.loads(self.aai_response_data)
+ mock_model_aai.return_value = json.loads(self.good_model_info)
+ super().setUpAppConf()
+ sub = create_subscription_data('sub')
+ sub.nfs = []
+ db.session.add(sub)
+ db.session.commit()
+ mock_filter_call.return_value = NetworkFunctionFilter.get_network_function_filter('sub')
+ measurement_group_service.update_admin_status(sub.measurement_groups[1], 'UNLOCKED')
+ meas_grp = measurement_group_service.query_meas_group_by_name('sub', 'MG2')
+ self.assertEqual(meas_grp.subscription_name, 'sub')
+ self.assertEqual(meas_grp.measurement_group_name, 'MG2')
+ self.assertEqual(meas_grp.administrative_state, 'UNLOCKED')
+ meas_group_nfs = db.session.query(NfMeasureGroupRelationalModel).filter(
+ NfMeasureGroupRelationalModel.measurement_grp_name == meas_grp.measurement_group_name)\
+ .all()
+ for nf in meas_group_nfs:
+ self.assertEqual(nf.nf_measure_grp_status, SubNfState.PENDING_CREATE.value)
+
+ @patch.object(AppConfig, 'publish_to_topic')
+ @patch.object(aai_client, '_get_all_aai_nf_data')
+ @patch.object(aai_client, 'get_aai_model_data')
+ @patch.object(NetworkFunctionFilter, 'get_network_function_filter')
+ def test_update_admin_status_to_unlocking(self, mock_filter_call,
+ mock_model_aai, mock_aai, mock_mr):
+ mock_aai.return_value = json.loads(self.aai_response_data)
+ mock_model_aai.return_value = json.loads(self.good_model_info)
+ super().setUpAppConf()
+ sub = create_subscription_data('sub')
+ db.session.add(sub)
+ db.session.commit()
+ mock_filter_call.return_value = NetworkFunctionFilter.get_network_function_filter('sub')
+ measurement_group_service.update_admin_status(sub.measurement_groups[1], 'UNLOCKED')
+ meas_grp = measurement_group_service.query_meas_group_by_name('sub', 'MG2')
+ self.assertEqual(meas_grp.subscription_name, 'sub')
+ self.assertEqual(meas_grp.measurement_group_name, 'MG2')
+ self.assertEqual(meas_grp.administrative_state, 'UNLOCKED')
+ meas_group_nfs = db.session.query(NfMeasureGroupRelationalModel).filter(
+ NfMeasureGroupRelationalModel.measurement_grp_name == meas_grp.measurement_group_name)\
+ .all()
+ for nf in meas_group_nfs:
+ self.assertEqual(nf.nf_measure_grp_status, SubNfState.PENDING_CREATE.value)
+
+ def test_update_admin_status_for_missing_measurement_group(self):
+ try:
+ measurement_group_service.update_admin_status(None, 'UNLOCKED')
+ except InvalidDataException as e:
+ self.assertEqual(e.args[0], 'Requested measurement group not available '
+ 'for admin status update')
+
+ def test_update_admin_status_for_data_conflict(self):
+ super().setUpAppConf()
+ sub = create_subscription_data('sub1')
+ sub.measurement_groups[0].administrative_state = 'LOCKING'
+ try:
+ measurement_group_service.update_admin_status(sub.measurement_groups[0], 'LOCKED')
+ except DataConflictException as e:
+ self.assertEqual(e.args[0], 'Cannot update admin status as Locked request'
+ ' is in progress for sub name: sub1 and '
+ 'meas group name: MG1')
+
+ def test_update_admin_status_for_same_state(self):
+ super().setUpAppConf()
+ sub = create_subscription_data('sub1')
+ try:
+ measurement_group_service.update_admin_status(sub.measurement_groups[0], 'UNLOCKED')
+ except InvalidDataException as e:
+ self.assertEqual(e.args[0], 'Measurement group is already in UNLOCKED '
+ 'state for sub name: sub1 and meas group '
+ 'name: MG1')
+
+ def test_lock_nf_to_meas_grp_for_locking_with_LOCKED_update(self):
+ sub = create_subscription_data('sub')
+ sub.measurement_groups[1].administrative_state = 'LOCKING'
+ nf_list = create_multiple_network_function_data(['pnf_101'])
+ db.session.add(sub)
+ for nf in nf_list:
+ nf_service.save_nf(nf)
+ measurement_group_service. \
+ apply_nf_status_to_measurement_group(nf.nf_name, sub.measurement_groups[1].
+ measurement_group_name,
+ SubNfState.PENDING_DELETE.value)
+ db.session.commit()
+ measurement_group_service.lock_nf_to_meas_grp(
+ "pnf_101", "MG2", SubNfState.DELETED.value)
+ measurement_grp_rel = (NfMeasureGroupRelationalModel.query.filter(
+ NfMeasureGroupRelationalModel.measurement_grp_name == 'MG2',
+ NfMeasureGroupRelationalModel.nf_name == 'pnf_101').one_or_none())
+ self.assertIsNone(measurement_grp_rel)
+ network_function = (NetworkFunctionModel.query.filter(
+ NetworkFunctionModel.nf_name == 'pnf_101').one_or_none())
+ self.assertIsNone(network_function)
+ meas_grp = measurement_group_service.query_meas_group_by_name('sub', 'MG2')
+ self.assertEqual(meas_grp.subscription_name, 'sub')
+ self.assertEqual(meas_grp.measurement_group_name, 'MG2')
+ self.assertEqual(meas_grp.administrative_state, 'LOCKED')
+
+ def test_lock_nf_to_meas_grp_with_no_LOCKED_update(self):
+ sub = create_subscription_data('sub')
+ sub.measurement_groups[1].administrative_state = 'LOCKING'
+ nf_list = create_multiple_network_function_data(['pnf_101', 'pnf_102'])
+ db.session.add(sub)
+ for nf in nf_list:
+ nf_service.save_nf(nf)
+ measurement_group_service. \
+ apply_nf_status_to_measurement_group(nf.nf_name, sub.measurement_groups[1].
+ measurement_group_name,
+ SubNfState.PENDING_DELETE.value)
+ db.session.commit()
+ measurement_group_service.lock_nf_to_meas_grp(
+ "pnf_101", "MG2", SubNfState.DELETED.value)
+ measurement_grp_rel = (NfMeasureGroupRelationalModel.query.filter(
+ NfMeasureGroupRelationalModel.measurement_grp_name == 'MG2',
+ NfMeasureGroupRelationalModel.nf_name == 'pnf_101').one_or_none())
+ self.assertIsNone(measurement_grp_rel)
+ network_function = (NetworkFunctionModel.query.filter(
+ NetworkFunctionModel.nf_name == 'pnf_101').one_or_none())
+ self.assertIsNone(network_function)
+ meas_grp = measurement_group_service.query_meas_group_by_name('sub', 'MG2')
+ self.assertEqual(meas_grp.subscription_name, 'sub')
+ self.assertEqual(meas_grp.measurement_group_name, 'MG2')
+ self.assertEqual(meas_grp.administrative_state, 'LOCKING')