rename package name
[modeling/etsicatalog.git] / catalog / pub / msapi / sdc_controller.py
diff --git a/catalog/pub/msapi/sdc_controller.py b/catalog/pub/msapi/sdc_controller.py
new file mode 100644 (file)
index 0000000..454f3d1
--- /dev/null
@@ -0,0 +1,215 @@
+# Copyright 2019 CMCC Technologies Co., Ltd.
+import json
+import logging
+import os
+import time
+import traceback
+import uuid
+from threading import Thread
+
+from apscheduler.scheduler import Scheduler
+
+from catalog.pub.Dmaap_lib.dmaap.consumer import ConsumerClient
+from catalog.pub.Dmaap_lib.dmaap.identity import IdentityClient
+from catalog.pub.Dmaap_lib.dmaap.publisher import BatchPublisherClient
+from catalog.pub.config.config import CONSUMER_GROUP, CONSUMER_ID, POLLING_INTERVAL, DMAAP_MR_IP, \
+    DMAAP_MR_PORT
+from catalog.pub.msapi import sdc
+
+logger = logging.getLogger(__name__)
+
+DMAAP_MR_BASE_URL = "http://%s:%s" % (DMAAP_MR_IP, DMAAP_MR_PORT)
+ARTIFACT_TYPES_LIST = ["TOSCA_TEMPLATE", "TOSCA_CSAR"]
+
+
+class SDCController(Thread):
+    def __init__(self):
+        super(SDCController, self).__init__()
+        self.identity = IdentityClient(DMAAP_MR_BASE_URL)
+        self.scheduler = Scheduler(standalone=True)
+        self.notification_topic = ''
+        self.status_topic = ''
+        self.consumer = ''
+
+        @self.scheduler.interval_schedule(seconds=POLLING_INTERVAL)
+        def fetch_task():
+            self.fetch_notification()
+
+    def run(self):
+        try:
+            description = 'nfvo catalog key for' + CONSUMER_ID
+            key = self.identity.create_apikey('', description)
+            topics = sdc.register_for_topics(key['apiKey'])
+            self.notification_topic = topics['distrNotificationTopicName']
+            self.status_topic = topics['distrStatusTopicName']
+            self.consumer = ConsumerClient(DMAAP_MR_BASE_URL, self.notification_topic, CONSUMER_GROUP, CONSUMER_ID)
+            self.consumer.set_api_credentials(key['apiKey'], key['apiSecret'])
+            self.scheduler.start()
+        except Exception as e:
+            logger.error('start sdc controller failed.')
+            logger.error(e.message)
+            logger.error(traceback.format_exc())
+
+    def fetch_notification(self):
+        try:
+            logger.info('start to fetch message from dmaap.')
+            now_ms = int(time.time() * 1000)
+            notification_msgs = self.consumer.fetch()
+            logger.info('Receive a notification from dmaap: %s', notification_msgs)
+            for notification_msg in notification_msgs:
+                notification_callback = build_callback_notification(now_ms, notification_msg)
+                if is_activate_callback(notification_callback):
+                    process_notification(notification_callback)
+        except Exception as e:
+            logger.error('fetch message from dmaap failed.')
+            logger.error(e.message)
+            logger.error(traceback.format_exc())
+
+
+def is_activate_callback(notification_callback):
+    has_relevant_artifacts_in_resource = False
+    has_relevant_artifacts_in_service = False
+    if notification_callback['resources']:
+        has_relevant_artifacts_in_resource = True
+    if notification_callback['serviceArtifacts']:
+        has_relevant_artifacts_in_service = True
+    return has_relevant_artifacts_in_resource or has_relevant_artifacts_in_service
+
+
+def build_callback_notification(now_ms, notification_msg):
+    # relevant_resource_instances = build_resource_instances(notification_msg, now_ms)
+    relevant_service_artifacts = handle_relevant_artifacts(notification_msg, now_ms,
+                                                           notification_msg['serviceArtifacts'])
+    # notification_msg['resources'] = relevant_resource_instances
+    notification_msg['serviceArtifacts'] = relevant_service_artifacts
+    return notification_msg
+
+
+def build_resource_instances(notification_msg, now_ms):
+    relevant_resource_instances = []
+    resources = notification_msg['resources']
+    for resource in resources:
+        artifacts = resource['artifacts']
+        found_relevant_artifacts = handle_relevant_artifacts(notification_msg, now_ms, artifacts)
+        if found_relevant_artifacts:
+            resources['artifacts'] = found_relevant_artifacts
+            relevant_resource_instances.append(resources)
+    return relevant_resource_instances
+
+
+def handle_relevant_artifacts(notification_msg, now_ms, artifacts):
+    relevant_artifacts = []
+    for artifact in artifacts:
+        artifact_type = artifact['artifactType']
+        is_artifact_relevant = artifact_type in ARTIFACT_TYPES_LIST
+        if is_artifact_relevant:
+            generated_from_uuid = artifact.get('generatedFromUUID', '')
+            if generated_from_uuid:
+                generated_from_artifact = None
+                for artifact_g in artifacts:
+                    if generated_from_uuid == artifact_g['artifactUUID']:
+                        generated_from_artifact = artifact_g
+                        break
+                if generated_from_artifact:
+                    is_artifact_relevant = generated_from_artifact['artifactType'] in ARTIFACT_TYPES_LIST
+                else:
+                    is_artifact_relevant = False
+            if is_artifact_relevant:
+                artifact = set_related_artifacts(artifact, notification_msg)
+                relevant_artifacts.append(artifact)
+
+        # notification_status = send_notification_status(now_ms, notification_msg['distributionID'], artifact, is_artifact_relevant)
+        # if notification_status != 'SUCCESS':
+        #     logger.error("Error failed to send notification status to Dmaap.")
+
+    return relevant_artifacts
+
+
+def set_related_artifacts(artifact, notification_msg):
+    related_artifacts_uuid = artifact.get('relatedArtifacts', '')
+    if related_artifacts_uuid:
+        related_artifacts = []
+        for artifact_uuid in related_artifacts_uuid:
+            related_artifacts.append(get_artifact_metadata(notification_msg, artifact_uuid))
+        artifact['relatedArtifactsInfo'] = related_artifacts
+    return artifact
+
+
+def get_artifact_metadata(notification_msg, uuid):
+    service_artifacts = notification_msg['serviceArtifacts']
+    ret = None
+    for artifact in service_artifacts:
+        if artifact['artifactUUID'] == uuid:
+            ret = artifact
+            break
+    resources = notification_msg['resources']
+    if (not ret) and resources:
+        for resource in resources:
+            artifacts = resource['artifacts']
+            for artifact in artifacts:
+                if artifact['artifactUUID'] == uuid:
+                    ret = artifact
+                    break
+            if ret:
+                break
+    return ret
+
+
+def send_notification_status(status_topic, now_ms, distribution_id, artifact, is_artifact_relevant):
+    logger.info('start to send notification status')
+    status = 'FAIL'
+    if is_artifact_relevant:
+        notification_status = 'NOTIFIED'
+    else:
+        notification_status = 'NOT_NOTIFIED'
+    request = {
+        'distributionID': distribution_id,
+        'consumerID': CONSUMER_ID,
+        'timestamp': now_ms,
+        'artifactURL': artifact['artifactURL'],
+        'status': notification_status
+    }
+    request_json = json.JSONEncoder().encode(request)
+    pub = BatchPublisherClient(DMAAP_MR_BASE_URL, status_topic, '', 'application/cambria')
+    logger.info('try to send notification status: %s', request_json)
+
+    try:
+        pub.send('MyPartitionKey', request_json)
+        time.sleep(1)
+        stuck = pub.close(10)
+        if not stuck:
+            status = 'SUCCESS'
+            logger.info('send notification status success.')
+        else:
+            logger.error('failed to send notification status, %s messages unsent', len(stuck))
+    except Exception as e:
+        logger.error('failed to send notification status.')
+        logger.error(e.message)
+        logger.error(traceback.format_exc())
+
+    return status
+
+
+def process_notification(msg):
+    logger.info('Receive a callback notification, nb of resources: %s', len(msg['resources']))
+    service_artifacts = msg['serviceArtifacts']
+    for artifact in service_artifacts:
+        if artifact['artifactType'] == 'TOSCA_CSAR':
+            csar_id = artifact['artifactUUID']
+            download_url = artifact['artifactURL']
+            localhost_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
+            ns_csar_base = os.path.join(localhost_dir, "csars", "ns")
+            local_path = os.path.join(ns_csar_base, msg['distributionID'])
+            file_name = artifact['artifactName']
+            csar_version = artifact['artifactVersion']
+            sdc.download_artifacts(download_url, local_path, file_name)
+            # call ns package upload
+            data = {
+                'nsPackageId': csar_id,
+                'nsPackageVersion': csar_version,
+                'csarName': file_name,
+                'csarDir': local_path
+            }
+            jobid = uuid.uuid4()
+            # NsPackageParser(data, jobid).start()
+            logger.debug(data, jobid)