X-Git-Url: https://gerrit.onap.org/r/gitweb?a=blobdiff_plain;f=catalog%2Fpub%2Fmsapi%2Fsdc_controller.py;h=8bad8573b9badb78a520a716434e99f4394610eb;hb=4123780df595fd99883286e351a936349709ef68;hp=454f3d1a1db6a582dc88c7c6dcf9b99a72aa2c58;hpb=fddda96911bdb3ec9841ac71e764cb6eb8fa08d5;p=modeling%2Fetsicatalog.git diff --git a/catalog/pub/msapi/sdc_controller.py b/catalog/pub/msapi/sdc_controller.py index 454f3d1..8bad857 100644 --- a/catalog/pub/msapi/sdc_controller.py +++ b/catalog/pub/msapi/sdc_controller.py @@ -7,18 +7,24 @@ import traceback import uuid from threading import Thread -from apscheduler.scheduler import Scheduler +from apscheduler.schedulers.background import BackgroundScheduler + +from catalog.pub.database.models import NSPackageModel, VnfPackageModel +from catalog.pub.exceptions import CatalogException +from catalog.packages.biz import sdc_vnf_package +from catalog.pub.utils import fileutil +from catalog.packages.biz.ns_descriptor import NsDescriptor +from catalog.pub.utils.jobutil import JobUtil +from catalog.pub.utils.values import ignore_case_get from catalog.pub.Dmaap_lib.dmaap.consumer import ConsumerClient from catalog.pub.Dmaap_lib.dmaap.identity import IdentityClient from catalog.pub.Dmaap_lib.dmaap.publisher import BatchPublisherClient -from catalog.pub.config.config import CONSUMER_GROUP, CONSUMER_ID, POLLING_INTERVAL, DMAAP_MR_IP, \ - DMAAP_MR_PORT +from catalog.pub.config.config import CONSUMER_GROUP, CONSUMER_ID, POLLING_INTERVAL, DMAAP_MR_BASE_URL from catalog.pub.msapi import sdc logger = logging.getLogger(__name__) -DMAAP_MR_BASE_URL = "http://%s:%s" % (DMAAP_MR_IP, DMAAP_MR_PORT) ARTIFACT_TYPES_LIST = ["TOSCA_TEMPLATE", "TOSCA_CSAR"] @@ -26,12 +32,12 @@ class SDCController(Thread): def __init__(self): super(SDCController, self).__init__() self.identity = IdentityClient(DMAAP_MR_BASE_URL) - self.scheduler = Scheduler(standalone=True) + self.scheduler = BackgroundScheduler(job_defaults={'max_instances': 3}) self.notification_topic = '' self.status_topic = '' self.consumer = '' - @self.scheduler.interval_schedule(seconds=POLLING_INTERVAL) + @self.scheduler.scheduled_job('interval', seconds=POLLING_INTERVAL) def fetch_task(): self.fetch_notification() @@ -47,7 +53,7 @@ class SDCController(Thread): self.scheduler.start() except Exception as e: logger.error('start sdc controller failed.') - logger.error(e.message) + logger.error(str(e)) logger.error(traceback.format_exc()) def fetch_notification(self): @@ -62,7 +68,7 @@ class SDCController(Thread): process_notification(notification_callback) except Exception as e: logger.error('fetch message from dmaap failed.') - logger.error(e.message) + logger.error(str(e)) logger.error(traceback.format_exc()) @@ -184,7 +190,7 @@ def send_notification_status(status_topic, now_ms, distribution_id, artifact, is logger.error('failed to send notification status, %s messages unsent', len(stuck)) except Exception as e: logger.error('failed to send notification status.') - logger.error(e.message) + logger.error(str(e)) logger.error(traceback.format_exc()) return status @@ -192,24 +198,65 @@ def send_notification_status(status_topic, now_ms, distribution_id, artifact, is def process_notification(msg): logger.info('Receive a callback notification, nb of resources: %s', len(msg['resources'])) - service_artifacts = msg['serviceArtifacts'] - for artifact in service_artifacts: - if artifact['artifactType'] == 'TOSCA_CSAR': - csar_id = artifact['artifactUUID'] - download_url = artifact['artifactURL'] - localhost_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) - ns_csar_base = os.path.join(localhost_dir, "csars", "ns") - local_path = os.path.join(ns_csar_base, msg['distributionID']) - file_name = artifact['artifactName'] - csar_version = artifact['artifactVersion'] - sdc.download_artifacts(download_url, local_path, file_name) - # call ns package upload - data = { - 'nsPackageId': csar_id, - 'nsPackageVersion': csar_version, - 'csarName': file_name, - 'csarDir': local_path - } - jobid = uuid.uuid4() - # NsPackageParser(data, jobid).start() - logger.debug(data, jobid) + try: + ns = sdc.get_asset(sdc.ASSETTYPE_SERVICES, msg['serviceUUID']) + # check if the related resources exist + resources = ns.get('resources', None) + job_array = [] + resource_threads = [] + if resources: + for resource in resources: + if (resource['resoucreType'] == 'VF') and not VnfPackageModel.objects.filter(vnfPackageId=resource['resourceUUID']): + logger.debug("VF [%s] is not distributed.", resource['resourceUUID']) + # raise CatalogException("VF (%s) is not distributed." % resource['resourceUUID']) + logger.info("VF [%s] begin to distribute.", resource['resourceUUID']) + csar_id = resource['resourceUUID'] + vim_ids = ignore_case_get(resource, "vimIds") + lab_vim_id = ignore_case_get(resource, "labVimId") + job_id = str(uuid.uuid4()) + job_array.append(job_id) + resource_threads.append(sdc_vnf_package.NfDistributeThread(csar_id, vim_ids, lab_vim_id, job_id)) + # sdc_vnf_package.NfDistributeThread(csar_id, vim_ids, lab_vim_id, job_id).start() + else: + logger.debug("resource [%s] has been distributed", resource['resourceUUID']) + for resource_thread in resource_threads: + resource_thread.start() + for resource_thread in resource_threads: + resource_thread.join() + for jobID in job_array: + job_status = JobUtil.query_job_status(jobID) + if job_status[0].status == 'error': + raise CatalogException("VF resource fail to distributed.") + + service_artifacts = msg['serviceArtifacts'] + for artifact in service_artifacts: + if artifact['artifactType'] == 'TOSCA_CSAR': + csar_id = artifact['artifactUUID'] + if not NSPackageModel.objects.filter(nsPackageId=artifact['artifactUUID']): + download_url = artifact['artifactURL'] + localhost_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) + ns_csar_base = os.path.join(localhost_dir, "csars", "ns") + local_path = os.path.join(ns_csar_base, msg['distributionID']) + file_name = artifact['artifactName'] + # csar_version = artifact['artifactVersion'] + + # download csar package + local_file_name = sdc.download_artifacts(download_url, local_path, file_name) + if local_file_name.endswith(".csar") or local_file_name.endswith(".zip"): + artifact_vnf_file = fileutil.unzip_file(local_file_name, local_path, + "Artifacts/Deployment/OTHER/ns.csar") + if os.path.exists(artifact_vnf_file): + local_file_name = artifact_vnf_file + + data = { + 'userDefinedData': {} + } + nsd = NsDescriptor() + nsd.create(data, csar_id) + nsd.parse_nsd_and_save(csar_id, local_file_name) + logger.debug("CSAR(%s) distriuted successfully.", csar_id) + else: + logger.debug("CSAR(%s) has been distriuted", csar_id) + except CatalogException as e: + logger.error("Failed to download the resource") + logger.error(str(e))