X-Git-Url: https://gerrit.onap.org/r/gitweb?a=blobdiff_plain;f=catalog%2Fpub%2Fmsapi%2Fsdc_controller.py;h=8bad8573b9badb78a520a716434e99f4394610eb;hb=4123780df595fd99883286e351a936349709ef68;hp=74a5ae847fbf0a5a473cc6e719273eea2b720670;hpb=acc879458b00c2d78d859ca79d683c1ec79ff554;p=modeling%2Fetsicatalog.git diff --git a/catalog/pub/msapi/sdc_controller.py b/catalog/pub/msapi/sdc_controller.py index 74a5ae8..8bad857 100644 --- a/catalog/pub/msapi/sdc_controller.py +++ b/catalog/pub/msapi/sdc_controller.py @@ -7,7 +7,15 @@ import traceback import uuid from threading import Thread -from apscheduler.scheduler import Scheduler +from apscheduler.schedulers.background import BackgroundScheduler + +from catalog.pub.database.models import NSPackageModel, VnfPackageModel +from catalog.pub.exceptions import CatalogException +from catalog.packages.biz import sdc_vnf_package +from catalog.pub.utils import fileutil +from catalog.packages.biz.ns_descriptor import NsDescriptor +from catalog.pub.utils.jobutil import JobUtil +from catalog.pub.utils.values import ignore_case_get from catalog.pub.Dmaap_lib.dmaap.consumer import ConsumerClient from catalog.pub.Dmaap_lib.dmaap.identity import IdentityClient @@ -24,12 +32,12 @@ class SDCController(Thread): def __init__(self): super(SDCController, self).__init__() self.identity = IdentityClient(DMAAP_MR_BASE_URL) - self.scheduler = Scheduler(standalone=True) + self.scheduler = BackgroundScheduler(job_defaults={'max_instances': 3}) self.notification_topic = '' self.status_topic = '' self.consumer = '' - @self.scheduler.interval_schedule(seconds=POLLING_INTERVAL) + @self.scheduler.scheduled_job('interval', seconds=POLLING_INTERVAL) def fetch_task(): self.fetch_notification() @@ -190,24 +198,65 @@ def send_notification_status(status_topic, now_ms, distribution_id, artifact, is def process_notification(msg): logger.info('Receive a callback notification, nb of resources: %s', len(msg['resources'])) - service_artifacts = msg['serviceArtifacts'] - for artifact in service_artifacts: - if artifact['artifactType'] == 'TOSCA_CSAR': - csar_id = artifact['artifactUUID'] - download_url = artifact['artifactURL'] - localhost_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) - ns_csar_base = os.path.join(localhost_dir, "csars", "ns") - local_path = os.path.join(ns_csar_base, msg['distributionID']) - file_name = artifact['artifactName'] - csar_version = artifact['artifactVersion'] - sdc.download_artifacts(download_url, local_path, file_name) - # call ns package upload - data = { - 'nsPackageId': csar_id, - 'nsPackageVersion': csar_version, - 'csarName': file_name, - 'csarDir': local_path - } - jobid = uuid.uuid4() - # NsPackageParser(data, jobid).start() - logger.debug(data, jobid) + try: + ns = sdc.get_asset(sdc.ASSETTYPE_SERVICES, msg['serviceUUID']) + # check if the related resources exist + resources = ns.get('resources', None) + job_array = [] + resource_threads = [] + if resources: + for resource in resources: + if (resource['resoucreType'] == 'VF') and not VnfPackageModel.objects.filter(vnfPackageId=resource['resourceUUID']): + logger.debug("VF [%s] is not distributed.", resource['resourceUUID']) + # raise CatalogException("VF (%s) is not distributed." % resource['resourceUUID']) + logger.info("VF [%s] begin to distribute.", resource['resourceUUID']) + csar_id = resource['resourceUUID'] + vim_ids = ignore_case_get(resource, "vimIds") + lab_vim_id = ignore_case_get(resource, "labVimId") + job_id = str(uuid.uuid4()) + job_array.append(job_id) + resource_threads.append(sdc_vnf_package.NfDistributeThread(csar_id, vim_ids, lab_vim_id, job_id)) + # sdc_vnf_package.NfDistributeThread(csar_id, vim_ids, lab_vim_id, job_id).start() + else: + logger.debug("resource [%s] has been distributed", resource['resourceUUID']) + for resource_thread in resource_threads: + resource_thread.start() + for resource_thread in resource_threads: + resource_thread.join() + for jobID in job_array: + job_status = JobUtil.query_job_status(jobID) + if job_status[0].status == 'error': + raise CatalogException("VF resource fail to distributed.") + + service_artifacts = msg['serviceArtifacts'] + for artifact in service_artifacts: + if artifact['artifactType'] == 'TOSCA_CSAR': + csar_id = artifact['artifactUUID'] + if not NSPackageModel.objects.filter(nsPackageId=artifact['artifactUUID']): + download_url = artifact['artifactURL'] + localhost_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) + ns_csar_base = os.path.join(localhost_dir, "csars", "ns") + local_path = os.path.join(ns_csar_base, msg['distributionID']) + file_name = artifact['artifactName'] + # csar_version = artifact['artifactVersion'] + + # download csar package + local_file_name = sdc.download_artifacts(download_url, local_path, file_name) + if local_file_name.endswith(".csar") or local_file_name.endswith(".zip"): + artifact_vnf_file = fileutil.unzip_file(local_file_name, local_path, + "Artifacts/Deployment/OTHER/ns.csar") + if os.path.exists(artifact_vnf_file): + local_file_name = artifact_vnf_file + + data = { + 'userDefinedData': {} + } + nsd = NsDescriptor() + nsd.create(data, csar_id) + nsd.parse_nsd_and_save(csar_id, local_file_name) + logger.debug("CSAR(%s) distriuted successfully.", csar_id) + else: + logger.debug("CSAR(%s) has been distriuted", csar_id) + except CatalogException as e: + logger.error("Failed to download the resource") + logger.error(str(e))