# limitations under the License.
import ast
-from collections import Counter
import json
-from lcm.pub.database.models import SubscriptionModel
-from lcm.pub.exceptions import NSLCMException, SeeOtherException
-from lcm.pub.utils.values import ignore_case_get
-from lcm.ns import const
-from lcm.ns.enum import NOTIFICATION_TYPE, AUTH_TYPE
import logging
import requests
-from rest_framework import status
import uuid
+from collections import Counter
+from rest_framework import status
+
+from lcm.ns.enum import NOTIFICATION_TYPE, AUTH_TYPE
+from lcm.pub.database.models import SubscriptionModel
+from lcm.pub.exceptions import NSLCMException, SeeOtherException
+from lcm.pub.utils.values import ignore_case_get
+from lcm.ns import const
logger = logging.getLogger(__name__)
"lcm_opname_impacting_nscomponent",
"lcm_opoccstatus_impacting_nscomponent",
"notification_types",
- "operation_states"]
+ "operation_states"
+]
NS_FILTER_TYPE = [
"nsdIds",
"nsInstanceIds",
"vnfdIds",
"pnfdIds",
- "nsInstanceNames"]
+ "nsInstanceNames"
+]
class CreateSubscription:
self.operation_types = ignore_case_get(self.filter, "operationTypes", [])
self.operation_states = ignore_case_get(self.filter, "notificationStates", [])
self.ns_component_types = ignore_case_get(self.filter, "nsComponentTypes", [])
- self.lcm_opname_impacting_nscomponent = ignore_case_get(self.filter, "lcmOpNameImpactingNsComponent", [])
- self.lcm_opoccstatus_impacting_nscomponent = ignore_case_get(self.filter, "lcmOpOccStatusImpactingNsComponent", [])
- self.ns_filter = ignore_case_get(self.filter, "nsInstanceSubscriptionFilter", {})
+ self.lcm_opname_impacting_nscomponent = ignore_case_get(
+ self.filter,
+ "lcmOpNameImpactingNsComponent",
+ []
+ )
+ self.lcm_opoccstatus_impacting_nscomponent = ignore_case_get(
+ self.filter,
+ "lcmOpOccStatusImpactingNsComponent",
+ []
+ )
+ self.ns_filter = ignore_case_get(
+ self.filter,
+ "nsInstanceSubscriptionFilter",
+ {}
+ )
def check_callback_uri(self):
logger.debug("SubscribeNotification-post::> Sending GET request to %s" % self.callback_uri)
try:
response = requests.get(self.callback_uri, timeout=2)
if response.status_code != status.HTTP_204_NO_CONTENT:
- raise NSLCMException("callbackUri %s returns %s status code." % (self.callback_uri, response.status_code))
+ raise NSLCMException("callbackUri %s returns %s status code." % (
+ self.callback_uri,
+ response.status_code
+ ))
except Exception:
raise NSLCMException("callbackUri %s didn't return 204 status code." % self.callback_uri)
def check_filter_types(self):
logger.debug("SubscribeNotification--post::> Validating operationTypes and operationStates if exists")
- if self.operation_types and NOTIFICATION_TYPE.NSLCM_OPERATION_OCCURRENCE_NOTIFICATION not in self.notification_types:
+ occ_notification = NOTIFICATION_TYPE.NSLCM_OPERATION_OCCURRENCE_NOTIFICATION
+ if self.operation_types and occ_notification not in self.notification_types:
except_message = "If you are setting operationTypes, notificationTypes must be %s"
- raise NSLCMException(except_message % NOTIFICATION_TYPE.NSLCM_OPERATION_OCCURRENCE_NOTIFICATION)
- if self.operation_states and NOTIFICATION_TYPE.NSLCM_OPERATION_OCCURRENCE_NOTIFICATION not in self.notification_types:
+ raise NSLCMException(except_message % occ_notification)
+ if self.operation_states and occ_notification not in self.notification_types:
except_message = "If you are setting operationStates, notificationTypes must be %s"
- raise NSLCMException(except_message % NOTIFICATION_TYPE.NSLCM_OPERATION_OCCURRENCE_NOTIFICATION)
+ raise NSLCMException(except_message % occ_notification)
def check_valid_auth_info(self):
logger.debug("SubscribeNotification--post::> Validating Auth details if provided")
- if self.authentication.get("paramsBasic", {}) and AUTH_TYPE.BASIC not in self.authentication.get("authType"):
+ auth_type = self.authentication.get("authType")
+ params_basic = self.authentication.get("paramsBasic")
+ params_oauth2 = self.authentication.get("paramsOauth2ClientCredentials")
+ if params_basic and AUTH_TYPE.BASIC not in auth_type:
raise NSLCMException('Auth type should be ' + AUTH_TYPE.BASIC)
- if self.authentication.get("paramsOauth2ClientCredentials", {}) and AUTH_TYPE.OAUTH2_CLIENT_CREDENTIALS not in self.authentication.get("authType"):
+ if params_oauth2 and AUTH_TYPE.OAUTH2_CLIENT_CREDENTIALS not in auth_type:
raise NSLCMException('Auth type should be ' + AUTH_TYPE.OAUTH2_CLIENT_CREDENTIALS)
def check_filter_exists(self, sub):
# Check the notificationTypes, operationTypes, operationStates
for filter_type in FILTER_TYPE:
- if not is_filter_type_equal(getattr(self, filter_type), ast.literal_eval(getattr(sub, filter_type))):
+ if not is_filter_type_equal(
+ getattr(self, filter_type),
+ ast.literal_eval(getattr(sub, filter_type))
+ ):
return False
# If all the above types are same then check ns instance filters
ns_filter = json.loads(sub.ns_instance_filter)
for ns_filter_type in NS_FILTER_TYPE:
- if not is_filter_type_equal(self.ns_filter.get(ns_filter_type, []), ns_filter.get(ns_filter_type, [])):
+ if not is_filter_type_equal(
+ self.ns_filter.get(ns_filter_type, []),
+ ns_filter.get(ns_filter_type, [])
+ ):
return False
return True