From: dyh Date: Wed, 27 May 2020 02:36:10 +0000 (+0800) Subject: rename package name X-Git-Tag: 1.0.7~14 X-Git-Url: https://gerrit.onap.org/r/gitweb?p=modeling%2Fetsicatalog.git;a=commitdiff_plain;h=fddda96911bdb3ec9841ac71e764cb6eb8fa08d5 rename package name add function for sdc subscribe and notification Issue-ID: MODELING-366 Change-Id: I61dba314a62003577eddc640325f9c5b9263b2bc Signed-off-by: dyh --- diff --git a/catalog/pub/Dmaap-lib/__init__.py b/catalog/pub/Dmaap_lib/__init__.py similarity index 100% rename from catalog/pub/Dmaap-lib/__init__.py rename to catalog/pub/Dmaap_lib/__init__.py diff --git a/catalog/pub/Dmaap-lib/dmaap/__init__.py b/catalog/pub/Dmaap_lib/dmaap/__init__.py similarity index 100% rename from catalog/pub/Dmaap-lib/dmaap/__init__.py rename to catalog/pub/Dmaap_lib/dmaap/__init__.py diff --git a/catalog/pub/Dmaap-lib/dmaap/consumer.py b/catalog/pub/Dmaap_lib/dmaap/consumer.py similarity index 100% rename from catalog/pub/Dmaap-lib/dmaap/consumer.py rename to catalog/pub/Dmaap_lib/dmaap/consumer.py diff --git a/catalog/pub/Dmaap-lib/dmaap/identity.py b/catalog/pub/Dmaap_lib/dmaap/identity.py similarity index 100% rename from catalog/pub/Dmaap-lib/dmaap/identity.py rename to catalog/pub/Dmaap_lib/dmaap/identity.py diff --git a/catalog/pub/Dmaap-lib/dmaap/publisher.py b/catalog/pub/Dmaap_lib/dmaap/publisher.py similarity index 100% rename from catalog/pub/Dmaap-lib/dmaap/publisher.py rename to catalog/pub/Dmaap_lib/dmaap/publisher.py diff --git a/catalog/pub/Dmaap-lib/pub/__init__.py b/catalog/pub/Dmaap_lib/pub/__init__.py similarity index 100% rename from catalog/pub/Dmaap-lib/pub/__init__.py rename to catalog/pub/Dmaap_lib/pub/__init__.py diff --git a/catalog/pub/Dmaap-lib/pub/exceptions.py b/catalog/pub/Dmaap_lib/pub/exceptions.py similarity index 100% rename from catalog/pub/Dmaap-lib/pub/exceptions.py rename to catalog/pub/Dmaap_lib/pub/exceptions.py diff --git a/catalog/pub/Dmaap-lib/test/test_consumer.py b/catalog/pub/Dmaap_lib/test/test_consumer.py similarity index 100% rename from catalog/pub/Dmaap-lib/test/test_consumer.py rename to catalog/pub/Dmaap_lib/test/test_consumer.py diff --git a/catalog/pub/Dmaap-lib/test/test_identity.py b/catalog/pub/Dmaap_lib/test/test_identity.py similarity index 100% rename from catalog/pub/Dmaap-lib/test/test_identity.py rename to catalog/pub/Dmaap_lib/test/test_identity.py diff --git a/catalog/pub/config/config.py b/catalog/pub/config/config.py index dcc6cc1..99932d7 100644 --- a/catalog/pub/config/config.py +++ b/catalog/pub/config/config.py @@ -18,11 +18,6 @@ MSB_SERVICE_IP = '127.0.0.1' MSB_SERVICE_PORT = '80' MSB_BASE_URL = "%s://%s:%s" % (MSB_SERVICE_PROTOCOL, MSB_SERVICE_IP, MSB_SERVICE_PORT) -# [REDIS] -# REDIS_HOST = '127.0.0.1' -# REDIS_PORT = '6379' -# REDIS_PASSWD = '' - # [mysql] DB_IP = "127.0.0.1" DB_PORT = 3306 @@ -97,4 +92,11 @@ SDC_BASE_URL = "https://msb-iag/api" SDC_USER = "modeling" SDC_PASSWD = "Kp8bJ4SXszM0WXlhak3eHlcse2gAw84vaoGGmJvUy2U" +# [dmaap config] +DMAAP_MR_IP = '127.0.0.1' +DMAAP_MR_PORT = '3904' +CONSUMER_GROUP = "consumerGroup" +CONSUMER_ID = "consumerId" +POLLING_INTERVAL = 15 + VNFD_SCHEMA_VERSION_DEFAULT = "base" diff --git a/catalog/pub/msapi/sdc.py b/catalog/pub/msapi/sdc.py index 86930f0..498db8d 100644 --- a/catalog/pub/msapi/sdc.py +++ b/catalog/pub/msapi/sdc.py @@ -129,3 +129,41 @@ def download_artifacts(download_url, local_path, file_name): local_file.write(ret[1]) local_file.close() return local_file_name + + +def create_consumer(name, salt, password): + req_data = { + 'consumerName': name, + 'consumerSalt': salt, + 'consumerPassword': password + } + req_data = json.JSONEncoder().encode(req_data) + resource = '/sdc2/rest/v1/consumers' + headers = {'USER_ID': 'jh0003'} + ret = restcall.call_req(base_url=SDC_BASE_URL, + user="", + passwd="", + auth_type=restcall.rest_no_auth, + resource=resource, + method="POST", + content=req_data, + additional_headers=headers) + if ret[0] != 0: + logger.error("Status code is %s, detail is %s.", ret[2], ret[1]) + raise CatalogException("Failed to create consumer from sdc.") + + +def register_for_topics(key): + req_data = { + 'apiPublicKey': key, + 'distrEnvName': 'AUTO', + 'isConsumerToSdcDistrStatusTopic': False, + 'distEnvEndPoints': [] + } + req_data = json.JSONEncoder().encode(req_data) + url = '/sdc/v1/registerForDistribution' + ret = call_sdc(url, 'POST', req_data) + if ret[0] != 0: + logger.error("Status code is %s, detail is %s.", ret[2], ret[1]) + raise CatalogException("Failed to register from sdc.") + return json.JSONDecoder().decode(ret[1]) diff --git a/catalog/pub/msapi/sdc_controller.py b/catalog/pub/msapi/sdc_controller.py new file mode 100644 index 0000000..454f3d1 --- /dev/null +++ b/catalog/pub/msapi/sdc_controller.py @@ -0,0 +1,215 @@ +# Copyright 2019 CMCC Technologies Co., Ltd. +import json +import logging +import os +import time +import traceback +import uuid +from threading import Thread + +from apscheduler.scheduler import Scheduler + +from catalog.pub.Dmaap_lib.dmaap.consumer import ConsumerClient +from catalog.pub.Dmaap_lib.dmaap.identity import IdentityClient +from catalog.pub.Dmaap_lib.dmaap.publisher import BatchPublisherClient +from catalog.pub.config.config import CONSUMER_GROUP, CONSUMER_ID, POLLING_INTERVAL, DMAAP_MR_IP, \ + DMAAP_MR_PORT +from catalog.pub.msapi import sdc + +logger = logging.getLogger(__name__) + +DMAAP_MR_BASE_URL = "http://%s:%s" % (DMAAP_MR_IP, DMAAP_MR_PORT) +ARTIFACT_TYPES_LIST = ["TOSCA_TEMPLATE", "TOSCA_CSAR"] + + +class SDCController(Thread): + def __init__(self): + super(SDCController, self).__init__() + self.identity = IdentityClient(DMAAP_MR_BASE_URL) + self.scheduler = Scheduler(standalone=True) + self.notification_topic = '' + self.status_topic = '' + self.consumer = '' + + @self.scheduler.interval_schedule(seconds=POLLING_INTERVAL) + def fetch_task(): + self.fetch_notification() + + def run(self): + try: + description = 'nfvo catalog key for' + CONSUMER_ID + key = self.identity.create_apikey('', description) + topics = sdc.register_for_topics(key['apiKey']) + self.notification_topic = topics['distrNotificationTopicName'] + self.status_topic = topics['distrStatusTopicName'] + self.consumer = ConsumerClient(DMAAP_MR_BASE_URL, self.notification_topic, CONSUMER_GROUP, CONSUMER_ID) + self.consumer.set_api_credentials(key['apiKey'], key['apiSecret']) + self.scheduler.start() + except Exception as e: + logger.error('start sdc controller failed.') + logger.error(e.message) + logger.error(traceback.format_exc()) + + def fetch_notification(self): + try: + logger.info('start to fetch message from dmaap.') + now_ms = int(time.time() * 1000) + notification_msgs = self.consumer.fetch() + logger.info('Receive a notification from dmaap: %s', notification_msgs) + for notification_msg in notification_msgs: + notification_callback = build_callback_notification(now_ms, notification_msg) + if is_activate_callback(notification_callback): + process_notification(notification_callback) + except Exception as e: + logger.error('fetch message from dmaap failed.') + logger.error(e.message) + logger.error(traceback.format_exc()) + + +def is_activate_callback(notification_callback): + has_relevant_artifacts_in_resource = False + has_relevant_artifacts_in_service = False + if notification_callback['resources']: + has_relevant_artifacts_in_resource = True + if notification_callback['serviceArtifacts']: + has_relevant_artifacts_in_service = True + return has_relevant_artifacts_in_resource or has_relevant_artifacts_in_service + + +def build_callback_notification(now_ms, notification_msg): + # relevant_resource_instances = build_resource_instances(notification_msg, now_ms) + relevant_service_artifacts = handle_relevant_artifacts(notification_msg, now_ms, + notification_msg['serviceArtifacts']) + # notification_msg['resources'] = relevant_resource_instances + notification_msg['serviceArtifacts'] = relevant_service_artifacts + return notification_msg + + +def build_resource_instances(notification_msg, now_ms): + relevant_resource_instances = [] + resources = notification_msg['resources'] + for resource in resources: + artifacts = resource['artifacts'] + found_relevant_artifacts = handle_relevant_artifacts(notification_msg, now_ms, artifacts) + if found_relevant_artifacts: + resources['artifacts'] = found_relevant_artifacts + relevant_resource_instances.append(resources) + return relevant_resource_instances + + +def handle_relevant_artifacts(notification_msg, now_ms, artifacts): + relevant_artifacts = [] + for artifact in artifacts: + artifact_type = artifact['artifactType'] + is_artifact_relevant = artifact_type in ARTIFACT_TYPES_LIST + if is_artifact_relevant: + generated_from_uuid = artifact.get('generatedFromUUID', '') + if generated_from_uuid: + generated_from_artifact = None + for artifact_g in artifacts: + if generated_from_uuid == artifact_g['artifactUUID']: + generated_from_artifact = artifact_g + break + if generated_from_artifact: + is_artifact_relevant = generated_from_artifact['artifactType'] in ARTIFACT_TYPES_LIST + else: + is_artifact_relevant = False + if is_artifact_relevant: + artifact = set_related_artifacts(artifact, notification_msg) + relevant_artifacts.append(artifact) + + # notification_status = send_notification_status(now_ms, notification_msg['distributionID'], artifact, is_artifact_relevant) + # if notification_status != 'SUCCESS': + # logger.error("Error failed to send notification status to Dmaap.") + + return relevant_artifacts + + +def set_related_artifacts(artifact, notification_msg): + related_artifacts_uuid = artifact.get('relatedArtifacts', '') + if related_artifacts_uuid: + related_artifacts = [] + for artifact_uuid in related_artifacts_uuid: + related_artifacts.append(get_artifact_metadata(notification_msg, artifact_uuid)) + artifact['relatedArtifactsInfo'] = related_artifacts + return artifact + + +def get_artifact_metadata(notification_msg, uuid): + service_artifacts = notification_msg['serviceArtifacts'] + ret = None + for artifact in service_artifacts: + if artifact['artifactUUID'] == uuid: + ret = artifact + break + resources = notification_msg['resources'] + if (not ret) and resources: + for resource in resources: + artifacts = resource['artifacts'] + for artifact in artifacts: + if artifact['artifactUUID'] == uuid: + ret = artifact + break + if ret: + break + return ret + + +def send_notification_status(status_topic, now_ms, distribution_id, artifact, is_artifact_relevant): + logger.info('start to send notification status') + status = 'FAIL' + if is_artifact_relevant: + notification_status = 'NOTIFIED' + else: + notification_status = 'NOT_NOTIFIED' + request = { + 'distributionID': distribution_id, + 'consumerID': CONSUMER_ID, + 'timestamp': now_ms, + 'artifactURL': artifact['artifactURL'], + 'status': notification_status + } + request_json = json.JSONEncoder().encode(request) + pub = BatchPublisherClient(DMAAP_MR_BASE_URL, status_topic, '', 'application/cambria') + logger.info('try to send notification status: %s', request_json) + + try: + pub.send('MyPartitionKey', request_json) + time.sleep(1) + stuck = pub.close(10) + if not stuck: + status = 'SUCCESS' + logger.info('send notification status success.') + else: + logger.error('failed to send notification status, %s messages unsent', len(stuck)) + except Exception as e: + logger.error('failed to send notification status.') + logger.error(e.message) + logger.error(traceback.format_exc()) + + return status + + +def process_notification(msg): + logger.info('Receive a callback notification, nb of resources: %s', len(msg['resources'])) + service_artifacts = msg['serviceArtifacts'] + for artifact in service_artifacts: + if artifact['artifactType'] == 'TOSCA_CSAR': + csar_id = artifact['artifactUUID'] + download_url = artifact['artifactURL'] + localhost_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) + ns_csar_base = os.path.join(localhost_dir, "csars", "ns") + local_path = os.path.join(ns_csar_base, msg['distributionID']) + file_name = artifact['artifactName'] + csar_version = artifact['artifactVersion'] + sdc.download_artifacts(download_url, local_path, file_name) + # call ns package upload + data = { + 'nsPackageId': csar_id, + 'nsPackageVersion': csar_version, + 'csarName': file_name, + 'csarDir': local_path + } + jobid = uuid.uuid4() + # NsPackageParser(data, jobid).start() + logger.debug(data, jobid) diff --git a/requirements.txt b/requirements.txt index 6e8880a..65354d4 100644 --- a/requirements.txt +++ b/requirements.txt @@ -35,4 +35,6 @@ swagger-spec-validator>=2.1.0 onappylog==1.0.9 # uwsgi for parallel processing -# uwsgi \ No newline at end of file +# uwsgi + +apscheduler==2.1.2 \ No newline at end of file