import uuid
from threading import Thread
-from apscheduler.scheduler import Scheduler
+from apscheduler.schedulers.background import BackgroundScheduler
+
+from catalog.pub.database.models import NSPackageModel, VnfPackageModel
+from catalog.pub.exceptions import CatalogException
+from catalog.packages.biz import sdc_vnf_package
+from catalog.pub.utils import fileutil
+from catalog.packages.biz.ns_descriptor import NsDescriptor
+from catalog.pub.utils.jobutil import JobUtil
+from catalog.pub.utils.values import ignore_case_get
from catalog.pub.Dmaap_lib.dmaap.consumer import ConsumerClient
from catalog.pub.Dmaap_lib.dmaap.identity import IdentityClient
from catalog.pub.Dmaap_lib.dmaap.publisher import BatchPublisherClient
-from catalog.pub.config.config import CONSUMER_GROUP, CONSUMER_ID, POLLING_INTERVAL, DMAAP_MR_IP, \
- DMAAP_MR_PORT
+from catalog.pub.config.config import CONSUMER_GROUP, CONSUMER_ID, POLLING_INTERVAL, DMAAP_MR_BASE_URL
from catalog.pub.msapi import sdc
logger = logging.getLogger(__name__)
-DMAAP_MR_BASE_URL = "https://%s:%s" % (DMAAP_MR_IP, DMAAP_MR_PORT)
ARTIFACT_TYPES_LIST = ["TOSCA_TEMPLATE", "TOSCA_CSAR"]
def __init__(self):
super(SDCController, self).__init__()
self.identity = IdentityClient(DMAAP_MR_BASE_URL)
- self.scheduler = Scheduler(standalone=True)
+ self.scheduler = BackgroundScheduler(job_defaults={'max_instances': 3})
self.notification_topic = ''
self.status_topic = ''
self.consumer = ''
- @self.scheduler.interval_schedule(seconds=POLLING_INTERVAL)
+ @self.scheduler.scheduled_job('interval', seconds=POLLING_INTERVAL)
def fetch_task():
self.fetch_notification()
def process_notification(msg):
logger.info('Receive a callback notification, nb of resources: %s', len(msg['resources']))
- service_artifacts = msg['serviceArtifacts']
- for artifact in service_artifacts:
- if artifact['artifactType'] == 'TOSCA_CSAR':
- csar_id = artifact['artifactUUID']
- download_url = artifact['artifactURL']
- localhost_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
- ns_csar_base = os.path.join(localhost_dir, "csars", "ns")
- local_path = os.path.join(ns_csar_base, msg['distributionID'])
- file_name = artifact['artifactName']
- csar_version = artifact['artifactVersion']
- sdc.download_artifacts(download_url, local_path, file_name)
- # call ns package upload
- data = {
- 'nsPackageId': csar_id,
- 'nsPackageVersion': csar_version,
- 'csarName': file_name,
- 'csarDir': local_path
- }
- jobid = uuid.uuid4()
- # NsPackageParser(data, jobid).start()
- logger.debug(data, jobid)
+ try:
+ ns = sdc.get_asset(sdc.ASSETTYPE_SERVICES, msg['serviceUUID'])
+ # check if the related resources exist
+ resources = ns.get('resources', None)
+ job_array = []
+ resource_threads = []
+ if resources:
+ for resource in resources:
+ if (resource['resoucreType'] == 'VF') and not VnfPackageModel.objects.filter(vnfPackageId=resource['resourceUUID']):
+ logger.debug("VF [%s] is not distributed.", resource['resourceUUID'])
+ # raise CatalogException("VF (%s) is not distributed." % resource['resourceUUID'])
+ logger.info("VF [%s] begin to distribute.", resource['resourceUUID'])
+ csar_id = resource['resourceUUID']
+ vim_ids = ignore_case_get(resource, "vimIds")
+ lab_vim_id = ignore_case_get(resource, "labVimId")
+ job_id = str(uuid.uuid4())
+ job_array.append(job_id)
+ resource_threads.append(sdc_vnf_package.NfDistributeThread(csar_id, vim_ids, lab_vim_id, job_id))
+ # sdc_vnf_package.NfDistributeThread(csar_id, vim_ids, lab_vim_id, job_id).start()
+ else:
+ logger.debug("resource [%s] has been distributed", resource['resourceUUID'])
+ for resource_thread in resource_threads:
+ resource_thread.start()
+ for resource_thread in resource_threads:
+ resource_thread.join()
+ for jobID in job_array:
+ job_status = JobUtil.query_job_status(jobID)
+ if job_status[0].status == 'error':
+ raise CatalogException("VF resource fail to distributed.")
+
+ service_artifacts = msg['serviceArtifacts']
+ for artifact in service_artifacts:
+ if artifact['artifactType'] == 'TOSCA_CSAR':
+ csar_id = artifact['artifactUUID']
+ if not NSPackageModel.objects.filter(nsPackageId=artifact['artifactUUID']):
+ download_url = artifact['artifactURL']
+ localhost_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
+ ns_csar_base = os.path.join(localhost_dir, "csars", "ns")
+ local_path = os.path.join(ns_csar_base, msg['distributionID'])
+ file_name = artifact['artifactName']
+ # csar_version = artifact['artifactVersion']
+
+ # download csar package
+ local_file_name = sdc.download_artifacts(download_url, local_path, file_name)
+ if local_file_name.endswith(".csar") or local_file_name.endswith(".zip"):
+ artifact_vnf_file = fileutil.unzip_file(local_file_name, local_path,
+ "Artifacts/Deployment/OTHER/ns.csar")
+ if os.path.exists(artifact_vnf_file):
+ local_file_name = artifact_vnf_file
+
+ data = {
+ 'userDefinedData': {}
+ }
+ nsd = NsDescriptor()
+ nsd.create(data, csar_id)
+ nsd.parse_nsd_and_save(csar_id, local_file_name)
+ logger.debug("CSAR(%s) distriuted successfully.", csar_id)
+ else:
+ logger.debug("CSAR(%s) has been distriuted", csar_id)
+ except CatalogException as e:
+ logger.error("Failed to download the resource")
+ logger.error(str(e))