Process notifications for SDC and stores NS and related resources
[modeling/etsicatalog.git] / catalog / pub / msapi / sdc_controller.py
index 74a5ae8..8bad857 100644 (file)
@@ -7,7 +7,15 @@ import traceback
 import uuid
 from threading import Thread
 
-from apscheduler.scheduler import Scheduler
+from apscheduler.schedulers.background import BackgroundScheduler
+
+from catalog.pub.database.models import NSPackageModel, VnfPackageModel
+from catalog.pub.exceptions import CatalogException
+from catalog.packages.biz import sdc_vnf_package
+from catalog.pub.utils import fileutil
+from catalog.packages.biz.ns_descriptor import NsDescriptor
+from catalog.pub.utils.jobutil import JobUtil
+from catalog.pub.utils.values import ignore_case_get
 
 from catalog.pub.Dmaap_lib.dmaap.consumer import ConsumerClient
 from catalog.pub.Dmaap_lib.dmaap.identity import IdentityClient
@@ -24,12 +32,12 @@ class SDCController(Thread):
     def __init__(self):
         super(SDCController, self).__init__()
         self.identity = IdentityClient(DMAAP_MR_BASE_URL)
-        self.scheduler = Scheduler(standalone=True)
+        self.scheduler = BackgroundScheduler(job_defaults={'max_instances': 3})
         self.notification_topic = ''
         self.status_topic = ''
         self.consumer = ''
 
-        @self.scheduler.interval_schedule(seconds=POLLING_INTERVAL)
+        @self.scheduler.scheduled_job('interval', seconds=POLLING_INTERVAL)
         def fetch_task():
             self.fetch_notification()
 
@@ -190,24 +198,65 @@ def send_notification_status(status_topic, now_ms, distribution_id, artifact, is
 
 def process_notification(msg):
     logger.info('Receive a callback notification, nb of resources: %s', len(msg['resources']))
-    service_artifacts = msg['serviceArtifacts']
-    for artifact in service_artifacts:
-        if artifact['artifactType'] == 'TOSCA_CSAR':
-            csar_id = artifact['artifactUUID']
-            download_url = artifact['artifactURL']
-            localhost_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
-            ns_csar_base = os.path.join(localhost_dir, "csars", "ns")
-            local_path = os.path.join(ns_csar_base, msg['distributionID'])
-            file_name = artifact['artifactName']
-            csar_version = artifact['artifactVersion']
-            sdc.download_artifacts(download_url, local_path, file_name)
-            # call ns package upload
-            data = {
-                'nsPackageId': csar_id,
-                'nsPackageVersion': csar_version,
-                'csarName': file_name,
-                'csarDir': local_path
-            }
-            jobid = uuid.uuid4()
-            # NsPackageParser(data, jobid).start()
-            logger.debug(data, jobid)
+    try:
+        ns = sdc.get_asset(sdc.ASSETTYPE_SERVICES, msg['serviceUUID'])
+        # check if the related resources exist
+        resources = ns.get('resources', None)
+        job_array = []
+        resource_threads = []
+        if resources:
+            for resource in resources:
+                if (resource['resoucreType'] == 'VF') and not VnfPackageModel.objects.filter(vnfPackageId=resource['resourceUUID']):
+                    logger.debug("VF [%s] is not distributed.", resource['resourceUUID'])
+                    # raise CatalogException("VF (%s) is not distributed." % resource['resourceUUID'])
+                    logger.info("VF [%s] begin to distribute.", resource['resourceUUID'])
+                    csar_id = resource['resourceUUID']
+                    vim_ids = ignore_case_get(resource, "vimIds")
+                    lab_vim_id = ignore_case_get(resource, "labVimId")
+                    job_id = str(uuid.uuid4())
+                    job_array.append(job_id)
+                    resource_threads.append(sdc_vnf_package.NfDistributeThread(csar_id, vim_ids, lab_vim_id, job_id))
+                    # sdc_vnf_package.NfDistributeThread(csar_id, vim_ids, lab_vim_id, job_id).start()
+                else:
+                    logger.debug("resource [%s] has been distributed", resource['resourceUUID'])
+        for resource_thread in resource_threads:
+            resource_thread.start()
+        for resource_thread in resource_threads:
+            resource_thread.join()
+        for jobID in job_array:
+            job_status = JobUtil.query_job_status(jobID)
+            if job_status[0].status == 'error':
+                raise CatalogException("VF resource fail to distributed.")
+
+        service_artifacts = msg['serviceArtifacts']
+        for artifact in service_artifacts:
+            if artifact['artifactType'] == 'TOSCA_CSAR':
+                csar_id = artifact['artifactUUID']
+                if not NSPackageModel.objects.filter(nsPackageId=artifact['artifactUUID']):
+                    download_url = artifact['artifactURL']
+                    localhost_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
+                    ns_csar_base = os.path.join(localhost_dir, "csars", "ns")
+                    local_path = os.path.join(ns_csar_base, msg['distributionID'])
+                    file_name = artifact['artifactName']
+                    # csar_version = artifact['artifactVersion']
+
+                    # download csar package
+                    local_file_name = sdc.download_artifacts(download_url, local_path, file_name)
+                    if local_file_name.endswith(".csar") or local_file_name.endswith(".zip"):
+                        artifact_vnf_file = fileutil.unzip_file(local_file_name, local_path,
+                                                                "Artifacts/Deployment/OTHER/ns.csar")
+                        if os.path.exists(artifact_vnf_file):
+                            local_file_name = artifact_vnf_file
+
+                    data = {
+                        'userDefinedData': {}
+                    }
+                    nsd = NsDescriptor()
+                    nsd.create(data, csar_id)
+                    nsd.parse_nsd_and_save(csar_id, local_file_name)
+                    logger.debug("CSAR(%s) distriuted successfully.", csar_id)
+                else:
+                    logger.debug("CSAR(%s) has been distriuted", csar_id)
+    except CatalogException as e:
+        logger.error("Failed to download the resource")
+        logger.error(str(e))