Merge "Handle AAI Update and Delete events for PMSH"
[dcaegen2/services.git] / components / pm-subscription-handler / pmsh_service / mod / pmsh_utils.py
1 # ============LICENSE_START===================================================
2 #  Copyright (C) 2019-2020 Nordix Foundation.
3 # ============================================================================
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at
7 #
8 #      http://www.apache.org/licenses/LICENSE-2.0
9 #
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
15 #
16 # SPDX-License-Identifier: Apache-2.0
17 # ============LICENSE_END=====================================================
18 import json
19 import threading
20 import uuid
21 from threading import Timer
22
23 import requests
24 from requests.auth import HTTPBasicAuth
25 from tenacity import retry, wait_fixed, retry_if_exception_type
26
27 import mod.pmsh_logging as logger
28 from mod.network_function import NetworkFunction
29 from mod.subscription import Subscription, SubNfState, AdministrativeState
30
31
class AppConfig:
    """
    Holds the application configuration and builds DMaaP Message Router clients from it.
    """

    def __init__(self, **kwargs):
        """
        Store the configuration values handed in as keyword arguments.

        Args:
            **kwargs: configuration values; the keys read are aaf_identity, aaf_password,
            cert_path, key_path, streams_subscribes and streams_publishes. Any key that is
            absent is stored as None.
        """
        lookup = kwargs.get
        self.streams_publishes = lookup('streams_publishes')
        self.streams_subscribes = lookup('streams_subscribes')
        self.key_path = lookup('key_path')
        self.cert_path = lookup('cert_path')
        self.aaf_creds = dict(aaf_id=lookup('aaf_identity'), aaf_pass=lookup('aaf_password'))

    def get_mr_sub(self, sub_name):
        """
        Build the Message Router subscriber configured under the given name.

        Args:
            sub_name: the key of the subscriber entry in streams_subscribes.

        Returns:
            _MrSub: a new `_MrSub` instance for that entry.

        Raises:
            KeyError: if the sub_name is not found.
        """
        try:
            stream_settings = self.streams_subscribes[sub_name]
            return _MrSub(sub_name, self.aaf_creds, **stream_settings)
        except KeyError as missing_key:
            # Log for diagnostics, then let the caller deal with the missing entry.
            logger.debug(missing_key)
            raise

    def get_mr_pub(self, pub_name):
        """
        Build the Message Router publisher configured under the given name.

        Args:
            pub_name: the key of the publisher entry in streams_publishes.

        Returns:
            _MrPub: a new `_MrPub` instance for that entry.

        Raises:
            KeyError: if the pub_name is not found.
        """
        try:
            stream_settings = self.streams_publishes[pub_name]
            return _MrPub(pub_name, self.aaf_creds, **stream_settings)
        except KeyError as missing_key:
            # Log for diagnostics, then let the caller deal with the missing entry.
            logger.debug(missing_key)
            raise

    @property
    def cert_params(self):
        """
        Returns the tls artifact paths.

        Returns:
            tuple: (cert_path, key_path), the paths to the tls cert and key.
        """
        tls_artifacts = (self.cert_path, self.key_path)
        return tls_artifacts
88
89
class _DmaapMrClient:
    """
    A DMaaP Message Router utility class.
    Sub classes should be invoked via the AppConfig.get_mr_{pub|sub} only.
    """

    def __init__(self, aaf_creds, **kwargs):
        """
        Record the topic url and the aaf credentials used to reach it.

        Args:
            aaf_creds: a dict of aaf secure credentials (keys aaf_id and aaf_pass are read).
            **kwargs: a dict of streams_{subscribes|publishes} data; its dmaap_info entry
            supplies the topic_url.
        """
        # A missing dmaap_info entry yields None here, so the .get below raises AttributeError.
        dmaap_info = kwargs.get('dmaap_info')
        self.topic_url = dmaap_info.get('topic_url')
        self.aaf_id, self.aaf_pass = aaf_creds.get('aaf_id'), aaf_creds.get('aaf_pass')
102
103
class _MrPub(_DmaapMrClient):
    """
    DMaaP Message Router publisher.
    Should be obtained via AppConfig.get_mr_pub only.
    """

    def __init__(self, pub_name, aaf_creds, **kwargs):
        """
        Args:
            pub_name: the key of the publisher entry this client was built from.
            aaf_creds: a dict of aaf secure credentials.
            **kwargs: a dict of streams_publishes data.
        """
        self.pub_name = pub_name
        super().__init__(aaf_creds, **kwargs)

    def publish_to_topic(self, event_json):
        """
        Publish the event to the DMaaP Message Router topic.

        Args:
            event_json: the json data to be published.

        Raises:
            Exception: if post request fails.
        """
        try:
            headers = {'content-type': 'application/json', 'x-transactionId': str(uuid.uuid1())}
            # The session is used as a context manager so that its pooled connections are
            # released once the request has completed, instead of lingering until GC.
            with requests.Session() as session:
                # NOTE(review): verify=False disables TLS certificate verification.
                response = session.post(self.topic_url, headers=headers,
                                        auth=HTTPBasicAuth(self.aaf_id, self.aaf_pass),
                                        json=event_json, verify=False)
                response.raise_for_status()
        except Exception as e:
            # Log for diagnostics and propagate unchanged to the caller.
            logger.debug(e)
            raise

    def publish_subscription_event_data(self, subscription, xnf_name):
        """
        Update the Subscription dict with xnf and policy name then publish to DMaaP MR topic.

        This is best-effort: any failure while preparing or publishing the event is logged
        at debug level and not propagated.

        Args:
            subscription: the `Subscription` <Subscription> object.
            xnf_name: the xnf to include in the event.
        """
        try:
            subscription_event = subscription.prepare_subscription_event(xnf_name)
            self.publish_to_topic(subscription_event)
        except Exception as e:
            logger.debug(f'pmsh_utils.publish_subscription_event_data : {e}')
143
144
class _MrSub(_DmaapMrClient):
    """
    DMaaP Message Router subscriber.
    Should be obtained via AppConfig.get_mr_sub only.
    """

    def __init__(self, sub_name, aaf_creds, **kwargs):
        """
        Args:
            sub_name: the key of the subscriber entry this client was built from.
            aaf_creds: a dict of aaf secure credentials.
            **kwargs: a dict of streams_subscribes data.
        """
        self.sub_name = sub_name
        super().__init__(aaf_creds, **kwargs)

    def get_from_topic(self, consumer_id, consumer_group='dcae_pmsh_cg', timeout=1000):
        """
        Returns the json data from the MrTopic.

        Any failure (connection error, non-2xx status, invalid json) is logged at debug
        level and results in None being returned rather than an exception.

        Args:
            consumer_id: Within your subscribers group, a name that uniquely
            identifies your subscribers process.
            consumer_group: A name that uniquely identifies your subscribers.
            timeout: The request timeout value in mSec, sent as the `timeout` query parameter.

        Returns:
            list[str]: the json response from DMaaP Message Router topic, else None.
        """
        topic_data = None
        try:
            headers = {'accept': 'application/json', 'content-type': 'application/json'}
            logger.debug(f'Request sent to MR topic: {self.topic_url}')
            # The session is used as a context manager so that its pooled connections are
            # released once the request has completed, instead of lingering until GC.
            with requests.Session() as session:
                # NOTE(review): verify=False disables TLS certificate verification.
                response = session.get(f'{self.topic_url}/{consumer_group}/{consumer_id}'
                                       f'?timeout={timeout}',
                                       auth=HTTPBasicAuth(self.aaf_id, self.aaf_pass),
                                       headers=headers, verify=False)
                # raise_for_status() raises on any error status, so reaching the next line
                # already means the response is ok.
                response.raise_for_status()
                topic_data = response.json()
        except Exception as e:
            logger.debug(e)
        return topic_data

    @staticmethod
    def _handle_response(subscription_name, administrative_state, nf_name, response_message):
        """
        Handles the response from Policy, updating the DB

        Args:
            subscription_name (str): The subscription name
            administrative_state (str): The administrative state of the subscription
            nf_name (str): The network function name
            response_message (str): The message in the response regarding the state (success|failed)

        Raises:
            Exception: if the state/message pair is unknown or the handler function fails.
        """
        logger.debug(f'Response from MR: Sub: {subscription_name} for '
                     f'NF: {nf_name} received, updating the DB')
        try:
            # Both tables are keyed by administrative state, then by response message.
            sub_nf_status = subscription_nf_states[administrative_state][response_message].value
            policy_response_handle_functions[administrative_state][response_message](
                subscription_name=subscription_name, status=sub_nf_status, nf_name=nf_name)
        except Exception as err:
            # Chain the cause explicitly so the original traceback is kept.
            raise Exception(f'Error changing nf_sub status in the DB: {err}') from err

    @retry(wait=wait_fixed(5), retry=retry_if_exception_type(Exception))
    def poll_policy_topic(self, subscription_name, app):
        """
        This method polls MR for response from policy. It checks whether the message is for the
        relevant subscription and then handles the response

        After a successful pass the next poll is scheduled on a timer 5 seconds later. If
        anything raises, the retry decorator calls this method again after a fixed 5 second
        wait. That includes the case where get_from_topic returned None, since iterating
        None raises.

        Args:
            subscription_name (str): The subscription name
            app (app): Needed to push context for the db

        Raises:
            Exception: if polling or handling a response fails (triggers the retry).
        """
        app.app_context().push()
        administrative_state = Subscription.get(subscription_name).status
        try:
            response_data = self.get_from_topic('policy_response_consumer')
            for raw_message in response_data:
                # Each topic entry is a json encoded string.
                message = json.loads(raw_message)
                status = message['status']
                if status['subscriptionName'] == subscription_name:
                    self._handle_response(subscription_name, administrative_state,
                                          status['nfName'], status['message'])
            threading.Timer(5, self.poll_policy_topic, [subscription_name, app]).start()
        except Exception as err:
            # Chain the cause explicitly so the original traceback is kept.
            raise Exception(f'Error trying to poll MR: {err}') from err
223
224
# Sub/NF state to store for a policy response. The outer key is the administrative state
# value of the subscription, the inner key is the message carried by the response.
# NOTE(review): LOCKED/'success' maps to CREATED while its handler below is
# NetworkFunction.delete - confirm this status value is intentional.
subscription_nf_states = {
    AdministrativeState.LOCKED.value: dict(
        success=SubNfState.CREATED,
        failed=SubNfState.DELETE_FAILED,
    ),
    AdministrativeState.UNLOCKED.value: dict(
        success=SubNfState.CREATED,
        failed=SubNfState.CREATE_FAILED,
    ),
}

# Function invoked for a policy response, keyed the same way as subscription_nf_states.
# Each entry is called with the keyword arguments subscription_name, status and nf_name.
policy_response_handle_functions = {
    AdministrativeState.LOCKED.value: dict(
        success=NetworkFunction.delete,
        failed=Subscription.update_sub_nf_status,
    ),
    AdministrativeState.UNLOCKED.value: dict(
        success=Subscription.update_sub_nf_status,
        failed=Subscription.update_sub_nf_status,
    ),
}
246
247
class PeriodicTask(Timer):
    """
    A :class:`Timer` that keeps calling its function every ``interval`` seconds
    until it is cancelled, instead of firing only once.
    """

    def run(self):
        while True:
            # wait() returns True once the finished event is set (i.e. cancel() was called);
            # a False return means the interval elapsed and the function is due.
            cancelled = self.finished.wait(self.interval)
            if cancelled:
                break
            self.function(*self.args, **self.kwargs)