Refactor the code of SDCController
[modeling/etsicatalog.git] / catalog / pub / msapi / sdc_controller.py
1 # Copyright 2019 CMCC Technologies Co., Ltd.
2 import json
3 import logging
4 import time
5 import traceback
6 import uuid
7 from threading import Thread
8
9 from apscheduler.schedulers.background import BackgroundScheduler
10
11 from catalog.pub.database.models import VnfPackageModel
12 from catalog.pub.exceptions import CatalogException
13 from catalog.packages.biz import sdc_vnf_package, sdc_ns_package
14 from catalog.pub.utils.jobutil import JobUtil
15 from catalog.pub.utils.values import ignore_case_get
16
17 from catalog.pub.Dmaap_lib.dmaap.consumer import ConsumerClient
18 from catalog.pub.Dmaap_lib.dmaap.identity import IdentityClient
19 from catalog.pub.Dmaap_lib.dmaap.publisher import BatchPublisherClient
20 from catalog.pub.config.config import CONSUMER_GROUP, CONSUMER_ID, POLLING_INTERVAL, DMAAP_MR_BASE_URL
21 from catalog.pub.msapi import sdc
22
23 logger = logging.getLogger(__name__)
24
25 ARTIFACT_TYPES_LIST = ["TOSCA_TEMPLATE", "TOSCA_CSAR"]
26
27
28 class SDCController(Thread):
29     def __init__(self):
30         super(SDCController, self).__init__()
31         self.identity = IdentityClient(DMAAP_MR_BASE_URL)
32         self.scheduler = BackgroundScheduler(job_defaults={'max_instances': 3})
33         self.notification_topic = ''
34         self.status_topic = ''
35         self.consumer = ''
36
37         @self.scheduler.scheduled_job('interval', seconds=POLLING_INTERVAL)
38         def fetch_task():
39             self.fetch_notification()
40
41     def run(self):
42         try:
43             description = 'nfvo catalog key for' + CONSUMER_ID
44             key = self.identity.create_apikey('', description)
45             topics = sdc.register_for_topics(key['apiKey'])
46             self.notification_topic = topics['distrNotificationTopicName']
47             self.status_topic = topics['distrStatusTopicName']
48             self.consumer = ConsumerClient(DMAAP_MR_BASE_URL, self.notification_topic, CONSUMER_GROUP, CONSUMER_ID)
49             self.consumer.set_api_credentials(key['apiKey'], key['apiSecret'])
50             self.scheduler.start()
51         except Exception as e:
52             logger.error('start sdc controller failed.')
53             logger.error(str(e))
54             logger.error(traceback.format_exc())
55
56     def fetch_notification(self):
57         try:
58             logger.info('start to fetch message from dmaap.')
59             now_ms = int(time.time() * 1000)
60             notification_msgs = self.consumer.fetch()
61             logger.info('Receive a notification from dmaap: %s', notification_msgs)
62             for notification_msg in notification_msgs:
63                 notification_callback = build_callback_notification(now_ms, notification_msg)
64                 if is_activate_callback(notification_callback):
65                     process_notification(notification_callback)
66         except Exception as e:
67             logger.error('fetch message from dmaap failed.')
68             logger.error(str(e))
69             logger.error(traceback.format_exc())
70
71
72 def is_activate_callback(notification_callback):
73     has_relevant_artifacts_in_resource = False
74     has_relevant_artifacts_in_service = False
75     if notification_callback['resources']:
76         has_relevant_artifacts_in_resource = True
77     if notification_callback['serviceArtifacts']:
78         has_relevant_artifacts_in_service = True
79     return has_relevant_artifacts_in_resource or has_relevant_artifacts_in_service
80
81
82 def build_callback_notification(now_ms, notification_msg):
83     # relevant_resource_instances = build_resource_instances(notification_msg, now_ms)
84     relevant_service_artifacts = handle_relevant_artifacts(notification_msg, now_ms,
85                                                            notification_msg['serviceArtifacts'])
86     # notification_msg['resources'] = relevant_resource_instances
87     notification_msg['serviceArtifacts'] = relevant_service_artifacts
88     return notification_msg
89
90
91 def build_resource_instances(notification_msg, now_ms):
92     relevant_resource_instances = []
93     resources = notification_msg['resources']
94     for resource in resources:
95         artifacts = resource['artifacts']
96         found_relevant_artifacts = handle_relevant_artifacts(notification_msg, now_ms, artifacts)
97         if found_relevant_artifacts:
98             resources['artifacts'] = found_relevant_artifacts
99             relevant_resource_instances.append(resources)
100     return relevant_resource_instances
101
102
103 def handle_relevant_artifacts(notification_msg, now_ms, artifacts):
104     relevant_artifacts = []
105     for artifact in artifacts:
106         artifact_type = artifact['artifactType']
107         is_artifact_relevant = artifact_type in ARTIFACT_TYPES_LIST
108         if is_artifact_relevant:
109             generated_from_uuid = artifact.get('generatedFromUUID', '')
110             if generated_from_uuid:
111                 generated_from_artifact = None
112                 for artifact_g in artifacts:
113                     if generated_from_uuid == artifact_g['artifactUUID']:
114                         generated_from_artifact = artifact_g
115                         break
116                 if generated_from_artifact:
117                     is_artifact_relevant = generated_from_artifact['artifactType'] in ARTIFACT_TYPES_LIST
118                 else:
119                     is_artifact_relevant = False
120             if is_artifact_relevant:
121                 artifact = set_related_artifacts(artifact, notification_msg)
122                 relevant_artifacts.append(artifact)
123
124         # notification_status = send_notification_status(now_ms, notification_msg['distributionID'], artifact, is_artifact_relevant)
125         # if notification_status != 'SUCCESS':
126         #     logger.error("Error failed to send notification status to Dmaap.")
127
128     return relevant_artifacts
129
130
131 def set_related_artifacts(artifact, notification_msg):
132     related_artifacts_uuid = artifact.get('relatedArtifacts', '')
133     if related_artifacts_uuid:
134         related_artifacts = []
135         for artifact_uuid in related_artifacts_uuid:
136             related_artifacts.append(get_artifact_metadata(notification_msg, artifact_uuid))
137         artifact['relatedArtifactsInfo'] = related_artifacts
138     return artifact
139
140
141 def get_artifact_metadata(notification_msg, uuid):
142     service_artifacts = notification_msg['serviceArtifacts']
143     ret = None
144     for artifact in service_artifacts:
145         if artifact['artifactUUID'] == uuid:
146             ret = artifact
147             break
148     resources = notification_msg['resources']
149     if (not ret) and resources:
150         for resource in resources:
151             artifacts = resource['artifacts']
152             for artifact in artifacts:
153                 if artifact['artifactUUID'] == uuid:
154                     ret = artifact
155                     break
156             if ret:
157                 break
158     return ret
159
160
161 def send_notification_status(status_topic, now_ms, distribution_id, artifact, is_artifact_relevant):
162     logger.info('start to send notification status')
163     status = 'FAIL'
164     if is_artifact_relevant:
165         notification_status = 'NOTIFIED'
166     else:
167         notification_status = 'NOT_NOTIFIED'
168     request = {
169         'distributionID': distribution_id,
170         'consumerID': CONSUMER_ID,
171         'timestamp': now_ms,
172         'artifactURL': artifact['artifactURL'],
173         'status': notification_status
174     }
175     request_json = json.JSONEncoder().encode(request)
176     pub = BatchPublisherClient(DMAAP_MR_BASE_URL, status_topic, '', 'application/cambria')
177     logger.info('try to send notification status: %s', request_json)
178
179     try:
180         pub.send('MyPartitionKey', request_json)
181         time.sleep(1)
182         stuck = pub.close(10)
183         if not stuck:
184             status = 'SUCCESS'
185             logger.info('send notification status success.')
186         else:
187             logger.error('failed to send notification status, %s messages unsent', len(stuck))
188     except Exception as e:
189         logger.error('failed to send notification status.')
190         logger.error(str(e))
191         logger.error(traceback.format_exc())
192
193     return status
194
195
196 def process_notification(msg):
197     logger.info('Receive a callback notification, nb of resources: %s', len(msg['resources']))
198     try:
199         ns = sdc.get_asset(sdc.ASSETTYPE_SERVICES, msg['serviceUUID'])
200         # check if the related resources exist
201         resources = ns.get('resources', None)
202         job_array = []
203         resource_threads = []
204         if resources:
205             for resource in resources:
206                 if (resource['resoucreType'] == 'VF') and not VnfPackageModel.objects.filter(vnfPackageId=resource['resourceUUID']):
207                     logger.debug("VF [%s] is not distributed.", resource['resourceUUID'])
208                     # raise CatalogException("VF (%s) is not distributed." % resource['resourceUUID'])
209                     logger.info("VF [%s] begin to distribute.", resource['resourceUUID'])
210                     csar_id = resource['resourceUUID']
211                     vim_ids = ignore_case_get(resource, "vimIds")
212                     lab_vim_id = ignore_case_get(resource, "labVimId")
213                     job_id = str(uuid.uuid4())
214                     job_array.append(job_id)
215                     resource_threads.append(sdc_vnf_package.NfDistributeThread(csar_id, vim_ids, lab_vim_id, job_id))
216                     # sdc_vnf_package.NfDistributeThread(csar_id, vim_ids, lab_vim_id, job_id).start()
217                 else:
218                     logger.debug("resource [%s] has been distributed", resource['resourceUUID'])
219         for resource_thread in resource_threads:
220             resource_thread.start()
221         for resource_thread in resource_threads:
222             resource_thread.join()
223         for jobID in job_array:
224             job_status = JobUtil.query_job_status(jobID)
225             if job_status[0].status == 'error':
226                 raise CatalogException("VF resource fail to distributed.")
227         csar_id = msg['serviceUUID']
228         sdc_ns_package.ns_on_distribute(csar_id)
229         logger.debug("Csar package [%s] has been destributed successfully", csar_id)
230     except CatalogException as e:
231         logger.error("Failed to download the resource")
232         logger.error(str(e))