1 # ============LICENSE_START===================================================
2 # Copyright (C) 2019-2020 Nordix Foundation.
3 # ============================================================================
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at
8 # http://www.apache.org/licenses/LICENSE-2.0
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
16 # SPDX-License-Identifier: Apache-2.0
17 # ============LICENSE_END=====================================================
21 from threading import Timer
24 from requests.auth import HTTPBasicAuth
25 from tenacity import retry, wait_fixed, retry_if_exception_type
27 import mod.pmsh_logging as logger
28 from mod.network_function import NetworkFunction
29 from mod.subscription import Subscription, SubNfState, AdministrativeState
33 def __init__(self, **kwargs):
34 self.aaf_creds = {'aaf_id': kwargs.get('aaf_identity'),
35 'aaf_pass': kwargs.get('aaf_password')}
36 self.cert_path = kwargs.get('cert_path')
37 self.key_path = kwargs.get('key_path')
38 self.streams_subscribes = kwargs.get('streams_subscribes')
39 self.streams_publishes = kwargs.get('streams_publishes')
41 def get_mr_sub(self, sub_name):
43 Returns the MrSub object requested.
46 sub_name: the key of the subscriber object.
49 MrSub: an Instance of an `MrSub` <MrSub> Object.
52 KeyError: if the sub_name is not found.
55 return _MrSub(sub_name, self.aaf_creds, **self.streams_subscribes[sub_name])
60 def get_mr_pub(self, pub_name):
62 Returns the MrPub object requested.
65 pub_name: the key of the publisher object.
68 MrPub: an Instance of an `MrPub` <MrPub> Object.
71 KeyError: if the sub_name is not found.
74 return _MrPub(pub_name, self.aaf_creds, **self.streams_publishes[pub_name])
80 def cert_params(self):
82 Returns the tls artifact paths.
85 cert_path, key_path: the path to tls cert and key.
87 return self.cert_path, self.key_path
91 def __init__(self, aaf_creds, **kwargs):
93 A DMaaP Message Router utility class.
94 Sub classes should be invoked via the AppConfig.get_mr_{pub|sub} only.
96 aaf_creds: a dict of aaf secure credentials.
97 **kwargs: a dict of streams_{subscribes|publishes} data.
99 self.topic_url = kwargs.get('dmaap_info').get('topic_url')
100 self.aaf_id = aaf_creds.get('aaf_id')
101 self.aaf_pass = aaf_creds.get('aaf_pass')
104 class _MrPub(_DmaapMrClient):
105 def __init__(self, pub_name, aaf_creds, **kwargs):
106 self.pub_name = pub_name
107 super().__init__(aaf_creds, **kwargs)
109 def publish_to_topic(self, event_json):
111 Publish the event to the DMaaP Message Router topic.
114 event_json: the json data to be published.
117 Exception: if post request fails.
120 session = requests.Session()
121 headers = {'content-type': 'application/json', 'x-transactionId': str(uuid.uuid1())}
122 response = session.post(self.topic_url, headers=headers,
123 auth=HTTPBasicAuth(self.aaf_id, self.aaf_pass), json=event_json,
125 response.raise_for_status()
126 except Exception as e:
130 def publish_subscription_event_data(self, subscription, xnf_name):
132 Update the Subscription dict with xnf and policy name then publish to DMaaP MR topic.
135 subscription: the `Subscription` <Subscription> object.
136 xnf_name: the xnf to include in the event.
139 subscription_event = subscription.prepare_subscription_event(xnf_name)
140 self.publish_to_topic(subscription_event)
141 except Exception as e:
142 logger.debug(f'pmsh_utils.publish_subscription_event_data : {e}')
145 class _MrSub(_DmaapMrClient):
146 def __init__(self, sub_name, aaf_creds, **kwargs):
147 self.sub_name = sub_name
148 super().__init__(aaf_creds, **kwargs)
150 def get_from_topic(self, consumer_id, consumer_group='dcae_pmsh_cg', timeout=1000):
152 Returns the json data from the MrTopic.
155 consumer_id: Within your subscribers group, a name that uniquely
156 identifies your subscribers process.
157 consumer_group: A name that uniquely identifies your subscribers.
158 timeout: The request timeout value in mSec.
161 list[str]: the json response from DMaaP Message Router topic, else None.
165 session = requests.Session()
166 headers = {'accept': 'application/json', 'content-type': 'application/json'}
167 logger.debug(f'Request sent to MR topic: {self.topic_url}')
168 response = session.get(f'{self.topic_url}/{consumer_group}/{consumer_id}'
169 f'?timeout={timeout}',
170 auth=HTTPBasicAuth(self.aaf_id, self.aaf_pass), headers=headers,
172 response.raise_for_status()
174 topic_data = response.json()
175 except Exception as e:
180 def _handle_response(subscription_name, administrative_state, nf_name, response_message):
182 Handles the response from Policy, updating the DB
185 subscription_name (str): The subscription name
186 administrative_state (str): The administrative state of the subscription
187 nf_name (str): The network function name
188 response_message (str): The message in the response regarding the state (success|failed)
190 logger.debug(f'Response from MR: Sub: {subscription_name} for '
191 f'NF: {nf_name} received, updating the DB')
193 sub_nf_status = subscription_nf_states[administrative_state][response_message].value
194 policy_response_handle_functions[administrative_state][response_message](
195 subscription_name=subscription_name, status=sub_nf_status, nf_name=nf_name)
196 except Exception as err:
197 raise Exception(f'Error changing nf_sub status in the DB: {err}')
199 @retry(wait=wait_fixed(5), retry=retry_if_exception_type(Exception))
200 def poll_policy_topic(self, subscription_name, app):
202 This method polls MR for response from policy. It checks whether the message is for the
203 relevant subscription and then handles the response
206 subscription_name (str): The subscription name
207 app (app): Needed to push context for the db
209 app.app_context().push()
210 administrative_state = Subscription.get(subscription_name).status
212 response_data = self.get_from_topic('policy_response_consumer')
213 for data in response_data:
214 data = json.loads(data)
215 if data['status']['subscriptionName'] == subscription_name:
216 nf_name = data['status']['nfName']
217 response_message = data['status']['message']
218 self._handle_response(subscription_name, administrative_state,
219 nf_name, response_message)
220 threading.Timer(5, self.poll_policy_topic, [subscription_name, app]).start()
221 except Exception as err:
222 raise Exception(f'Error trying to poll MR: {err}')
225 subscription_nf_states = {
226 AdministrativeState.LOCKED.value: {
227 'success': SubNfState.CREATED,
228 'failed': SubNfState.DELETE_FAILED
230 AdministrativeState.UNLOCKED.value: {
231 'success': SubNfState.CREATED,
232 'failed': SubNfState.CREATE_FAILED
236 policy_response_handle_functions = {
237 AdministrativeState.LOCKED.value: {
238 'success': NetworkFunction.delete,
239 'failed': Subscription.update_sub_nf_status
241 AdministrativeState.UNLOCKED.value: {
242 'success': Subscription.update_sub_nf_status,
243 'failed': Subscription.update_sub_nf_status
248 class PeriodicTask(Timer):
253 while not self.finished.wait(self.interval):
254 self.function(*self.args, **self.kwargs)