Process notifications for SDC and stores NS and related resources 50/112350/8
authorMars chen <chenzihao@bupt.edu.cn>
Fri, 18 Sep 2020 02:26:56 +0000 (10:26 +0800)
committerdengyh <dengyuanhong@chinamobile.com>
Fri, 18 Sep 2020 07:00:54 +0000 (15:00 +0800)
Issue-ID: MODELING-335
Change-Id: Ie8836c865d21fb4695b85f07f1098d0d4617ac0c
Signed-off-by: Mars chen <chenzihao@bupt.edu.cn>
catalog/pub/Dmaap_lib/dmaap/publisher.py
catalog/pub/msapi/sdc_controller.py
docs/release-notes.rst
requirements.txt

index 7ebbca0..23a2351 100644 (file)
@@ -21,8 +21,7 @@ import time
 from hashlib import sha1
 
 import requests
-from apscheduler.scheduler import Scheduler
-
+from apscheduler.schedulers.background import BackgroundScheduler
 from catalog.pub.Dmaap_lib.pub.exceptions import DmaapClientException
 
 logger = logging.getLogger(__name__)
@@ -40,9 +39,10 @@ class BatchPublisherClient:
         self.pending = []
         self.closed = False
         self.dont_send_until_ms = 0
-        self.scheduler = Scheduler(standalone=False)
+        self.scheduler = BackgroundScheduler()
 
-        @self.scheduler.interval_schedule(second=1)
+        # @self.scheduler.interval_schedule(second=1)
+        @self.scheduler.scheduled_job(second=1)
         def crawl_job():
             self.send_message(False)
 
index 74a5ae8..8bad857 100644 (file)
@@ -7,7 +7,15 @@ import traceback
 import uuid
 from threading import Thread
 
-from apscheduler.scheduler import Scheduler
+from apscheduler.schedulers.background import BackgroundScheduler
+
+from catalog.pub.database.models import NSPackageModel, VnfPackageModel
+from catalog.pub.exceptions import CatalogException
+from catalog.packages.biz import sdc_vnf_package
+from catalog.pub.utils import fileutil
+from catalog.packages.biz.ns_descriptor import NsDescriptor
+from catalog.pub.utils.jobutil import JobUtil
+from catalog.pub.utils.values import ignore_case_get
 
 from catalog.pub.Dmaap_lib.dmaap.consumer import ConsumerClient
 from catalog.pub.Dmaap_lib.dmaap.identity import IdentityClient
@@ -24,12 +32,12 @@ class SDCController(Thread):
     def __init__(self):
         super(SDCController, self).__init__()
         self.identity = IdentityClient(DMAAP_MR_BASE_URL)
-        self.scheduler = Scheduler(standalone=True)
+        self.scheduler = BackgroundScheduler(job_defaults={'max_instances': 3})
         self.notification_topic = ''
         self.status_topic = ''
         self.consumer = ''
 
-        @self.scheduler.interval_schedule(seconds=POLLING_INTERVAL)
+        @self.scheduler.scheduled_job('interval', seconds=POLLING_INTERVAL)
         def fetch_task():
             self.fetch_notification()
 
@@ -190,24 +198,65 @@ def send_notification_status(status_topic, now_ms, distribution_id, artifact, is
 
 def process_notification(msg):
     logger.info('Receive a callback notification, nb of resources: %s', len(msg['resources']))
-    service_artifacts = msg['serviceArtifacts']
-    for artifact in service_artifacts:
-        if artifact['artifactType'] == 'TOSCA_CSAR':
-            csar_id = artifact['artifactUUID']
-            download_url = artifact['artifactURL']
-            localhost_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
-            ns_csar_base = os.path.join(localhost_dir, "csars", "ns")
-            local_path = os.path.join(ns_csar_base, msg['distributionID'])
-            file_name = artifact['artifactName']
-            csar_version = artifact['artifactVersion']
-            sdc.download_artifacts(download_url, local_path, file_name)
-            # call ns package upload
-            data = {
-                'nsPackageId': csar_id,
-                'nsPackageVersion': csar_version,
-                'csarName': file_name,
-                'csarDir': local_path
-            }
-            jobid = uuid.uuid4()
-            # NsPackageParser(data, jobid).start()
-            logger.debug(data, jobid)
+    try:
+        ns = sdc.get_asset(sdc.ASSETTYPE_SERVICES, msg['serviceUUID'])
+        # check if the related resources exist
+        resources = ns.get('resources', None)
+        job_array = []
+        resource_threads = []
+        if resources:
+            for resource in resources:
+                if (resource['resoucreType'] == 'VF') and not VnfPackageModel.objects.filter(vnfPackageId=resource['resourceUUID']):
+                    logger.debug("VF [%s] is not distributed.", resource['resourceUUID'])
+                    # raise CatalogException("VF (%s) is not distributed." % resource['resourceUUID'])
+                    logger.info("VF [%s] begin to distribute.", resource['resourceUUID'])
+                    csar_id = resource['resourceUUID']
+                    vim_ids = ignore_case_get(resource, "vimIds")
+                    lab_vim_id = ignore_case_get(resource, "labVimId")
+                    job_id = str(uuid.uuid4())
+                    job_array.append(job_id)
+                    resource_threads.append(sdc_vnf_package.NfDistributeThread(csar_id, vim_ids, lab_vim_id, job_id))
+                    # sdc_vnf_package.NfDistributeThread(csar_id, vim_ids, lab_vim_id, job_id).start()
+                else:
+                    logger.debug("resource [%s] has been distributed", resource['resourceUUID'])
+        for resource_thread in resource_threads:
+            resource_thread.start()
+        for resource_thread in resource_threads:
+            resource_thread.join()
+        for jobID in job_array:
+            job_status = JobUtil.query_job_status(jobID)
+            if job_status[0].status == 'error':
+                raise CatalogException("VF resource fail to distributed.")
+
+        service_artifacts = msg['serviceArtifacts']
+        for artifact in service_artifacts:
+            if artifact['artifactType'] == 'TOSCA_CSAR':
+                csar_id = artifact['artifactUUID']
+                if not NSPackageModel.objects.filter(nsPackageId=artifact['artifactUUID']):
+                    download_url = artifact['artifactURL']
+                    localhost_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
+                    ns_csar_base = os.path.join(localhost_dir, "csars", "ns")
+                    local_path = os.path.join(ns_csar_base, msg['distributionID'])
+                    file_name = artifact['artifactName']
+                    # csar_version = artifact['artifactVersion']
+
+                    # download csar package
+                    local_file_name = sdc.download_artifacts(download_url, local_path, file_name)
+                    if local_file_name.endswith(".csar") or local_file_name.endswith(".zip"):
+                        artifact_vnf_file = fileutil.unzip_file(local_file_name, local_path,
+                                                                "Artifacts/Deployment/OTHER/ns.csar")
+                        if os.path.exists(artifact_vnf_file):
+                            local_file_name = artifact_vnf_file
+
+                    data = {
+                        'userDefinedData': {}
+                    }
+                    nsd = NsDescriptor()
+                    nsd.create(data, csar_id)
+                    nsd.parse_nsd_and_save(csar_id, local_file_name)
+                    logger.debug("CSAR(%s) distriuted successfully.", csar_id)
+                else:
+                    logger.debug("CSAR(%s) has been distriuted", csar_id)
+    except CatalogException as e:
+        logger.error("Failed to download the resource")
+        logger.error(str(e))
index 6a6a42e..dfc2137 100644 (file)
@@ -13,27 +13,27 @@ Service.
 Version: 1.0.7
 --------------
 
-:Release Date: 2020-05-11
+:Release Date: 2020-09-09
 
 **New Features**
 
-- optimize the docker image
+- Optimize the docker image
+- Remove the mandatory dependency on MSB
 
 Released components:
- - etsicatalog 1.0.6
+ - etsicatalog 1.0.7
 
-Version: 1.0.7
+Version: 1.0.6
 --------------
 
-:Release Date: 2020-09-09
+:Release Date: 2020-05-11
 
 **New Features**
 
-- Optimize the docker image
-- Remove the mandatory dependency on MSB
+- optimize the docker image
 
 Released components:
- - etsicatalog 1.0.7
+ - etsicatalog 1.0.6
 
 Version: 1.0.5
 --------------
index 8404b6f..240f812 100644 (file)
@@ -37,4 +37,4 @@ onappylog==1.0.9
 # uwsgi for parallel processing
 uwsgi
 
-apscheduler==2.1.2
\ No newline at end of file
+apscheduler==3.6.3
\ No newline at end of file