rename package name 96/108296/3
authordyh <dengyuanhong@chinamobile.com>
Wed, 27 May 2020 02:36:10 +0000 (10:36 +0800)
committerdyh <dengyuanhong@chinamobile.com>
Wed, 27 May 2020 04:01:07 +0000 (12:01 +0800)
add function for sdc subscribe and notification

Issue-ID: MODELING-366
Change-Id: I61dba314a62003577eddc640325f9c5b9263b2bc
Signed-off-by: dyh <dengyuanhong@chinamobile.com>
13 files changed:
catalog/pub/Dmaap_lib/__init__.py [moved from catalog/pub/Dmaap-lib/__init__.py with 100% similarity]
catalog/pub/Dmaap_lib/dmaap/__init__.py [moved from catalog/pub/Dmaap-lib/dmaap/__init__.py with 100% similarity]
catalog/pub/Dmaap_lib/dmaap/consumer.py [moved from catalog/pub/Dmaap-lib/dmaap/consumer.py with 100% similarity]
catalog/pub/Dmaap_lib/dmaap/identity.py [moved from catalog/pub/Dmaap-lib/dmaap/identity.py with 100% similarity]
catalog/pub/Dmaap_lib/dmaap/publisher.py [moved from catalog/pub/Dmaap-lib/dmaap/publisher.py with 100% similarity]
catalog/pub/Dmaap_lib/pub/__init__.py [moved from catalog/pub/Dmaap-lib/pub/__init__.py with 100% similarity]
catalog/pub/Dmaap_lib/pub/exceptions.py [moved from catalog/pub/Dmaap-lib/pub/exceptions.py with 100% similarity]
catalog/pub/Dmaap_lib/test/test_consumer.py [moved from catalog/pub/Dmaap-lib/test/test_consumer.py with 100% similarity]
catalog/pub/Dmaap_lib/test/test_identity.py [moved from catalog/pub/Dmaap-lib/test/test_identity.py with 100% similarity]
catalog/pub/config/config.py
catalog/pub/msapi/sdc.py
catalog/pub/msapi/sdc_controller.py [new file with mode: 0644]
requirements.txt

index dcc6cc1..99932d7 100644 (file)
@@ -18,11 +18,6 @@ MSB_SERVICE_IP = '127.0.0.1'
 MSB_SERVICE_PORT = '80'
 MSB_BASE_URL = "%s://%s:%s" % (MSB_SERVICE_PROTOCOL, MSB_SERVICE_IP, MSB_SERVICE_PORT)
 
-# [REDIS]
-# REDIS_HOST = '127.0.0.1'
-# REDIS_PORT = '6379'
-# REDIS_PASSWD = ''
-
 # [mysql]
 DB_IP = "127.0.0.1"
 DB_PORT = 3306
@@ -97,4 +92,11 @@ SDC_BASE_URL = "https://msb-iag/api"
 SDC_USER = "modeling"
 SDC_PASSWD = "Kp8bJ4SXszM0WXlhak3eHlcse2gAw84vaoGGmJvUy2U"
 
+# [dmaap config]
+DMAAP_MR_IP = '127.0.0.1'
+DMAAP_MR_PORT = '3904'
+CONSUMER_GROUP = "consumerGroup"
+CONSUMER_ID = "consumerId"
+POLLING_INTERVAL = 15
+
 VNFD_SCHEMA_VERSION_DEFAULT = "base"
index 86930f0..498db8d 100644 (file)
@@ -129,3 +129,41 @@ def download_artifacts(download_url, local_path, file_name):
     local_file.write(ret[1])
     local_file.close()
     return local_file_name
+
+
+def create_consumer(name, salt, password):
+    req_data = {
+        'consumerName': name,
+        'consumerSalt': salt,
+        'consumerPassword': password
+    }
+    req_data = json.JSONEncoder().encode(req_data)
+    resource = '/sdc2/rest/v1/consumers'
+    headers = {'USER_ID': 'jh0003'}
+    ret = restcall.call_req(base_url=SDC_BASE_URL,
+                            user="",
+                            passwd="",
+                            auth_type=restcall.rest_no_auth,
+                            resource=resource,
+                            method="POST",
+                            content=req_data,
+                            additional_headers=headers)
+    if ret[0] != 0:
+        logger.error("Status code is %s, detail is %s.", ret[2], ret[1])
+        raise CatalogException("Failed to create consumer from sdc.")
+
+
+def register_for_topics(key):
+    req_data = {
+        'apiPublicKey': key,
+        'distrEnvName': 'AUTO',
+        'isConsumerToSdcDistrStatusTopic': False,
+        'distEnvEndPoints': []
+    }
+    req_data = json.JSONEncoder().encode(req_data)
+    url = '/sdc/v1/registerForDistribution'
+    ret = call_sdc(url, 'POST', req_data)
+    if ret[0] != 0:
+        logger.error("Status code is %s, detail is %s.", ret[2], ret[1])
+        raise CatalogException("Failed to register from sdc.")
+    return json.JSONDecoder().decode(ret[1])
diff --git a/catalog/pub/msapi/sdc_controller.py b/catalog/pub/msapi/sdc_controller.py
new file mode 100644 (file)
index 0000000..454f3d1
--- /dev/null
@@ -0,0 +1,215 @@
+# Copyright 2019 CMCC Technologies Co., Ltd.
+import json
+import logging
+import os
+import time
+import traceback
+import uuid
+from threading import Thread
+
+from apscheduler.scheduler import Scheduler
+
+from catalog.pub.Dmaap_lib.dmaap.consumer import ConsumerClient
+from catalog.pub.Dmaap_lib.dmaap.identity import IdentityClient
+from catalog.pub.Dmaap_lib.dmaap.publisher import BatchPublisherClient
+from catalog.pub.config.config import CONSUMER_GROUP, CONSUMER_ID, POLLING_INTERVAL, DMAAP_MR_IP, \
+    DMAAP_MR_PORT
+from catalog.pub.msapi import sdc
+
+logger = logging.getLogger(__name__)
+
+DMAAP_MR_BASE_URL = "http://%s:%s" % (DMAAP_MR_IP, DMAAP_MR_PORT)
+ARTIFACT_TYPES_LIST = ["TOSCA_TEMPLATE", "TOSCA_CSAR"]
+
+
+class SDCController(Thread):
+    def __init__(self):
+        super(SDCController, self).__init__()
+        self.identity = IdentityClient(DMAAP_MR_BASE_URL)
+        self.scheduler = Scheduler(standalone=True)
+        self.notification_topic = ''
+        self.status_topic = ''
+        self.consumer = ''
+
+        @self.scheduler.interval_schedule(seconds=POLLING_INTERVAL)
+        def fetch_task():
+            self.fetch_notification()
+
+    def run(self):
+        try:
+            description = 'nfvo catalog key for' + CONSUMER_ID
+            key = self.identity.create_apikey('', description)
+            topics = sdc.register_for_topics(key['apiKey'])
+            self.notification_topic = topics['distrNotificationTopicName']
+            self.status_topic = topics['distrStatusTopicName']
+            self.consumer = ConsumerClient(DMAAP_MR_BASE_URL, self.notification_topic, CONSUMER_GROUP, CONSUMER_ID)
+            self.consumer.set_api_credentials(key['apiKey'], key['apiSecret'])
+            self.scheduler.start()
+        except Exception as e:
+            logger.error('start sdc controller failed.')
+            logger.error(e.message)
+            logger.error(traceback.format_exc())
+
+    def fetch_notification(self):
+        try:
+            logger.info('start to fetch message from dmaap.')
+            now_ms = int(time.time() * 1000)
+            notification_msgs = self.consumer.fetch()
+            logger.info('Receive a notification from dmaap: %s', notification_msgs)
+            for notification_msg in notification_msgs:
+                notification_callback = build_callback_notification(now_ms, notification_msg)
+                if is_activate_callback(notification_callback):
+                    process_notification(notification_callback)
+        except Exception as e:
+            logger.error('fetch message from dmaap failed.')
+            logger.error(e.message)
+            logger.error(traceback.format_exc())
+
+
+def is_activate_callback(notification_callback):
+    has_relevant_artifacts_in_resource = False
+    has_relevant_artifacts_in_service = False
+    if notification_callback['resources']:
+        has_relevant_artifacts_in_resource = True
+    if notification_callback['serviceArtifacts']:
+        has_relevant_artifacts_in_service = True
+    return has_relevant_artifacts_in_resource or has_relevant_artifacts_in_service
+
+
+def build_callback_notification(now_ms, notification_msg):
+    # relevant_resource_instances = build_resource_instances(notification_msg, now_ms)
+    relevant_service_artifacts = handle_relevant_artifacts(notification_msg, now_ms,
+                                                           notification_msg['serviceArtifacts'])
+    # notification_msg['resources'] = relevant_resource_instances
+    notification_msg['serviceArtifacts'] = relevant_service_artifacts
+    return notification_msg
+
+
+def build_resource_instances(notification_msg, now_ms):
+    relevant_resource_instances = []
+    resources = notification_msg['resources']
+    for resource in resources:
+        artifacts = resource['artifacts']
+        found_relevant_artifacts = handle_relevant_artifacts(notification_msg, now_ms, artifacts)
+        if found_relevant_artifacts:
+            resources['artifacts'] = found_relevant_artifacts
+            relevant_resource_instances.append(resources)
+    return relevant_resource_instances
+
+
+def handle_relevant_artifacts(notification_msg, now_ms, artifacts):
+    relevant_artifacts = []
+    for artifact in artifacts:
+        artifact_type = artifact['artifactType']
+        is_artifact_relevant = artifact_type in ARTIFACT_TYPES_LIST
+        if is_artifact_relevant:
+            generated_from_uuid = artifact.get('generatedFromUUID', '')
+            if generated_from_uuid:
+                generated_from_artifact = None
+                for artifact_g in artifacts:
+                    if generated_from_uuid == artifact_g['artifactUUID']:
+                        generated_from_artifact = artifact_g
+                        break
+                if generated_from_artifact:
+                    is_artifact_relevant = generated_from_artifact['artifactType'] in ARTIFACT_TYPES_LIST
+                else:
+                    is_artifact_relevant = False
+            if is_artifact_relevant:
+                artifact = set_related_artifacts(artifact, notification_msg)
+                relevant_artifacts.append(artifact)
+
+        # notification_status = send_notification_status(now_ms, notification_msg['distributionID'], artifact, is_artifact_relevant)
+        # if notification_status != 'SUCCESS':
+        #     logger.error("Error failed to send notification status to Dmaap.")
+
+    return relevant_artifacts
+
+
+def set_related_artifacts(artifact, notification_msg):
+    related_artifacts_uuid = artifact.get('relatedArtifacts', '')
+    if related_artifacts_uuid:
+        related_artifacts = []
+        for artifact_uuid in related_artifacts_uuid:
+            related_artifacts.append(get_artifact_metadata(notification_msg, artifact_uuid))
+        artifact['relatedArtifactsInfo'] = related_artifacts
+    return artifact
+
+
+def get_artifact_metadata(notification_msg, uuid):
+    service_artifacts = notification_msg['serviceArtifacts']
+    ret = None
+    for artifact in service_artifacts:
+        if artifact['artifactUUID'] == uuid:
+            ret = artifact
+            break
+    resources = notification_msg['resources']
+    if (not ret) and resources:
+        for resource in resources:
+            artifacts = resource['artifacts']
+            for artifact in artifacts:
+                if artifact['artifactUUID'] == uuid:
+                    ret = artifact
+                    break
+            if ret:
+                break
+    return ret
+
+
+def send_notification_status(status_topic, now_ms, distribution_id, artifact, is_artifact_relevant):
+    logger.info('start to send notification status')
+    status = 'FAIL'
+    if is_artifact_relevant:
+        notification_status = 'NOTIFIED'
+    else:
+        notification_status = 'NOT_NOTIFIED'
+    request = {
+        'distributionID': distribution_id,
+        'consumerID': CONSUMER_ID,
+        'timestamp': now_ms,
+        'artifactURL': artifact['artifactURL'],
+        'status': notification_status
+    }
+    request_json = json.JSONEncoder().encode(request)
+    pub = BatchPublisherClient(DMAAP_MR_BASE_URL, status_topic, '', 'application/cambria')
+    logger.info('try to send notification status: %s', request_json)
+
+    try:
+        pub.send('MyPartitionKey', request_json)
+        time.sleep(1)
+        stuck = pub.close(10)
+        if not stuck:
+            status = 'SUCCESS'
+            logger.info('send notification status success.')
+        else:
+            logger.error('failed to send notification status, %s messages unsent', len(stuck))
+    except Exception as e:
+        logger.error('failed to send notification status.')
+        logger.error(e.message)
+        logger.error(traceback.format_exc())
+
+    return status
+
+
+def process_notification(msg):
+    logger.info('Receive a callback notification, nb of resources: %s', len(msg['resources']))
+    service_artifacts = msg['serviceArtifacts']
+    for artifact in service_artifacts:
+        if artifact['artifactType'] == 'TOSCA_CSAR':
+            csar_id = artifact['artifactUUID']
+            download_url = artifact['artifactURL']
+            localhost_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
+            ns_csar_base = os.path.join(localhost_dir, "csars", "ns")
+            local_path = os.path.join(ns_csar_base, msg['distributionID'])
+            file_name = artifact['artifactName']
+            csar_version = artifact['artifactVersion']
+            sdc.download_artifacts(download_url, local_path, file_name)
+            # call ns package upload
+            data = {
+                'nsPackageId': csar_id,
+                'nsPackageVersion': csar_version,
+                'csarName': file_name,
+                'csarDir': local_path
+            }
+            jobid = uuid.uuid4()
+            # NsPackageParser(data, jobid).start()
+            logger.debug(data, jobid)
index 6e8880a..65354d4 100644 (file)
@@ -35,4 +35,6 @@ swagger-spec-validator>=2.1.0
 onappylog==1.0.9
 
 # uwsgi for parallel processing
-# uwsgi
\ No newline at end of file
+# uwsgi
+
+apscheduler==2.1.2
\ No newline at end of file