1 # Copyright 2019 ZTE Corporation.
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
7 # http://www.apache.org/licenses/LICENSE-2.0
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
19 from rest_framework import status
20 from requests.auth import HTTPBasicAuth
21 from catalog.packages import const
22 from catalog.pub.database.models import VnfPackageModel, VnfPkgSubscriptionModel, NsdmSubscriptionModel
23 import catalog.pub.utils.timeutil
24 from catalog.pub.utils.values import remove_none_key
25 from catalog.pub.config import config as pub_config
27 from django.db.models import Q
28 from catalog.packages.serializers.vnf_pkg_notifications import PkgChangeNotificationSerializer, \
29 PkgOnboardingNotificationSerializer
31 logger = logging.getLogger(__name__)
34 class NotificationsUtil(object):
35 def __init__(self, notification_type):
36 self.notification_type = notification_type
37 self.notifyserializer = None
39 def prepare_notification(self, **kwargs):
42 def send_notification(self):
43 notification = self.prepare_notification()
45 subscriptions_filter = {v + "__contains": notification[k] for k, v in self.filter.items()}
46 subscriptions_filter = remove_none_key(subscriptions_filter)
47 logger.debug('send_notification subscriptions_filter = %s' % subscriptions_filter)
50 for k, v in subscriptions_filter.items():
51 q1.children.append((k, v))
53 subscriptions = self.SubscriptionModel.objects.filter(q1)
54 if not subscriptions.exists():
55 logger.info("No subscriptions created for the filter %s" % notification)
57 logger.info("Start sending notifications")
58 for sub in subscriptions:
60 notification["subscriptionId"] = sub.get_subscription_id()
61 notification['_links']['subscription'] = {
62 'href': 'http://%s:%s/%s%s' % (pub_config.MSB_SERVICE_IP,
63 pub_config.MSB_SERVICE_PORT,
64 self.subscription_root_uri,
65 notification["subscriptionId"])
67 callbackuri = sub.callback_uri
69 auth_info = json.loads(sub.auth_info)
70 if auth_info["authType"] == const.OAUTH2_CLIENT_CREDENTIALS:
73 if self.notifyserializer:
74 serialized_data = self.notifyserializer(data=notification)
75 if not serialized_data.is_valid():
76 logger.error('Notification Data is invalid:%s.' % serialized_data.errors)
79 self.post_notification(callbackuri, notification, auth_info=json.loads(sub.auth_info))
81 self.post_notification(callbackuri, notification)
83 def post_notification(self, callbackuri, notification, auth_info=None):
86 if const.BASIC in auth_info.get("authType", ''):
87 params = auth_info.get("paramsBasic", {})
88 username = params.get("userName")
89 password = params.get("password")
90 resp = requests.post(callbackuri, data=notification, headers={'Connection': 'close'},
91 auth=HTTPBasicAuth(username, password))
92 elif const.OAUTH2_CLIENT_CREDENTIALS in auth_info.get("authType", ''):
99 resp = requests.post(callbackuri, data=notification, headers={'Connection': 'close'})
100 if resp.status_code != status.HTTP_204_NO_CONTENT:
101 logger.error("Sending notification to %s failed: %s" % (callbackuri, resp.text))
103 logger.info("Sending notification to %s successfully.", callbackuri)
105 logger.error("Post notification failed.")
106 logger.error(traceback.format_exc())
109 class PkgNotifications(NotificationsUtil):
110 def __init__(self, notification_type, vnf_pkg_id, change_type=None, operational_state=None):
111 super(PkgNotifications, self).__init__(notification_type)
114 'vnfPkgId': 'vnf_pkg_id'
116 self.vnf_pkg_id = vnf_pkg_id
117 self.change_type = change_type
118 self.operational_state = operational_state
119 self.SubscriptionModel = VnfPkgSubscriptionModel
120 self.subscription_root_uri = const.VNFPKG_SUBSCRIPTION_ROOT_URI
121 if self.notification_type == "VnfPackageChangeNotification":
122 self.notifyserializer = PkgChangeNotificationSerializer
124 self.notifyserializer = PkgOnboardingNotificationSerializer
126 def prepare_notification(self):
127 logger.info('Start to prepare Pkgnotification')
129 vnf_pkg = VnfPackageModel.objects.filter(vnfPackageId=self.vnf_pkg_id)
132 vnfd_id = vnf_pkg[0].vnfdId
133 notification_content = {
134 'id': str(uuid.uuid4()), # shall be the same if sent multiple times due to multiple subscriptions.
135 'notificationType': self.notification_type,
136 # set 'subscriptionId' after filtering for subscribers
137 'timeStamp': catalog.pub.utils.timeutil.now_time(),
138 'vnfPkgId': self.vnf_pkg_id,
142 'href': 'http://%s:%s/%s/vnf_packages/%s' % (pub_config.MSB_SERVICE_IP,
143 pub_config.MSB_SERVICE_PORT,
144 const.PKG_URL_PREFIX,
150 if self.notification_type == "VnfPackageChangeNotification":
151 notification_content['changeType'] = self.change_type
152 notification_content['operationalState'] = self.operational_state
154 return notification_content
157 class NsdNotifications(NotificationsUtil):
158 def __init__(self, notification_type, nsd_info_id, nsd_id, failure_details=None, operational_state=None):
159 super(NsdNotifications, self).__init__(notification_type)
161 'nsdInfoId': 'nsdInfoId',
164 self.SubscriptionModel = NsdmSubscriptionModel
165 self.subscription_root_uri = const.NSDM_SUBSCRIPTION_ROOT_URI
166 self.nsd_info_id = nsd_info_id
168 self.failure_details = failure_details
169 self.operational_state = operational_state
171 # if self.notification_type == "VnfPackageChangeNotification":
172 # self.notifyserializer = PkgChangeNotificationSerializer
174 # self.notifyserializer = PkgOnboardingNotificationSerializer
176 def prepare_notification(self):
177 logger.info('Start to prepare Nsdnotification')
179 notification_content = {
180 'id': str(uuid.uuid4()), # shall be the same if sent multiple times due to multiple subscriptions.
181 'notificationType': self.notification_type,
182 # set 'subscriptionId' after filtering for subscribers
183 'timeStamp': catalog.pub.utils.timeutil.now_time(),
184 'nsdInfoId': self.nsd_info_id,
185 'nsdId': self.nsd_id,
188 'href': 'http://%s:%s/%s/ns_descriptors/%s' % (pub_config.MSB_SERVICE_IP,
189 pub_config.MSB_SERVICE_PORT,
190 const.NSD_URL_PREFIX,
195 if self.notification_type == "NsdOnboardingFailureNotification":
196 notification_content['onboardingFailureDetails'] = self.failure_details
197 if self.notification_type == "NsdChangeNotification":
198 notification_content['nsdOperationalState'] = self.operational_state
199 return notification_content
202 class PnfNotifications(NotificationsUtil):
203 def __init__(self, notification_type, pnfd_info_id, pnfd_id, failure_details=None):
204 super(PnfNotifications, self).__init__(notification_type)
207 'pnfdInfoIds': 'pnfdInfoIds',
209 self.SubscriptionModel = NsdmSubscriptionModel
210 self.subscription_root_uri = const.NSDM_SUBSCRIPTION_ROOT_URI
211 self.pnfd_info_id = pnfd_info_id
212 self.pnfd_id = pnfd_id
213 self.failure_details = failure_details
215 # if self.notification_type == "VnfPackageChangeNotification":
216 # self.notifyserializer = PkgChangeNotificationSerializer
218 # self.notifyserializer = PkgOnboardingNotificationSerializer
220 def prepare_notification(self, *args, **kwargs):
221 logger.info('Start to prepare Pnfnotification')
222 notification_content = {
223 'id': str(uuid.uuid4()), # shall be the same if sent multiple times due to multiple subscriptions.
224 'notificationType': self.notification_type,
225 # set 'subscriptionId' after filtering for subscribers
226 'timeStamp': catalog.pub.utils.timeutil.now_time(),
227 'pnfdInfoIds': self.pnfd_info_id,
228 'pnfdId': self.pnfd_id,
231 'href': 'http://%s:%s/%s/pnf_descriptors/%s' % (pub_config.MSB_SERVICE_IP,
232 pub_config.MSB_SERVICE_PORT,
233 const.NSD_URL_PREFIX,
238 if self.notification_type == "PnfdOnboardingFailureNotification":
239 notification_content['onboardingFailureDetails'] = self.failure_details
240 return notification_content