1 # Copyright (c) 2019, ZTE Corporation.
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
7 # http://www.apache.org/licenses/LICENSE-2.0
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
20 from collections import Counter
22 from rest_framework import status
24 from lcm.ns.enum import NOTIFICATION_TYPE, AUTH_TYPE
25 from lcm.pub.database.models import SubscriptionModel
26 from lcm.pub.exceptions import NSLCMException, SeeOtherException
27 from lcm.pub.utils.values import ignore_case_get
28 from lcm.ns import const
30 logger = logging.getLogger(__name__)
def is_filter_type_equal(new_filter, existing_filter):
    """
    Tell whether two collections of filter values hold the same items.

    Each argument is tallied with ``Counter``, so the order of the items is
    ignored while the number of occurrences of every item still matters.

    :param new_filter: iterable of hashable values to compare
    :param existing_filter: iterable of hashable values to compare against
    :return: True when both tallies are equal, otherwise False
    """
    requested_tally = Counter(new_filter)
    stored_tally = Counter(existing_filter)
    return requested_tally == stored_tally
40 "lcm_opname_impacting_nscomponent",
41 "lcm_opoccstatus_impacting_nscomponent",
55 class CreateSubscription:
57 Subscription Create process
60 def __init__(self, data):
62 self.filter = ignore_case_get(self.data, "filter", {})
63 self.callback_uri = ignore_case_get(self.data, "callbackUri")
64 self.authentication = ignore_case_get(self.data, "authentication", {})
65 self.notification_types = ignore_case_get(self.filter, "notificationTypes", [])
66 self.operation_types = ignore_case_get(self.filter, "operationTypes", [])
67 self.operation_states = ignore_case_get(self.filter, "notificationStates", [])
68 self.ns_component_types = ignore_case_get(self.filter, "nsComponentTypes", [])
69 self.lcm_opname_impacting_nscomponent = ignore_case_get(
71 "lcmOpNameImpactingNsComponent",
74 self.lcm_opoccstatus_impacting_nscomponent = ignore_case_get(
76 "lcmOpOccStatusImpactingNsComponent",
79 self.ns_filter = ignore_case_get(
81 "nsInstanceSubscriptionFilter",
85 def check_callback_uri(self):
87 Check if the callback Uri can access
90 logger.debug("SubscribeNotification-post::> Sending GET request to %s" % self.callback_uri)
92 response = requests.get(self.callback_uri, timeout=2)
93 if response.status_code != status.HTTP_204_NO_CONTENT:
94 raise NSLCMException("callbackUri %s returns %s status code." % (
99 raise NSLCMException("callbackUri %s didn't return 204 status code." % self.callback_uri)
102 self.subscription_id = str(uuid.uuid4())
103 # self.check_callback_uri()
104 self.check_valid_auth_info()
105 self.check_filter_types()
108 subscription = SubscriptionModel.objects.get(subscription_id=self.subscription_id)
def check_filter_types(self):
    """
    Ensure operationTypes / operationStates are used with the right notification type.

    Either of those filter fields may only be set when notificationTypes
    contains NOTIFICATION_TYPE.NSLCM_OPERATION_OCCURRENCE_NOTIFICATION.

    :return: None
    :raises NSLCMException: if operationTypes or operationStates is set
        while that notification type is not listed in notificationTypes
    """
    logger.debug("SubscribeNotification--post::> Validating operationTypes and operationStates if exists")
    required_type = NOTIFICATION_TYPE.NSLCM_OPERATION_OCCURRENCE_NOTIFICATION
    # notification_types is only inspected when the guarded field is set.
    if self.operation_types:
        if required_type not in self.notification_types:
            raise NSLCMException(
                "If you are setting operationTypes, notificationTypes must be %s" % required_type
            )
    if self.operation_states:
        if required_type not in self.notification_types:
            raise NSLCMException(
                "If you are setting operationStates, notificationTypes must be %s" % required_type
            )
def check_valid_auth_info(self):
    """
    Validate that the supplied auth parameters match the declared authType.

    When "paramsBasic" is present, "authType" must contain AUTH_TYPE.BASIC;
    when "paramsOauth2ClientCredentials" is present, "authType" must contain
    AUTH_TYPE.OAUTH2_CLIENT_CREDENTIALS.

    :return: None
    :raises NSLCMException: if parameters are given for an auth type that is
        not listed in "authType" (including when "authType" is absent)
    """
    logger.debug("SubscribeNotification--post::> Validating Auth details if provided")
    # A missing or null "authType" is treated as "no type declared", so the
    # membership tests below raise NSLCMException rather than a TypeError
    # from evaluating `x not in None`.
    auth_type = self.authentication.get("authType") or []
    params_basic = self.authentication.get("paramsBasic")
    params_oauth2 = self.authentication.get("paramsOauth2ClientCredentials")
    if params_basic and AUTH_TYPE.BASIC not in auth_type:
        raise NSLCMException('Auth type should be ' + AUTH_TYPE.BASIC)
    if params_oauth2 and AUTH_TYPE.OAUTH2_CLIENT_CREDENTIALS not in auth_type:
        raise NSLCMException('Auth type should be ' + AUTH_TYPE.OAUTH2_CLIENT_CREDENTIALS)
139 def check_filter_exists(self, sub):
140 # Check the notificationTypes, operationTypes, operationStates
141 for filter_type in FILTER_TYPE:
142 if not is_filter_type_equal(
143 getattr(self, filter_type),
144 ast.literal_eval(getattr(sub, filter_type))
147 # If all the above types are same then check ns instance filters
148 ns_filter = json.loads(sub.ns_instance_filter)
149 for ns_filter_type in NS_FILTER_TYPE:
150 if not is_filter_type_equal(
151 self.ns_filter.get(ns_filter_type, []),
152 ns_filter.get(ns_filter_type, [])
157 def check_valid(self):
159 Check DB if callbackUri already exists
162 logger.debug("SubscribeNotification--post::> Checking DB if callbackUri already exists")
163 subscriptions = SubscriptionModel.objects.filter(callback_uri=self.callback_uri)
164 if not subscriptions.exists():
166 for subscription in subscriptions:
167 if self.check_filter_exists(subscription):
168 raise SeeOtherException("Already Subscription exists with the same callbackUri and filter")
173 Save the subscription(%s) to the database
177 "SubscribeNotification--post::> Saving the subscription(%s) to the database" % self.subscription_id)
180 "href": const.SUBSCRIPTION_ROOT_URI % self.subscription_id
183 SubscriptionModel.objects.create(
184 subscription_id=self.subscription_id,
185 callback_uri=self.callback_uri,
186 auth_info=self.authentication,
187 notification_types=json.dumps(self.notification_types),
188 operation_types=json.dumps(self.operation_types),
189 operation_states=json.dumps(self.operation_states),
190 ns_instance_filter=json.dumps(self.ns_filter),
191 ns_component_types=json.dumps(self.ns_component_types),
192 lcm_opname_impacting_nscomponent=json.dumps(self.lcm_opname_impacting_nscomponent),
193 lcm_opoccstatus_impacting_nscomponent=json.dumps(self.lcm_opoccstatus_impacting_nscomponent),
194 links=json.dumps(links))
195 logger.debug('Create Subscription[%s] success', self.subscription_id)