1. Remove the mandatory dependency on MSB
[modeling/etsicatalog.git] / catalog / pub / msapi / sdc_controller.py
1 # Copyright 2019 CMCC Technologies Co., Ltd.
2 import json
3 import logging
4 import os
5 import time
6 import traceback
7 import uuid
8 from threading import Thread
9
10 from apscheduler.scheduler import Scheduler
11
12 from catalog.pub.Dmaap_lib.dmaap.consumer import ConsumerClient
13 from catalog.pub.Dmaap_lib.dmaap.identity import IdentityClient
14 from catalog.pub.Dmaap_lib.dmaap.publisher import BatchPublisherClient
15 from catalog.pub.config.config import CONSUMER_GROUP, CONSUMER_ID, POLLING_INTERVAL, DMAAP_MR_BASE_URL
16 from catalog.pub.msapi import sdc
17
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Artifact types that handle_relevant_artifacts keeps; any other
# 'artifactType' value is filtered out of a notification.
ARTIFACT_TYPES_LIST = [
    "TOSCA_TEMPLATE",
    "TOSCA_CSAR",
]
21
22
class SDCController(Thread):
    """Thread that polls DMaaP for SDC distribution notifications.

    ``run`` creates a DMaaP API key, registers it with SDC to obtain the
    notification and status topic names, builds a consumer on the
    notification topic and starts the scheduler. The scheduler then calls
    ``fetch_notification`` every ``POLLING_INTERVAL`` seconds.
    """

    def __init__(self):
        super(SDCController, self).__init__()
        self.identity = IdentityClient(DMAAP_MR_BASE_URL)
        self.scheduler = Scheduler(standalone=True)
        # Placeholders; run() overwrites them once registration succeeds.
        self.notification_topic = ''
        self.status_topic = ''
        self.consumer = ''

        # The decorator registers the job on the scheduler; the local name
        # itself is never used again.
        @self.scheduler.interval_schedule(seconds=POLLING_INTERVAL)
        def fetch_task():
            self.fetch_notification()

    def run(self):
        """Register with DMaaP/SDC and start the polling scheduler.

        Any failure is logged and swallowed, so the thread ends without
        raising.
        """
        try:
            # Trailing space keeps the consumer id separated from the text.
            description = 'nfvo catalog key for ' + CONSUMER_ID
            key = self.identity.create_apikey('', description)
            topics = sdc.register_for_topics(key['apiKey'])
            self.notification_topic = topics['distrNotificationTopicName']
            self.status_topic = topics['distrStatusTopicName']
            self.consumer = ConsumerClient(DMAAP_MR_BASE_URL, self.notification_topic, CONSUMER_GROUP, CONSUMER_ID)
            self.consumer.set_api_credentials(key['apiKey'], key['apiSecret'])
            # NOTE(review): with standalone=True, start() presumably blocks
            # this thread while the scheduler runs -- confirm against the
            # APScheduler version in use.
            self.scheduler.start()
        except Exception:
            logger.exception('start sdc controller failed.')

    def fetch_notification(self):
        """Fetch one batch of notifications and process each of them.

        Every message is filtered through ``build_callback_notification``
        and handed to ``process_notification`` when it still carries
        relevant content. Errors are logged, never raised.
        """
        try:
            logger.info('start to fetch message from dmaap.')
            now_ms = int(time.time() * 1000)
            notification_msgs = self.consumer.fetch()
            logger.info('Receive a notification from dmaap: %s', notification_msgs)
            for notification_msg in notification_msgs:
                # Isolate each message: one malformed notification must not
                # cause the rest of the already-fetched batch to be skipped.
                try:
                    notification_callback = build_callback_notification(now_ms, notification_msg)
                    if is_activate_callback(notification_callback):
                        process_notification(notification_callback)
                except Exception:
                    logger.exception('handle notification from dmaap failed: %s', notification_msg)
        except Exception:
            logger.exception('fetch message from dmaap failed.')
65
66
def is_activate_callback(notification_callback):
    """Tell whether a filtered notification still has something to process.

    :param notification_callback: notification dict, normally the result of
        ``build_callback_notification``.
    :return: ``True`` when 'resources' or 'serviceArtifacts' is non-empty,
        ``False`` otherwise. A missing key counts as empty instead of
        raising ``KeyError``.
    """
    return bool(notification_callback.get('resources')
                or notification_callback.get('serviceArtifacts'))
75
76
def build_callback_notification(now_ms, notification_msg):
    """Filter the service artifacts of a notification in place.

    :param now_ms: timestamp in milliseconds, forwarded to the artifact filter.
    :param notification_msg: notification dict; must contain 'serviceArtifacts'.
    :return: the same ``notification_msg`` object, with 'serviceArtifacts'
        replaced by the list returned from ``handle_relevant_artifacts``.
    """
    # Resource-level filtering through build_resource_instances is currently
    # disabled, so 'resources' is passed through untouched.
    notification_msg['serviceArtifacts'] = handle_relevant_artifacts(
        notification_msg, now_ms, notification_msg['serviceArtifacts'])
    return notification_msg
84
85
def build_resource_instances(notification_msg, now_ms):
    """Collect the resources that carry at least one relevant artifact.

    :param notification_msg: notification dict; must contain 'resources',
        each entry having an 'artifacts' list.
    :param now_ms: timestamp in milliseconds, forwarded to the artifact filter.
    :return: list of resource dicts whose 'artifacts' entry has been
        narrowed (in place) to the relevant artifacts only.
    """
    relevant_resource_instances = []
    for resource in notification_msg['resources']:
        found_relevant_artifacts = handle_relevant_artifacts(notification_msg, now_ms, resource['artifacts'])
        if found_relevant_artifacts:
            # Update and collect the current resource, not the enclosing
            # list: indexing the list with a string key raises TypeError.
            resource['artifacts'] = found_relevant_artifacts
            relevant_resource_instances.append(resource)
    return relevant_resource_instances
96
97
def handle_relevant_artifacts(notification_msg, now_ms, artifacts):
    """Return the artifacts of ``artifacts`` that are relevant to this consumer.

    An artifact is kept when its 'artifactType' is in ``ARTIFACT_TYPES_LIST``
    and, if it names a 'generatedFromUUID', the artifact it was generated
    from is present in the same list and also has a listed type. Each kept
    artifact is passed through ``set_related_artifacts`` first.

    :param notification_msg: full notification, used to resolve related artifacts.
    :param now_ms: timestamp in milliseconds; not used by the current code.
    :param artifacts: list of artifact dicts to filter.
    :return: list of the relevant artifact dicts, in input order.
    """
    # Per-artifact status reporting through send_notification_status is
    # currently disabled, so nothing is published from here.
    selected = []
    for candidate in artifacts:
        if candidate['artifactType'] not in ARTIFACT_TYPES_LIST:
            continue

        source_uuid = candidate.get('generatedFromUUID', '')
        if source_uuid:
            # First artifact in the same list whose UUID matches, if any.
            source = next(
                (other for other in artifacts if source_uuid == other['artifactUUID']),
                None,
            )
            if not source or source['artifactType'] not in ARTIFACT_TYPES_LIST:
                continue

        selected.append(set_related_artifacts(candidate, notification_msg))

    return selected
124
125
def set_related_artifacts(artifact, notification_msg):
    """Attach the metadata of an artifact's related artifacts, in place.

    :param artifact: artifact dict; 'relatedArtifacts' (optional) holds UUIDs.
    :param notification_msg: notification searched for each related UUID.
    :return: the same ``artifact`` object. When 'relatedArtifacts' is
        non-empty, 'relatedArtifactsInfo' is set to the list of lookups
        (``None`` for a UUID that is not found).
    """
    related_uuids = artifact.get('relatedArtifacts', '')
    if not related_uuids:
        return artifact

    artifact['relatedArtifactsInfo'] = [
        get_artifact_metadata(notification_msg, related_uuid)
        for related_uuid in related_uuids
    ]
    return artifact
134
135
def get_artifact_metadata(notification_msg, uuid):
    """Find the artifact with the given UUID inside a notification.

    Service artifacts are searched first, then the artifacts of every
    resource. A missing or empty 'serviceArtifacts', 'resources' or
    per-resource 'artifacts' entry is treated as "nothing to search"
    instead of raising ``KeyError``.

    :param notification_msg: notification dict.
    :param uuid: artifact UUID to look for. The name shadows the ``uuid``
        module inside this function; it is kept for caller compatibility.
    :return: the first matching artifact dict, or ``None`` when not found.
    """
    for artifact in notification_msg.get('serviceArtifacts') or []:
        if artifact['artifactUUID'] == uuid:
            return artifact

    for resource in notification_msg.get('resources') or []:
        for artifact in resource.get('artifacts') or []:
            if artifact['artifactUUID'] == uuid:
                return artifact

    return None
154
155
def send_notification_status(status_topic, now_ms, distribution_id, artifact, is_artifact_relevant):
    """Publish the notification status of one artifact to a DMaaP topic.

    :param status_topic: topic the status message is published to.
    :param now_ms: timestamp placed in the message.
    :param distribution_id: value of the message's 'distributionID' field.
    :param artifact: artifact dict; its 'artifactURL' is reported.
    :param is_artifact_relevant: selects 'NOTIFIED' (truthy) or 'NOT_NOTIFIED'.
    :return: 'SUCCESS' when the publisher closes with nothing left unsent,
        'FAIL' otherwise (including when publishing raises).
    """
    logger.info('start to send notification status')
    outcome = 'FAIL'
    notification_status = 'NOTIFIED' if is_artifact_relevant else 'NOT_NOTIFIED'
    payload = json.dumps({
        'distributionID': distribution_id,
        'consumerID': CONSUMER_ID,
        'timestamp': now_ms,
        'artifactURL': artifact['artifactURL'],
        'status': notification_status
    })
    publisher = BatchPublisherClient(DMAAP_MR_BASE_URL, status_topic, '', 'application/cambria')
    logger.info('try to send notification status: %s', payload)

    try:
        publisher.send('MyPartitionKey', payload)
        time.sleep(1)
        # close() hands back whatever could not be sent; empty means success.
        unsent = publisher.close(10)
        if unsent:
            logger.error('failed to send notification status, %s messages unsent', len(unsent))
        else:
            outcome = 'SUCCESS'
            logger.info('send notification status success.')
    except Exception as err:
        logger.error('failed to send notification status.')
        logger.error(str(err))
        logger.error(traceback.format_exc())

    return outcome
189
190
def process_notification(msg):
    """Download every TOSCA_CSAR service artifact named in a notification.

    Each artifact is downloaded with ``sdc.download_artifacts`` into
    ``<package parent dir>/csars/ns/<distributionID>``. The NS package
    upload step is currently disabled; the data it would receive is only
    logged at debug level.

    :param msg: notification dict with 'serviceArtifacts' and
        'distributionID'; 'resources' is only used for logging.
    """
    # A missing or None 'resources' must not abort processing just because
    # of a log line.
    logger.info('Receive a callback notification, nb of resources: %s', len(msg.get('resources') or []))

    # Loop-invariant: the directory two levels above this file, plus csars/ns.
    localhost_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
    ns_csar_base = os.path.join(localhost_dir, "csars", "ns")

    for artifact in msg['serviceArtifacts']:
        if artifact['artifactType'] != 'TOSCA_CSAR':
            continue

        csar_id = artifact['artifactUUID']
        download_url = artifact['artifactURL']
        # NOTE(review): 'distributionID' and 'artifactName' come from the
        # notification and are used in a filesystem path unvalidated --
        # confirm the source is trusted or sanitise them.
        local_path = os.path.join(ns_csar_base, msg['distributionID'])
        file_name = artifact['artifactName']
        csar_version = artifact['artifactVersion']
        sdc.download_artifacts(download_url, local_path, file_name)

        # Payload for the (currently disabled) ns package upload.
        data = {
            'nsPackageId': csar_id,
            'nsPackageVersion': csar_version,
            'csarName': file_name,
            'csarDir': local_path
        }
        jobid = uuid.uuid4()
        # The first argument must be a format string; passing the dict as
        # the message made logging fail to format the record.
        logger.debug('ns package data: %s, job id: %s', data, jobid)