class NsDescriptor(object):
+ """
+ NS Package
+ """
def __init__(self):
pass
def create(self, data, id=None):
+ """
+ Create NS package
+ :param data:
+ :param id:
+ :return:
+ """
logger.info('Start to create a NSD...')
user_defined_data = ignore_case_get(data, 'userDefinedData', {})
data = {
return data
def update(self, data, nsd_info_id):
+ """
+ Update NS package
+ :param data:
+ :param nsd_info_id:
+ :return:
+ """
usageState = PKG_STATUS.IN_USE if data["usageState"] else PKG_STATUS.NOT_IN_USE
NSPackageModel.objects.filter(nsPackageId=nsd_info_id).update(usageState=usageState)
def query_multiple(self, nsdId=None):
+ """
+ Query NS package list
+ :param nsdId:
+ :return:
+ """
if nsdId:
ns_pkgs = NSPackageModel.objects.filter(nsdId=nsdId)
else:
return response_data
def query_single(self, nsd_info_id):
+ """
+ Query NS package by id
+ :param nsd_info_id:
+ :return:
+ """
ns_pkgs = NSPackageModel.objects.filter(nsPackageId=nsd_info_id)
if not ns_pkgs.exists():
logger.error('NSD(%s) does not exist.' % nsd_info_id)
return self.fill_resp_data(ns_pkgs[0])
def delete_single(self, nsd_info_id):
+ """
+ Delete NSD package by id
+ :param nsd_info_id:
+ :return:
+ """
logger.info('Start to delete NSD(%s)...' % nsd_info_id)
ns_pkgs = NSPackageModel.objects.filter(nsPackageId=nsd_info_id)
if not ns_pkgs.exists():
logger.info('NSD(%s) has been deleted.' % nsd_info_id)
def upload(self, nsd_info_id, remote_file):
+ """
+ Upload NS package file
+ :param nsd_info_id:
+ :param remote_file:
+ :return:
+ """
logger.info('Start to upload NSD(%s)...' % nsd_info_id)
ns_pkgs = NSPackageModel.objects.filter(nsPackageId=nsd_info_id)
if not ns_pkgs.exists():
return local_file_name
def download(self, nsd_info_id, file_range):
+ """
+ Download NS package file
+ :param nsd_info_id:
+ :param file_range:
+ :return:
+ """
logger.info('Start to download NSD(%s)...' % nsd_info_id)
ns_pkgs = NSPackageModel.objects.filter(nsPackageId=nsd_info_id)
if not ns_pkgs.exists():
return read(local_file_path, start, end)
def parse_nsd_and_save(self, nsd_info_id, local_file_name):
+ """
+ Parse NSD and save the information
+ :param nsd_info_id:
+ :param local_file_name:
+ :return:
+ """
logger.info('Start to process NSD(%s)...' % nsd_info_id)
ns_pkgs = NSPackageModel.objects.filter(nsPackageId=nsd_info_id)
ns_pkgs.update(onboardingState=const.PKG_STATUS.PROCESSING)
logger.info('NSD(%s) has been processed.' % nsd_info_id)
def fill_resp_data(self, ns_pkg):
+ """
+ File response data
+ :param ns_pkg:
+ :return:
+ """
data = {
'id': ns_pkg.nsPackageId,
'nsdId': ns_pkg.nsdId,
def send_notification(type, nsd_info_id, nsd_id=None, failure_details=None, operational_state=None):
+ """
+ Send notification
+ :param type:
+ :param nsd_info_id:
+ :param nsd_id:
+ :param failure_details:
+ :param operational_state:
+ :return:
+ """
notify = NsdNotifications(type, nsd_info_id, nsd_id,
failure_details=failure_details,
operational_state=operational_state)