class NotificationsUtil(object):
+ """
+ Util for notifications
+ """
+
def __init__(self, notification_type):
self.notification_type = notification_type
self.notifyserializer = None
pass
def send_notification(self):
+ """
+ Send notification
+ :return:
+ """
notification = self.prepare_notification()
subscriptions_filter = {v + "__contains": notification[k] for k, v in self.filter.items()}
self.post_notification(callbackuri, notification)
def post_notification(self, callbackuri, notification, auth_info=None):
+ """
+ Post notification
+ :param callbackuri:
+ :param notification:
+ :param auth_info:
+ :return:
+ """
try:
if auth_info:
if const.BASIC in auth_info.get("authType", ''):
class PkgNotifications(NotificationsUtil):
+ """
+ Notification Utils for VNF pckages
+ """
+
def __init__(self, notification_type, vnf_pkg_id, change_type=None, operational_state=None):
super(PkgNotifications, self).__init__(notification_type)
self.filter = {
self.notifyserializer = PkgOnboardingNotificationSerializer
def prepare_notification(self):
+ """
+ Prepare notification
+ :return:
+ """
logger.info('Start to prepare Pkgnotification')
vnf_pkg = VnfPackageModel.objects.filter(vnfPackageId=self.vnf_pkg_id)
class NsdNotifications(NotificationsUtil):
+ """
+ Notification Util for NS packages
+ """
+
def __init__(self, notification_type, nsd_info_id, nsd_id, failure_details=None, operational_state=None):
super(NsdNotifications, self).__init__(notification_type)
self.filter = {
# self.notifyserializer = PkgOnboardingNotificationSerializer
def prepare_notification(self):
+ """
+ Prepare notification
+ :return:
+ """
logger.info('Start to prepare Nsdnotification')
notification_content = {
class PnfNotifications(NotificationsUtil):
+ """
+ Notification util for PNF package
+ """
def __init__(self, notification_type, pnfd_info_id, pnfd_id, failure_details=None):
super(PnfNotifications, self).__init__(notification_type)
self.filter = {
# self.notifyserializer = PkgOnboardingNotificationSerializer
def prepare_notification(self, *args, **kwargs):
+ """
+ Prepare notification
+ :param args:
+ :param kwargs:
+ :return:
+ """
logger.info('Start to prepare Pnfnotification')
notification_content = {
'id': str(uuid.uuid4()), # shall be the same if sent multiple times due to multiple subscriptions.
class NsDescriptor(object):
+ """
+ NS Package
+ """
def __init__(self):
pass
def create(self, data, id=None):
+ """
+ Create NS package
+ :param data:
+ :param id:
+ :return:
+ """
logger.info('Start to create a NSD...')
user_defined_data = ignore_case_get(data, 'userDefinedData', {})
data = {
return data
def update(self, data, nsd_info_id):
+ """
+ Update NS package
+ :param data:
+ :param nsd_info_id:
+ :return:
+ """
usageState = PKG_STATUS.IN_USE if data["usageState"] else PKG_STATUS.NOT_IN_USE
NSPackageModel.objects.filter(nsPackageId=nsd_info_id).update(usageState=usageState)
def query_multiple(self, nsdId=None):
+ """
+ Query NS package list
+ :param nsdId:
+ :return:
+ """
if nsdId:
ns_pkgs = NSPackageModel.objects.filter(nsdId=nsdId)
else:
return response_data
def query_single(self, nsd_info_id):
+ """
+ Query NS package by id
+ :param nsd_info_id:
+ :return:
+ """
ns_pkgs = NSPackageModel.objects.filter(nsPackageId=nsd_info_id)
if not ns_pkgs.exists():
logger.error('NSD(%s) does not exist.' % nsd_info_id)
return self.fill_resp_data(ns_pkgs[0])
def delete_single(self, nsd_info_id):
+ """
+ Delete NSD package by id
+ :param nsd_info_id:
+ :return:
+ """
logger.info('Start to delete NSD(%s)...' % nsd_info_id)
ns_pkgs = NSPackageModel.objects.filter(nsPackageId=nsd_info_id)
if not ns_pkgs.exists():
logger.info('NSD(%s) has been deleted.' % nsd_info_id)
def upload(self, nsd_info_id, remote_file):
+ """
+ Upload NS package file
+ :param nsd_info_id:
+ :param remote_file:
+ :return:
+ """
logger.info('Start to upload NSD(%s)...' % nsd_info_id)
ns_pkgs = NSPackageModel.objects.filter(nsPackageId=nsd_info_id)
if not ns_pkgs.exists():
return local_file_name
def download(self, nsd_info_id, file_range):
+ """
+ Download NS package file
+ :param nsd_info_id:
+ :param file_range:
+ :return:
+ """
logger.info('Start to download NSD(%s)...' % nsd_info_id)
ns_pkgs = NSPackageModel.objects.filter(nsPackageId=nsd_info_id)
if not ns_pkgs.exists():
return read(local_file_path, start, end)
def parse_nsd_and_save(self, nsd_info_id, local_file_name):
+ """
+ Parse NSD and save the information
+ :param nsd_info_id:
+ :param local_file_name:
+ :return:
+ """
logger.info('Start to process NSD(%s)...' % nsd_info_id)
ns_pkgs = NSPackageModel.objects.filter(nsPackageId=nsd_info_id)
ns_pkgs.update(onboardingState=const.PKG_STATUS.PROCESSING)
logger.info('NSD(%s) has been processed.' % nsd_info_id)
def fill_resp_data(self, ns_pkg):
+ """
+ File response data
+ :param ns_pkg:
+ :return:
+ """
data = {
'id': ns_pkg.nsPackageId,
'nsdId': ns_pkg.nsdId,
def send_notification(type, nsd_info_id, nsd_id=None, failure_details=None, operational_state=None):
+ """
+ Send notification
+ :param type:
+ :param nsd_info_id:
+ :param nsd_id:
+ :param failure_details:
+ :param operational_state:
+ :return:
+ """
notify = NsdNotifications(type, nsd_info_id, nsd_id,
failure_details=failure_details,
operational_state=operational_state)
class PnfDescriptor(object):
+ """
+ PNF package management
+ """
def __init__(self):
pass
def create(self, data):
+ """
+ Create a PNF package
+ :param data:
+ :return:
+ """
logger.info('Start to create a PNFD...')
user_defined_data = ignore_case_get(data, 'userDefinedData', {})
data = {
return data
def query_multiple(self, request):
+ """
+ Query PNF packages
+ :param request:
+ :return:
+ """
pnfdId = request.query_params.get('pnfdId')
if pnfdId:
pnf_pkgs = PnfPackageModel.objects.filter(pnfdId=pnfdId)
return response_data
def query_single(self, pnfd_info_id):
+ """
+ Query PNF package by id
+ :param pnfd_info_id:
+ :return:
+ """
pnf_pkgs = PnfPackageModel.objects.filter(pnfPackageId=pnfd_info_id)
if not pnf_pkgs.exists():
logger.error('PNFD(%s) does not exist.' % pnfd_info_id)
return self.fill_response_data(pnf_pkgs[0])
def upload(self, remote_file, pnfd_info_id):
+ """
+ Upload PNF package file
+ :param remote_file:
+ :param pnfd_info_id:
+ :return:
+ """
logger.info('Start to upload PNFD(%s)...' % pnfd_info_id)
pnf_pkgs = PnfPackageModel.objects.filter(pnfPackageId=pnfd_info_id)
if not pnf_pkgs.exists():
return local_file_name
def delete_single(self, pnfd_info_id):
+ """
+ Delete PNF package by id
+ :param pnfd_info_id:
+ :return:
+ """
logger.info('Start to delete PNFD(%s)...' % pnfd_info_id)
pnf_pkgs = PnfPackageModel.objects.filter(pnfPackageId=pnfd_info_id)
if not pnf_pkgs.exists():
logger.debug('PNFD(%s) has been deleted.' % pnfd_info_id)
def download(self, pnfd_info_id):
+ """
+ Download PNF package file by id
+ :param pnfd_info_id:
+ :return:
+ """
logger.info('Start to download PNFD(%s)...' % pnfd_info_id)
pnf_pkgs = PnfPackageModel.objects.filter(pnfPackageId=pnfd_info_id)
if not pnf_pkgs.exists():
return read(local_file_path, start, end)
def parse_pnfd_and_save(self, pnfd_info_id, local_file_name):
+ """
+ Parse PNFD and save the information
+ :param pnfd_info_id:
+ :param local_file_name:
+ :return:
+ """
logger.info('Start to process PNFD(%s)...' % pnfd_info_id)
pnf_pkgs = PnfPackageModel.objects.filter(pnfPackageId=pnfd_info_id)
pnf_pkgs.update(onboardingState=PKG_STATUS.PROCESSING)
logger.info('PNFD(%s) has been processed.' % pnfd_info_id)
def fill_response_data(self, pnf_pkg):
+ """
+ Fill response data
+ :param pnf_pkg:
+ :return:
+ """
data = {
'id': pnf_pkg.pnfPackageId,
'pnfdId': pnf_pkg.pnfdId,
return data
def handle_upload_failed(self, pnf_pkg_id):
+ """
+ Faild process
+ :param pnf_pkg_id:
+ :return:
+ """
pnf_pkg = PnfPackageModel.objects.filter(pnfPackageId=pnf_pkg_id)
pnf_pkg.update(onboardingState=PKG_STATUS.CREATED)
def parse_pnfd(self, csar_id, inputs):
+ """
+ Parse PNFD
+ :param csar_id:
+ :param inputs:
+ :return:
+ """
try:
pnf_pkg = PnfPackageModel.objects.filter(pnfPackageId=csar_id)
if not pnf_pkg:
def send_notification(type, pnfd_info_id, pnfd_id=None, failure_details=None):
+ """
+ Send notification
+ :param type:
+ :param pnfd_info_id:
+ :param pnfd_id:
+ :param failure_details:
+ :return:
+ """
notify = PnfNotifications(type, pnfd_info_id, pnfd_id, failure_details=failure_details)
notify.send_notification()
def ns_on_distribute(csar_id):
+ """
+ Get NS pckage from SDC
+ :param csar_id:
+ :return:
+ """
ret = None
try:
ret = NsPackage().on_distribute(csar_id)
def ns_delete_csar(csar_id):
+ """
+ Delete NS package
+ :param csar_id:
+ :return:
+ """
ret = None
try:
ret = NsPackage().delete_csar(csar_id)
def ns_get_csars():
+ """
+ Get NS packages
+ :return:
+ """
ret = None
try:
ret = NsPackage().get_csars()
def ns_get_csar(csar_id):
+ """
+ Get NS package by id
+ :param csar_id:
+ :return:
+ """
ret = None
try:
ret = NsPackage().get_csar(csar_id)
def parse_nsd(csar_id, inputs):
+ """
+ Parse NSD
+ :param csar_id:
+ :param inputs:
+ :return:
+ """
ret = None
try:
ns_pkg = NSPackageModel.objects.filter(nsPackageId=csar_id)
return [0, "CSAR(%s) distributed successfully." % csar_id]
def delete_csar(self, csar_id):
+ """
+ Delete NS package by id
+ :param csar_id:
+ :return:
+ """
nsd = NsDescriptor()
nsd.delete_single(csar_id)
return [0, "Delete CSAR(%s) successfully." % csar_id]
def get_csars(self):
+ """
+ Get ns packages
+ :return:
+ """
csars = []
nss = NSPackageModel.objects.filter()
for ns in nss:
return [0, csars]
def get_csar(self, csar_id):
+ """
+ Get NS package by id
+ :param csar_id:
+ :return:
+ """
package_info = {}
csars = NSPackageModel.objects.filter(nsPackageId=csar_id)
if csars:
pass
def on_distribute(self, csar_id):
+ """
+ Get service packge from SDC and process
+ :param csar_id:
+ :return:
+ """
if ServicePackageModel.objects.filter(servicePackageId=csar_id):
+ err_msg = "Service CSAR(%s) already exists." % csar_id
+ logger.warn(err_msg)
raise PackageHasExistsException("Service CSAR(%s) already exists." % csar_id)
try:
raise e
def delete_csar(self, csar_id):
+ """
+ Delete service package
+ :param csar_id:
+ :return:
+ """
serviced = ServiceDescriptor()
serviced.delete_single(csar_id)
def get_csars(self):
+ """
+ Get service packages from DB
+ :return:
+ """
csars = []
packages = ServicePackageModel.objects.filter()
for package in packages:
return csars
def get_csar(self, csar_id):
+ """
+ Get a service package by id
+ :param csar_id:
+ :return:
+ """
package_info = {}
csars = ServicePackageModel.objects.filter(servicePackageId=csar_id)
if csars:
return {"csarId": csar_id, "packageInfo": package_info}
def parse_serviced(self, csar_id, inputs):
+ """
+ Parse service package
+ :param csar_id:
+ :param inputs:
+ :return:
+ """
service_pkg = ServicePackageModel.objects.filter(servicePackageId=csar_id)
if not service_pkg:
raise PackageNotFoundException("Service CSAR(%s) does not exist." % csar_id)
def nf_get_csars():
+ """
+ Get NF packages
+ :return:
+ """
ret = None
try:
ret = NfPackage().get_csars()
def nf_get_csar(csar_id):
+ """
+ Get NF package by id
+ :param csar_id:
+ :return:
+ """
ret = None
try:
ret = NfPackage().get_csar(csar_id)
def parse_vnfd(csar_id, inputs):
+ """
+ Parse VNFD
+ :param csar_id:
+ :param inputs:
+ :return:
+ """
ret = None
try:
nf_pkg = VnfPackageModel.objects.filter(vnfPackageId=csar_id)
vnf_provider = vnfd["vnf"]["properties"].get("provider", "")
vnf_software_version = vnfd["vnf"]["properties"].get("software_version", "")
vnfd_product_name = vnfd["vnf"]["properties"].get("product_name", "")
+ # Update VNF package Model to DB
VnfPackageModel(
vnfPackageId=self.csar_id,
vnfdId=vnfd_id,
pass
def create(self, data, csar_id=None):
+ """
+ Create a Service package
+ :param data:
+ :param csar_id:
+ :return:
+ """
logger.info('Start to create a ServiceD...')
user_defined_data = ignore_case_get(data, 'userDefinedData', {})
data = {
return data
def parse_serviced_and_save(self, serviced_info_id, local_file_name):
+ """
+ Parse service package and save information to DB
+ :param serviced_info_id:
+ :param local_file_name:
+ :return:
+ """
logger.info('Start to process ServiceD(%s)...' % serviced_info_id)
service_pkgs = ServicePackageModel.objects.filter(servicePackageId=serviced_info_id)
service_pkgs.update(onboardingState=PKG_STATUS.PROCESSING)
logger.info('ServiceD(%s) has been processed.' % serviced_info_id)
def delete_single(self, serviced_info_id):
+ """
+ Delete a service package by given id
+ :param serviced_info_id:
+ :return:
+ """
logger.info('Start to delete ServiceD(%s)...' % serviced_info_id)
service_pkgs = ServicePackageModel.objects.filter(servicePackageId=serviced_info_id)
if not service_pkgs.exists():
logger.warn('ServiceD(%s) not found.' % serviced_info_id)
raise PackageNotFoundException("Service package[%s] not Found." % serviced_info_id)
service_pkgs.delete()
+
+ # Delete package dir
service_pkg_path = os.path.join(CATALOG_ROOT_PATH, serviced_info_id)
fileutil.delete_dirs(service_pkg_path)
logger.info('ServiceD(%s) has been deleted.' % serviced_info_id)
class VnfPackage(object):
+ """
+ The class for VNF package management
+ """
def __init__(self):
pass
def create_vnf_pkg(self, data):
+ """
+ Create a VNF package
+ :param data: user defined data
+ :return: VNF package info
+ """
user_defined_data = ignore_case_get(data, "userDefinedData", {})
vnf_pkg_id = str(uuid.uuid4())
VnfPackageModel.objects.create(
return data
def query_multiple(self):
+ """
+ Query the list of VNF package
+ :return: The list of VNF pakcage
+ """
pkgs_info = []
nf_pkgs = VnfPackageModel.objects.filter()
for nf_pkg in nf_pkgs:
return pkgs_info
def query_single(self, vnf_pkg_id):
+ """
+ Query a single VNF package by given id
+ :param vnf_pkg_id: The id of VNF package
+ :return: VNF pckage info
+ """
nf_pkg = VnfPackageModel.objects.filter(vnfPackageId=vnf_pkg_id)
if not nf_pkg.exists():
logger.error('VNF package(%s) does not exist.' % vnf_pkg_id)
return fill_response_data(nf_pkg[0])
def delete_vnf_pkg(self, vnf_pkg_id):
+ """
+ Delete a VNF package by give id
+ :param vnf_pkg_id: The id of VNF package
+ :return:
+ """
vnf_pkg = VnfPackageModel.objects.filter(vnfPackageId=vnf_pkg_id)
if not vnf_pkg.exists():
logger.debug('VNF package(%s) has been deleted.' % vnf_pkg_id)
logger.info('VNF package(%s) has been deleted.' % vnf_pkg_id)
def upload(self, vnf_pkg_id, remote_file):
+ """
+ Update VNF pckage file for given id
+ :param vnf_pkg_id: The id of VNF package
+ :param remote_file: VNF package file
+ :return:
+ """
logger.info('Start to upload VNF package(%s)...' % vnf_pkg_id)
vnf_pkg = VnfPackageModel.objects.filter(vnfPackageId=vnf_pkg_id)
# if vnf_pkg[0].onboardingState != PKG_STATUS.CREATED:
return read(local_file_path, start, end)
def download_vnfd(self, vnf_pkg_id):
+ """
+ Download VNFD for given id
+ :param vnf_pkg_id: The id of VNF package
+ :return: VNFD
+ """
logger.info('Start to download VNFD of VNF package(%s)...' % vnf_pkg_id)
nf_pkg = VnfPackageModel.objects.filter(vnfPackageId=vnf_pkg_id)
if not nf_pkg.exists():
class VnfPkgUploadThread(threading.Thread):
+ """
+ The Thread for upload VNF pckage
+ """
+
def __init__(self, data, vnf_pkg_id):
threading.Thread.__init__(self)
self.vnf_pkg_id = vnf_pkg_id
class FetchVnfPkgArtifact(object):
+ """
+ Fetch the artifact of VNF package
+ """
def fetch(self, vnfPkgId, artifactPath):
+ """
+ Fetch artifact by given vnf package id and the path of artifact
+ :param vnfPkgId:
+ :param artifactPath:
+ :return:
+ """
logger.debug("FetchVnfPkgArtifact--get--single--artifact--biz::>"
"ID: %s path: %s" % (vnfPkgId, artifactPath))
vnf_pkg = VnfPackageModel.objects.filter(vnfPackageId=vnfPkgId)
class CreateSubscription(object):
+ """
+ Create subscription info
+ """
def __init__(self, data):
self.data = data
self.filter = ignore_case_get(self.data, "filter", {})
ignore_case_get(self.filter, "vnfProductsFromProviders", [])
def check_callbackuri_connection(self):
+ """
+ Check if the callback uri can access
+ :return:
+ """
logger.debug("SubscribeNotification-post::> Sending GET request "
"to %s" % self.callback_uri)
try:
)
def do_biz(self):
+ """
+ Do business
+ :return:
+ """
self.subscription_id = str(uuid.uuid4())
self.check_valid_auth_info()
self.check_callbackuri_connection()
return subscription.toDict()
def check_valid_auth_info(self):
+ """
+ Check if the Auth info is valid
+ :return:
+ """
logger.debug("SubscribeNotification--post::> Validating Auth "
"details if provided")
if self.authentication.get("paramsBasic", {}) and const.BASIC not in self.authentication.get("authType"):
return True
def save_db(self):
+ """
+ Save the subscription info to DB
+ :return:
+ """
logger.debug("SubscribeNotification--post::> Saving the subscription "
"%s to the database" % self.subscription_id)
links = {
class QuerySubscription(object):
+ """
+ The class for query subscription
+ """
def query_multi_subscriptions(self, params):
+ """
+ Query subscriptions
+ :param params:
+ :return:
+ """
query_data = {}
logger.debug("QuerySubscription--get--multi--subscriptions--biz::> Check "
"for filter in query params %s" % params)
return [subscription.toDict() for subscription in subscriptions]
def query_single_subscription(self, subscription_id):
+ """
+ Query subscription by id
+ :param subscription_id:
+ :return:
+ """
logger.debug("QuerySingleSubscriptions--get--single--subscription--biz::> "
"ID: %s" % subscription_id)
class TerminateSubscription(object):
+ """
+ The class to terminate the subscription
+ """
def terminate(self, subscription_id):
logger.debug("TerminateSubscriptions--delete--biz::> "
"ID: %s" % subscription_id)
def get_artifacts(asset_type):
+ """
+ Get artifacts by given asset type
+ :param asset_type:
+ :return:
+ """
resource = "/sdc/v1/catalog/{assetType}"
resource = resource.format(assetType=asset_type)
ret = call_sdc(resource, "GET")
def get_artifact(asset_type, csar_id):
+ """
+ Get artifact by given asset type and csar id
+ :param asset_type:
+ :param csar_id:
+ :return:
+ """
artifacts = get_artifacts(asset_type)
for artifact in artifacts:
if artifact["uuid"] == csar_id:
def get_asset(asset_type, uuid):
+ """
+ Get asset by given type and UUID
+ :param asset_type:
+ :param uuid:
+ :return:
+ """
resource = "/sdc/v1/catalog/{assetType}/{uuid}/metadata".format(assetType=asset_type, uuid=uuid)
ret = call_sdc(resource, "GET")
if ret[0] != 0:
def delete_artifact(asset_type, asset_id, artifact_id):
+ """
+ Delete artifact by conditions from SDC
+ :param asset_type:
+ :param asset_id:
+ :param artifact_id:
+ :return:
+ """
resource = "/sdc/v1/catalog/{assetType}/{uuid}/artifacts/{artifactUUID}"
resource = resource.format(assetType=asset_type, uuid=asset_id, artifactUUID=artifact_id)
ret = call_sdc(resource, "DELETE")
def download_artifacts(download_url, local_path, file_name):
+ """
+ Downlaod artifacts from SDC
+ :param download_url:
+ :param local_path:
+ :param file_name:
+ :return:
+ """
additional_headers = {
'X-ECOMP-InstanceID': 'VFC',
'accept': 'application/octet-stream'
def create_consumer(name, salt, password):
+ """
+ Create a consumer to access the SDC
+ :param name:
+ :param salt:
+ :param password:
+ :return:
+ """
req_data = {
'consumerName': name,
'consumerSalt': salt,
def register_for_topics(key):
+ """
+ Register a topics of SDC
+ :param key:
+ :return:
+ """
req_data = {
'apiPublicKey': key,
'distrEnvName': 'AUTO',