8bad8573b9badb78a520a716434e99f4394610eb
[modeling/etsicatalog.git] / catalog / pub / msapi / sdc_controller.py
1 # Copyright 2019 CMCC Technologies Co., Ltd.
2 import json
3 import logging
4 import os
5 import time
6 import traceback
7 import uuid
8 from threading import Thread
9
10 from apscheduler.schedulers.background import BackgroundScheduler
11
12 from catalog.pub.database.models import NSPackageModel, VnfPackageModel
13 from catalog.pub.exceptions import CatalogException
14 from catalog.packages.biz import sdc_vnf_package
15 from catalog.pub.utils import fileutil
16 from catalog.packages.biz.ns_descriptor import NsDescriptor
17 from catalog.pub.utils.jobutil import JobUtil
18 from catalog.pub.utils.values import ignore_case_get
19
20 from catalog.pub.Dmaap_lib.dmaap.consumer import ConsumerClient
21 from catalog.pub.Dmaap_lib.dmaap.identity import IdentityClient
22 from catalog.pub.Dmaap_lib.dmaap.publisher import BatchPublisherClient
23 from catalog.pub.config.config import CONSUMER_GROUP, CONSUMER_ID, POLLING_INTERVAL, DMAAP_MR_BASE_URL
24 from catalog.pub.msapi import sdc
25
26 logger = logging.getLogger(__name__)
27
28 ARTIFACT_TYPES_LIST = ["TOSCA_TEMPLATE", "TOSCA_CSAR"]
29
30
class SDCController(Thread):
    """Background thread that subscribes to SDC distribution topics over DMaaP.

    ``run`` creates a DMaaP API key, registers with SDC to learn the topic
    names, builds the notification consumer and starts an interval scheduler
    that repeatedly calls ``fetch_notification``.
    """

    def __init__(self):
        super(SDCController, self).__init__()
        self.identity = IdentityClient(DMAAP_MR_BASE_URL)
        self.scheduler = BackgroundScheduler(job_defaults={'max_instances': 3})
        # Topic names and the consumer are filled in by run() once the
        # registration with SDC has succeeded.
        self.notification_topic = ''
        self.status_topic = ''
        self.consumer = ''

        # Register the polling job now; it only fires after run() starts the
        # scheduler.
        @self.scheduler.scheduled_job('interval', seconds=POLLING_INTERVAL)
        def fetch_task():
            self.fetch_notification()

    def run(self):
        """Register with DMaaP/SDC and start the polling scheduler.

        Any failure is logged and swallowed so the thread terminates quietly
        instead of propagating the exception.
        """
        try:
            # Fixed: the original concatenation had no space before the
            # consumer id ("...key for<id>").
            description = 'nfvo catalog key for ' + CONSUMER_ID
            key = self.identity.create_apikey('', description)
            topics = sdc.register_for_topics(key['apiKey'])
            self.notification_topic = topics['distrNotificationTopicName']
            self.status_topic = topics['distrStatusTopicName']
            self.consumer = ConsumerClient(DMAAP_MR_BASE_URL, self.notification_topic, CONSUMER_GROUP, CONSUMER_ID)
            self.consumer.set_api_credentials(key['apiKey'], key['apiSecret'])
            self.scheduler.start()
        except Exception as e:
            logger.error('start sdc controller failed.')
            logger.error(str(e))
            logger.error(traceback.format_exc())

    def fetch_notification(self):
        """Fetch pending notifications from DMaaP and process each of them.

        A failure while handling one message is logged and does not prevent
        the remaining messages of the same fetch from being processed.
        """
        try:
            logger.info('start to fetch message from dmaap.')
            now_ms = int(time.time() * 1000)
            notification_msgs = self.consumer.fetch()
            logger.info('Receive a notification from dmaap: %s', notification_msgs)
            for notification_msg in notification_msgs:
                # Isolate each message: the batch has already been fetched,
                # so aborting on the first bad message would silently drop
                # all the messages that follow it.
                try:
                    notification_callback = build_callback_notification(now_ms, notification_msg)
                    if is_activate_callback(notification_callback):
                        process_notification(notification_callback)
                except Exception as e:
                    logger.error('handle notification from dmaap failed.')
                    logger.error(str(e))
                    logger.error(traceback.format_exc())
        except Exception as e:
            logger.error('fetch message from dmaap failed.')
            logger.error(str(e))
            logger.error(traceback.format_exc())
73
74
def is_activate_callback(notification_callback):
    """Tell whether a callback notification carries anything to process.

    :param notification_callback: notification dict; both the ``resources``
        and ``serviceArtifacts`` keys are read.
    :return: True when at least one of the two entries is non-empty.
    """
    # Both keys are looked up up-front, so a missing key raises KeyError
    # regardless of the other entry's content.
    resources_present = bool(notification_callback['resources'])
    service_artifacts_present = bool(notification_callback['serviceArtifacts'])
    return resources_present or service_artifacts_present
83
84
def build_callback_notification(now_ms, notification_msg):
    """Reduce a notification to its relevant service artifacts.

    The ``serviceArtifacts`` entry of ``notification_msg`` is replaced in
    place by the list returned from ``handle_relevant_artifacts``. The
    ``resources`` entry is left untouched (no resource filtering is applied
    here).

    :param now_ms: timestamp in milliseconds, forwarded to the filter.
    :param notification_msg: notification dict received from DMaaP.
    :return: the same ``notification_msg`` object, mutated.
    """
    all_service_artifacts = notification_msg['serviceArtifacts']
    notification_msg['serviceArtifacts'] = handle_relevant_artifacts(
        notification_msg, now_ms, all_service_artifacts)
    return notification_msg
92
93
def build_resource_instances(notification_msg, now_ms):
    """Collect the resources of a notification that own relevant artifacts.

    Each resource's ``artifacts`` list is filtered through
    ``handle_relevant_artifacts``; resources with at least one relevant
    artifact get their ``artifacts`` entry replaced by the filtered list and
    are included in the result.

    :param notification_msg: notification dict; its ``resources`` entry is
        iterated and each item must provide an ``artifacts`` list.
    :param now_ms: timestamp in milliseconds, forwarded to the filter.
    :return: list of the (mutated) resource dicts that have relevant artifacts.
    """
    relevant_resource_instances = []
    for resource in notification_msg['resources']:
        found_relevant_artifacts = handle_relevant_artifacts(notification_msg, now_ms, resource['artifacts'])
        if found_relevant_artifacts:
            # Fixed: the original indexed and appended the whole ``resources``
            # list instead of the current ``resource``, which raises
            # TypeError on a list and would never return a single resource.
            resource['artifacts'] = found_relevant_artifacts
            relevant_resource_instances.append(resource)
    return relevant_resource_instances
104
105
def handle_relevant_artifacts(notification_msg, now_ms, artifacts):
    """Select the artifacts whose type is listed in ``ARTIFACT_TYPES_LIST``.

    An artifact that declares a non-empty ``generatedFromUUID`` is only kept
    when the artifact it was generated from is found in ``artifacts`` and
    also has a listed type. Every kept artifact is passed through
    ``set_related_artifacts`` before being added to the result.

    :param notification_msg: full notification dict, forwarded to
        ``set_related_artifacts``.
    :param now_ms: timestamp in milliseconds (not used by the current body).
    :param artifacts: iterable of artifact dicts to inspect.
    :return: list of the relevant artifacts, in their original order.
    """
    selected = []
    for candidate in artifacts:
        if candidate['artifactType'] not in ARTIFACT_TYPES_LIST:
            continue

        origin_uuid = candidate.get('generatedFromUUID', '')
        if origin_uuid:
            # First artifact of the same list whose UUID matches the origin.
            origin = next(
                (other for other in artifacts if origin_uuid == other['artifactUUID']),
                None)
            if not origin or origin['artifactType'] not in ARTIFACT_TYPES_LIST:
                continue

        selected.append(set_related_artifacts(candidate, notification_msg))

    return selected
132
133
def set_related_artifacts(artifact, notification_msg):
    """Attach the metadata of an artifact's related artifacts, if any.

    When ``artifact`` has a non-empty ``relatedArtifacts`` entry, each UUID in
    it is resolved with ``get_artifact_metadata`` and the resulting list is
    stored under ``relatedArtifactsInfo``. Unresolved UUIDs yield ``None``
    entries.

    :param artifact: artifact dict, mutated in place.
    :param notification_msg: notification dict searched for the metadata.
    :return: the same ``artifact`` object.
    """
    related_uuids = artifact.get('relatedArtifacts', '')
    if not related_uuids:
        return artifact
    artifact['relatedArtifactsInfo'] = [
        get_artifact_metadata(notification_msg, related_uuid)
        for related_uuid in related_uuids
    ]
    return artifact
142
143
def get_artifact_metadata(notification_msg, uuid):
    """Find an artifact by UUID inside a notification.

    The service artifacts are searched first; when nothing matches there, the
    artifacts of each resource are searched in order.

    :param notification_msg: notification dict; both ``serviceArtifacts`` and
        ``resources`` are read unconditionally.
    :param uuid: value compared against each artifact's ``artifactUUID``.
    :return: the first matching artifact dict, or ``None`` when none matches.
    """
    found = next(
        (item for item in notification_msg['serviceArtifacts'] if item['artifactUUID'] == uuid),
        None)
    # Read even when a service artifact already matched, as a missing
    # 'resources' key must still surface as KeyError.
    resources = notification_msg['resources']
    if found or not resources:
        return found
    for resource in resources:
        for item in resource['artifacts']:
            if item['artifactUUID'] == uuid:
                return item
    return None
162
163
def send_notification_status(status_topic, now_ms, distribution_id, artifact, is_artifact_relevant):
    """Publish the notification status of one artifact to a DMaaP topic.

    :param status_topic: topic the status message is published to.
    :param now_ms: timestamp placed in the message.
    :param distribution_id: distribution identifier placed in the message.
    :param artifact: artifact dict; its ``artifactURL`` is placed in the message.
    :param is_artifact_relevant: selects the reported status, ``NOTIFIED``
        when true and ``NOT_NOTIFIED`` otherwise.
    :return: ``'SUCCESS'`` when the publisher closes with nothing left
        unsent, ``'FAIL'`` otherwise (including when sending raises).
    """
    logger.info('start to send notification status')
    notification_status = 'NOTIFIED' if is_artifact_relevant else 'NOT_NOTIFIED'
    payload = json.dumps({
        'distributionID': distribution_id,
        'consumerID': CONSUMER_ID,
        'timestamp': now_ms,
        'artifactURL': artifact['artifactURL'],
        'status': notification_status
    })
    publisher = BatchPublisherClient(DMAAP_MR_BASE_URL, status_topic, '', 'application/cambria')
    logger.info('try to send notification status: %s', payload)

    outcome = 'FAIL'
    try:
        publisher.send('MyPartitionKey', payload)
        time.sleep(1)
        # close() hands back whatever could not be sent; empty means success.
        unsent = publisher.close(10)
        if unsent:
            logger.error('failed to send notification status, %s messages unsent', len(unsent))
        else:
            outcome = 'SUCCESS'
            logger.info('send notification status success.')
    except Exception as e:
        logger.error('failed to send notification status.')
        logger.error(str(e))
        logger.error(traceback.format_exc())

    return outcome
197
198
def process_notification(msg):
    """Distribute the VF resources and NS CSAR referenced by a notification.

    The service asset is fetched from SDC. Every ``VF`` resource without a
    matching ``VnfPackageModel`` row is distributed on its own
    ``NfDistributeThread``; all threads are started, then joined, then their
    jobs are checked. Afterwards each ``TOSCA_CSAR`` service artifact without
    a matching ``NSPackageModel`` row is downloaded and parsed as an NSD.

    Only ``CatalogException`` is handled (logged) here; any other exception
    propagates to the caller.

    :param msg: callback notification dict; reads ``resources``,
        ``serviceUUID``, ``serviceArtifacts`` and ``distributionID``.
    """
    logger.info('Receive a callback notification, nb of resources: %s', len(msg['resources']))
    try:
        ns = sdc.get_asset(sdc.ASSETTYPE_SERVICES, msg['serviceUUID'])
        # check if the related resources exist
        resources = ns.get('resources', None)
        job_array = []
        resource_threads = []
        for resource in resources or []:
            # 'resoucreType' is the key exactly as this code reads it from
            # the asset data; it is intentionally left as-is.
            if resource['resoucreType'] != 'VF' or VnfPackageModel.objects.filter(vnfPackageId=resource['resourceUUID']):
                logger.debug("resource [%s] has been distributed", resource['resourceUUID'])
                continue
            logger.debug("VF [%s] is not distributed.", resource['resourceUUID'])
            logger.info("VF [%s] begin to distribute.", resource['resourceUUID'])
            csar_id = resource['resourceUUID']
            vim_ids = ignore_case_get(resource, "vimIds")
            lab_vim_id = ignore_case_get(resource, "labVimId")
            job_id = str(uuid.uuid4())
            job_array.append(job_id)
            resource_threads.append(sdc_vnf_package.NfDistributeThread(csar_id, vim_ids, lab_vim_id, job_id))

        # Start every distribution first, then wait for all of them.
        for worker in resource_threads:
            worker.start()
        for worker in resource_threads:
            worker.join()
        for pending_job in job_array:
            job_status = JobUtil.query_job_status(pending_job)
            if job_status[0].status == 'error':
                raise CatalogException("VF resource fail to distributed.")

        for artifact in msg['serviceArtifacts']:
            if artifact['artifactType'] != 'TOSCA_CSAR':
                continue
            csar_id = artifact['artifactUUID']
            if NSPackageModel.objects.filter(nsPackageId=artifact['artifactUUID']):
                logger.debug("CSAR(%s) has been distriuted", csar_id)
                continue

            download_url = artifact['artifactURL']
            # <two directories above this file>/csars/ns/<distributionID>
            localhost_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
            ns_csar_base = os.path.join(localhost_dir, "csars", "ns")
            local_path = os.path.join(ns_csar_base, msg['distributionID'])
            file_name = artifact['artifactName']

            # download csar package
            local_file_name = sdc.download_artifacts(download_url, local_path, file_name)
            if local_file_name.endswith((".csar", ".zip")):
                # Prefer the nested ns.csar when the archive contains one.
                nested_csar = fileutil.unzip_file(local_file_name, local_path,
                                                  "Artifacts/Deployment/OTHER/ns.csar")
                if os.path.exists(nested_csar):
                    local_file_name = nested_csar

            data = {
                'userDefinedData': {}
            }
            nsd = NsDescriptor()
            nsd.create(data, csar_id)
            nsd.parse_nsd_and_save(csar_id, local_file_name)
            logger.debug("CSAR(%s) distriuted successfully.", csar_id)
    except CatalogException as e:
        logger.error("Failed to download the resource")
        logger.error(str(e))