From: Mars chen Date: Fri, 18 Sep 2020 02:26:56 +0000 (+0800) Subject: Process notifications for SDC and stores NS and related resources X-Git-Tag: 1.0.8~4 X-Git-Url: https://gerrit.onap.org/r/gitweb?p=modeling%2Fetsicatalog.git;a=commitdiff_plain;h=4123780df595fd99883286e351a936349709ef68 Process notifications for SDC and stores NS and related resources Issue-ID: MODELING-335 Change-Id: Ie8836c865d21fb4695b85f07f1098d0d4617ac0c Signed-off-by: Mars chen --- diff --git a/catalog/pub/Dmaap_lib/dmaap/publisher.py b/catalog/pub/Dmaap_lib/dmaap/publisher.py index 7ebbca0..23a2351 100644 --- a/catalog/pub/Dmaap_lib/dmaap/publisher.py +++ b/catalog/pub/Dmaap_lib/dmaap/publisher.py @@ -21,8 +21,7 @@ import time from hashlib import sha1 import requests -from apscheduler.scheduler import Scheduler - +from apscheduler.schedulers.background import BackgroundScheduler from catalog.pub.Dmaap_lib.pub.exceptions import DmaapClientException logger = logging.getLogger(__name__) @@ -40,9 +39,10 @@ class BatchPublisherClient: self.pending = [] self.closed = False self.dont_send_until_ms = 0 - self.scheduler = Scheduler(standalone=False) + self.scheduler = BackgroundScheduler() - @self.scheduler.interval_schedule(second=1) + # @self.scheduler.interval_schedule(second=1) + @self.scheduler.scheduled_job(second=1) def crawl_job(): self.send_message(False) diff --git a/catalog/pub/msapi/sdc_controller.py b/catalog/pub/msapi/sdc_controller.py index 74a5ae8..8bad857 100644 --- a/catalog/pub/msapi/sdc_controller.py +++ b/catalog/pub/msapi/sdc_controller.py @@ -7,7 +7,15 @@ import traceback import uuid from threading import Thread -from apscheduler.scheduler import Scheduler +from apscheduler.schedulers.background import BackgroundScheduler + +from catalog.pub.database.models import NSPackageModel, VnfPackageModel +from catalog.pub.exceptions import CatalogException +from catalog.packages.biz import sdc_vnf_package +from catalog.pub.utils import fileutil +from catalog.packages.biz.ns_descriptor import NsDescriptor +from catalog.pub.utils.jobutil import JobUtil +from catalog.pub.utils.values import ignore_case_get from catalog.pub.Dmaap_lib.dmaap.consumer import ConsumerClient from catalog.pub.Dmaap_lib.dmaap.identity import IdentityClient @@ -24,12 +32,12 @@ class SDCController(Thread): def __init__(self): super(SDCController, self).__init__() self.identity = IdentityClient(DMAAP_MR_BASE_URL) - self.scheduler = Scheduler(standalone=True) + self.scheduler = BackgroundScheduler(job_defaults={'max_instances': 3}) self.notification_topic = '' self.status_topic = '' self.consumer = '' - @self.scheduler.interval_schedule(seconds=POLLING_INTERVAL) + @self.scheduler.scheduled_job('interval', seconds=POLLING_INTERVAL) def fetch_task(): self.fetch_notification() @@ -190,24 +198,65 @@ def send_notification_status(status_topic, now_ms, distribution_id, artifact, is def process_notification(msg): logger.info('Receive a callback notification, nb of resources: %s', len(msg['resources'])) - service_artifacts = msg['serviceArtifacts'] - for artifact in service_artifacts: - if artifact['artifactType'] == 'TOSCA_CSAR': - csar_id = artifact['artifactUUID'] - download_url = artifact['artifactURL'] - localhost_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) - ns_csar_base = os.path.join(localhost_dir, "csars", "ns") - local_path = os.path.join(ns_csar_base, msg['distributionID']) - file_name = artifact['artifactName'] - csar_version = artifact['artifactVersion'] - sdc.download_artifacts(download_url, local_path, file_name) - # call ns package upload - data = { - 'nsPackageId': csar_id, - 'nsPackageVersion': csar_version, - 'csarName': file_name, - 'csarDir': local_path - } - jobid = uuid.uuid4() - # NsPackageParser(data, jobid).start() - logger.debug(data, jobid) + try: + ns = sdc.get_asset(sdc.ASSETTYPE_SERVICES, msg['serviceUUID']) + # check if the related resources exist + resources = ns.get('resources', None) + job_array = [] + resource_threads = [] + if resources: + for resource in resources: + if (resource['resoucreType'] == 'VF') and not VnfPackageModel.objects.filter(vnfPackageId=resource['resourceUUID']): + logger.debug("VF [%s] is not distributed.", resource['resourceUUID']) + # raise CatalogException("VF (%s) is not distributed." % resource['resourceUUID']) + logger.info("VF [%s] begin to distribute.", resource['resourceUUID']) + csar_id = resource['resourceUUID'] + vim_ids = ignore_case_get(resource, "vimIds") + lab_vim_id = ignore_case_get(resource, "labVimId") + job_id = str(uuid.uuid4()) + job_array.append(job_id) + resource_threads.append(sdc_vnf_package.NfDistributeThread(csar_id, vim_ids, lab_vim_id, job_id)) + # sdc_vnf_package.NfDistributeThread(csar_id, vim_ids, lab_vim_id, job_id).start() + else: + logger.debug("resource [%s] has been distributed", resource['resourceUUID']) + for resource_thread in resource_threads: + resource_thread.start() + for resource_thread in resource_threads: + resource_thread.join() + for jobID in job_array: + job_status = JobUtil.query_job_status(jobID) + if job_status[0].status == 'error': + raise CatalogException("VF resource fail to distributed.") + + service_artifacts = msg['serviceArtifacts'] + for artifact in service_artifacts: + if artifact['artifactType'] == 'TOSCA_CSAR': + csar_id = artifact['artifactUUID'] + if not NSPackageModel.objects.filter(nsPackageId=artifact['artifactUUID']): + download_url = artifact['artifactURL'] + localhost_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) + ns_csar_base = os.path.join(localhost_dir, "csars", "ns") + local_path = os.path.join(ns_csar_base, msg['distributionID']) + file_name = artifact['artifactName'] + # csar_version = artifact['artifactVersion'] + + # download csar package + local_file_name = sdc.download_artifacts(download_url, local_path, file_name) + if local_file_name.endswith(".csar") or local_file_name.endswith(".zip"): + artifact_vnf_file = fileutil.unzip_file(local_file_name, local_path, + "Artifacts/Deployment/OTHER/ns.csar") + if os.path.exists(artifact_vnf_file): + local_file_name = artifact_vnf_file + + data = { + 'userDefinedData': {} + } + nsd = NsDescriptor() + nsd.create(data, csar_id) + nsd.parse_nsd_and_save(csar_id, local_file_name) + logger.debug("CSAR(%s) distriuted successfully.", csar_id) + else: + logger.debug("CSAR(%s) has been distriuted", csar_id) + except CatalogException as e: + logger.error("Failed to download the resource") + logger.error(str(e)) diff --git a/docs/release-notes.rst b/docs/release-notes.rst index 6a6a42e..dfc2137 100644 --- a/docs/release-notes.rst +++ b/docs/release-notes.rst @@ -13,27 +13,27 @@ Service. Version: 1.0.7 -------------- -:Release Date: 2020-05-11 +:Release Date: 2020-09-09 **New Features** -- optimize the docker image +- Optimize the docker image +- Remove the mandatory dependency on MSB Released components: - - etsicatalog 1.0.6 + - etsicatalog 1.0.7 -Version: 1.0.7 +Version: 1.0.6 -------------- -:Release Date: 2020-09-09 +:Release Date: 2020-05-11 **New Features** -- Optimize the docker image -- Remove the mandatory dependency on MSB +- optimize the docker image Released components: - - etsicatalog 1.0.7 + - etsicatalog 1.0.6 Version: 1.0.5 -------------- diff --git a/requirements.txt b/requirements.txt index 8404b6f..240f812 100644 --- a/requirements.txt +++ b/requirements.txt @@ -37,4 +37,4 @@ onappylog==1.0.9 # uwsgi for parallel processing uwsgi -apscheduler==2.1.2 \ No newline at end of file +apscheduler==3.6.3 \ No newline at end of file