rename package name
[modeling/etsicatalog.git] / catalog / pub / msapi / sdc_controller.py
1 # Copyright 2019 CMCC Technologies Co., Ltd.
2 import json
3 import logging
4 import os
5 import time
6 import traceback
7 import uuid
8 from threading import Thread
9
10 from apscheduler.scheduler import Scheduler
11
12 from catalog.pub.Dmaap_lib.dmaap.consumer import ConsumerClient
13 from catalog.pub.Dmaap_lib.dmaap.identity import IdentityClient
14 from catalog.pub.Dmaap_lib.dmaap.publisher import BatchPublisherClient
15 from catalog.pub.config.config import CONSUMER_GROUP, CONSUMER_ID, POLLING_INTERVAL, DMAAP_MR_IP, \
16     DMAAP_MR_PORT
17 from catalog.pub.msapi import sdc
18
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Base URL of the DMaaP Message Router, assembled from the configured host and port.
DMAAP_MR_BASE_URL = "http://{0}:{1}".format(DMAAP_MR_IP, DMAAP_MR_PORT)
# Artifact types that handle_relevant_artifacts() keeps; everything else is dropped.
ARTIFACT_TYPES_LIST = ["TOSCA_TEMPLATE", "TOSCA_CSAR"]
23
24
class SDCController(Thread):
    """Thread that registers with SDC/DMaaP and polls for distribution notifications.

    ``run`` creates a DMaaP API key, registers with SDC to obtain the
    notification and status topic names, builds a consumer on the
    notification topic and starts the scheduler. The scheduler then calls
    ``fetch_notification`` every ``POLLING_INTERVAL`` seconds.
    """

    def __init__(self):
        super(SDCController, self).__init__()
        self.identity = IdentityClient(DMAAP_MR_BASE_URL)
        # NOTE(review): with standalone=True, Scheduler.start() presumably
        # blocks the calling thread (APScheduler 2.x) - confirm.
        self.scheduler = Scheduler(standalone=True)
        # Placeholders only; run() replaces them with the real values.
        self.notification_topic = ''
        self.status_topic = ''
        self.consumer = ''

        # Register the polling job now; it cannot fire before run() starts
        # the scheduler, by which time self.consumer has been created.
        @self.scheduler.interval_schedule(seconds=POLLING_INTERVAL)
        def fetch_task():
            self.fetch_notification()

    def run(self):
        """Register with DMaaP and SDC, then start the polling scheduler.

        Any exception raised during registration or by the scheduler is
        logged (message and traceback) and swallowed.
        """
        try:
            # NOTE(review): there is no space between 'for' and the consumer
            # id in this description - confirm whether that is intended.
            description = 'nfvo catalog key for' + CONSUMER_ID
            key = self.identity.create_apikey('', description)
            topics = sdc.register_for_topics(key['apiKey'])
            self.notification_topic = topics['distrNotificationTopicName']
            self.status_topic = topics['distrStatusTopicName']
            self.consumer = ConsumerClient(DMAAP_MR_BASE_URL, self.notification_topic, CONSUMER_GROUP, CONSUMER_ID)
            self.consumer.set_api_credentials(key['apiKey'], key['apiSecret'])
            self.scheduler.start()
        except Exception as e:
            logger.error('start sdc controller failed.')
            # Format the exception itself: `e.message` does not exist on
            # Python 3 exceptions and would raise AttributeError right here,
            # hiding the original failure.
            logger.error('%s', e)
            logger.error(traceback.format_exc())

    def fetch_notification(self):
        """Fetch pending notifications from DMaaP and process the relevant ones.

        Each fetched message is filtered by ``build_callback_notification``;
        messages that still carry resources or service artifacts are handed
        to ``process_notification``. Any exception is logged and swallowed so
        that a single failed poll does not propagate to the scheduler.
        """
        try:
            logger.info('start to fetch message from dmaap.')
            # One timestamp (milliseconds since the epoch) shared by every
            # message of this poll.
            now_ms = int(time.time() * 1000)
            notification_msgs = self.consumer.fetch()
            logger.info('Receive a notification from dmaap: %s', notification_msgs)
            for notification_msg in notification_msgs:
                notification_callback = build_callback_notification(now_ms, notification_msg)
                if is_activate_callback(notification_callback):
                    process_notification(notification_callback)
        except Exception as e:
            logger.error('fetch message from dmaap failed.')
            # See run(): avoid `e.message`, which is unavailable on Python 3.
            logger.error('%s', e)
            logger.error(traceback.format_exc())
67
68
def is_activate_callback(notification_callback):
    """Tell whether a filtered notification still has anything to process.

    Returns True when the 'resources' entry or the 'serviceArtifacts' entry
    of *notification_callback* is non-empty, False otherwise. Both keys are
    always read, so a missing key raises KeyError.
    """
    resources = notification_callback['resources']
    service_artifacts = notification_callback['serviceArtifacts']
    return bool(resources) or bool(service_artifacts)
77
78
def build_callback_notification(now_ms, notification_msg):
    """Reduce a notification's service artifacts to the relevant ones.

    The 'serviceArtifacts' entry of *notification_msg* is replaced in place
    by the result of handle_relevant_artifacts(), and the same (mutated)
    message is returned.

    NOTE: filtering of 'resources' through build_resource_instances() is
    currently disabled, so the 'resources' entry is passed through untouched.
    """
    service_artifacts = notification_msg['serviceArtifacts']
    notification_msg['serviceArtifacts'] = handle_relevant_artifacts(
        notification_msg, now_ms, service_artifacts)
    return notification_msg
86
87
def build_resource_instances(notification_msg, now_ms):
    """Return the resources of *notification_msg* that carry relevant artifacts.

    Each resource's 'artifacts' list is filtered with
    handle_relevant_artifacts(). A resource is kept only when at least one
    artifact survives; its 'artifacts' entry is then replaced in place by the
    filtered list.

    :param notification_msg: notification dict with a 'resources' list
    :param now_ms: timestamp forwarded to handle_relevant_artifacts()
    :return: list of the resource dicts that still have relevant artifacts
    """
    relevant_resource_instances = []
    for resource in notification_msg['resources']:
        found_relevant_artifacts = handle_relevant_artifacts(notification_msg, now_ms, resource['artifacts'])
        if found_relevant_artifacts:
            # Update and collect the individual resource, not the enclosing
            # list: indexing the list with 'artifacts' raises TypeError.
            resource['artifacts'] = found_relevant_artifacts
            relevant_resource_instances.append(resource)
    return relevant_resource_instances
98
99
def handle_relevant_artifacts(notification_msg, now_ms, artifacts):
    """Select the artifacts in *artifacts* whose type is in ARTIFACT_TYPES_LIST.

    An artifact that names a 'generatedFromUUID' is kept only if the artifact
    it was generated from is present in the same list and is itself of a
    relevant type. Every kept artifact is passed through
    set_related_artifacts() before being collected.

    NOTE: reporting a per-artifact status through send_notification_status()
    is currently disabled, which leaves *now_ms* unused here.

    :return: list of the relevant artifacts, in their original order
    """
    selected = []
    for candidate in artifacts:
        if candidate['artifactType'] not in ARTIFACT_TYPES_LIST:
            continue

        source_uuid = candidate.get('generatedFromUUID', '')
        if source_uuid:
            # First artifact in the same list carrying the referenced UUID.
            source = next(
                (other for other in artifacts if source_uuid == other['artifactUUID']),
                None)
            if not source or source['artifactType'] not in ARTIFACT_TYPES_LIST:
                continue

        selected.append(set_related_artifacts(candidate, notification_msg))

    return selected
126
127
def set_related_artifacts(artifact, notification_msg):
    """Attach the metadata of an artifact's related artifacts, if it has any.

    When *artifact* has a non-empty 'relatedArtifacts' entry, each UUID in it
    is looked up with get_artifact_metadata() and the results (None for UUIDs
    that are not found) are stored under 'relatedArtifactsInfo'. The same,
    possibly mutated, *artifact* is returned.
    """
    related_uuids = artifact.get('relatedArtifacts', '')
    if not related_uuids:
        return artifact

    artifact['relatedArtifactsInfo'] = [
        get_artifact_metadata(notification_msg, related_uuid)
        for related_uuid in related_uuids
    ]
    return artifact
136
137
def get_artifact_metadata(notification_msg, uuid):
    """Find the artifact with the given UUID inside a notification message.

    The 'serviceArtifacts' list is searched first; if nothing matches there,
    the 'artifacts' list of every entry in 'resources' is searched in order.
    Both top-level keys are always read, so a missing key raises KeyError.

    :return: the first matching artifact dict, or None if there is none
    """
    found = next(
        (item for item in notification_msg['serviceArtifacts'] if item['artifactUUID'] == uuid),
        None)
    resources = notification_msg['resources']
    if found or not resources:
        return found

    # Fall back to the artifacts attached to each resource.
    return next(
        (item
         for resource in resources
         for item in resource['artifacts']
         if item['artifactUUID'] == uuid),
        found)
156
157
def send_notification_status(status_topic, now_ms, distribution_id, artifact, is_artifact_relevant):
    """Publish a distribution status message for one artifact to DMaaP.

    :param status_topic: name of the DMaaP topic to publish to
    :param now_ms: value sent as the message 'timestamp'
    :param distribution_id: value sent as 'distributionID'
    :param artifact: artifact dict; only its 'artifactURL' is read
    :param is_artifact_relevant: selects status 'NOTIFIED' (true) or
        'NOT_NOTIFIED' (false)
    :return: 'SUCCESS' when the publisher reports nothing left unsent,
        'FAIL' otherwise (including when sending raises)
    """
    logger.info('start to send notification status')
    status = 'FAIL'
    notification_status = 'NOTIFIED' if is_artifact_relevant else 'NOT_NOTIFIED'
    request = {
        'distributionID': distribution_id,
        'consumerID': CONSUMER_ID,
        'timestamp': now_ms,
        'artifactURL': artifact['artifactURL'],
        'status': notification_status
    }
    request_json = json.dumps(request)
    pub = BatchPublisherClient(DMAAP_MR_BASE_URL, status_topic, '', 'application/cambria')
    logger.info('try to send notification status: %s', request_json)

    try:
        pub.send('MyPartitionKey', request_json)
        time.sleep(1)
        # close() returns what could not be sent; anything left means failure.
        stuck = pub.close(10)
        if not stuck:
            status = 'SUCCESS'
            logger.info('send notification status success.')
        else:
            logger.error('failed to send notification status, %s messages unsent', len(stuck))
    except Exception as e:
        logger.error('failed to send notification status.')
        # Format the exception itself: `e.message` does not exist on Python 3
        # exceptions and would raise AttributeError inside this handler.
        logger.error('%s', e)
        logger.error(traceback.format_exc())

    return status
191
192
def process_notification(msg):
    """Download every TOSCA_CSAR service artifact listed in a notification.

    Each such artifact is downloaded with sdc.download_artifacts() into
    ``<parent of this module's directory>/csars/ns/<distributionID>``, and the
    data prepared for the NS package upload is logged at debug level.

    :param msg: notification dict providing 'resources', 'serviceArtifacts'
        and (when a CSAR is present) 'distributionID'
    """
    logger.info('Receive a callback notification, nb of resources: %s', len(msg['resources']))
    # The base directory does not depend on the artifact; compute it once.
    localhost_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
    ns_csar_base = os.path.join(localhost_dir, "csars", "ns")
    for artifact in msg['serviceArtifacts']:
        if artifact['artifactType'] != 'TOSCA_CSAR':
            continue
        csar_id = artifact['artifactUUID']
        download_url = artifact['artifactURL']
        local_path = os.path.join(ns_csar_base, msg['distributionID'])
        file_name = artifact['artifactName']
        csar_version = artifact['artifactVersion']
        sdc.download_artifacts(download_url, local_path, file_name)
        # call ns package upload
        data = {
            'nsPackageId': csar_id,
            'nsPackageVersion': csar_version,
            'csarName': file_name,
            'csarDir': local_path
        }
        jobid = uuid.uuid4()
        # The parser call is currently disabled:
        # NsPackageParser(data, jobid).start()
        # Use an explicit format string: passing the dict as the message with
        # jobid as its only argument triggers a logging formatting error.
        logger.debug('ns package data: %s, job id: %s', data, jobid)