1 # Copyright 2019 CMCC Technologies Co., Ltd.
8 from threading import Thread
10 from apscheduler.schedulers.background import BackgroundScheduler
12 from catalog.pub.database.models import NSPackageModel, VnfPackageModel
13 from catalog.pub.exceptions import CatalogException
14 from catalog.packages.biz import sdc_vnf_package
15 from catalog.pub.utils import fileutil
16 from catalog.packages.biz.ns_descriptor import NsDescriptor
17 from catalog.pub.utils.jobutil import JobUtil
18 from catalog.pub.utils.values import ignore_case_get
20 from catalog.pub.Dmaap_lib.dmaap.consumer import ConsumerClient
21 from catalog.pub.Dmaap_lib.dmaap.identity import IdentityClient
22 from catalog.pub.Dmaap_lib.dmaap.publisher import BatchPublisherClient
23 from catalog.pub.config.config import CONSUMER_GROUP, CONSUMER_ID, POLLING_INTERVAL, DMAAP_MR_BASE_URL
24 from catalog.pub.msapi import sdc
26 logger = logging.getLogger(__name__)
28 ARTIFACT_TYPES_LIST = ["TOSCA_TEMPLATE", "TOSCA_CSAR"]
31 class SDCController(Thread):
33 super(SDCController, self).__init__()
34 self.identity = IdentityClient(DMAAP_MR_BASE_URL)
35 self.scheduler = BackgroundScheduler(job_defaults={'max_instances': 3})
36 self.notification_topic = ''
37 self.status_topic = ''
40 @self.scheduler.scheduled_job('interval', seconds=POLLING_INTERVAL)
42 self.fetch_notification()
46 description = 'nfvo catalog key for' + CONSUMER_ID
47 key = self.identity.create_apikey('', description)
48 topics = sdc.register_for_topics(key['apiKey'])
49 self.notification_topic = topics['distrNotificationTopicName']
50 self.status_topic = topics['distrStatusTopicName']
51 self.consumer = ConsumerClient(DMAAP_MR_BASE_URL, self.notification_topic, CONSUMER_GROUP, CONSUMER_ID)
52 self.consumer.set_api_credentials(key['apiKey'], key['apiSecret'])
53 self.scheduler.start()
54 except Exception as e:
55 logger.error('start sdc controller failed.')
57 logger.error(traceback.format_exc())
59 def fetch_notification(self):
61 logger.info('start to fetch message from dmaap.')
62 now_ms = int(time.time() * 1000)
63 notification_msgs = self.consumer.fetch()
64 logger.info('Receive a notification from dmaap: %s', notification_msgs)
65 for notification_msg in notification_msgs:
66 notification_callback = build_callback_notification(now_ms, notification_msg)
67 if is_activate_callback(notification_callback):
68 process_notification(notification_callback)
69 except Exception as e:
70 logger.error('fetch message from dmaap failed.')
72 logger.error(traceback.format_exc())
75 def is_activate_callback(notification_callback):
76 has_relevant_artifacts_in_resource = False
77 has_relevant_artifacts_in_service = False
78 if notification_callback['resources']:
79 has_relevant_artifacts_in_resource = True
80 if notification_callback['serviceArtifacts']:
81 has_relevant_artifacts_in_service = True
82 return has_relevant_artifacts_in_resource or has_relevant_artifacts_in_service
85 def build_callback_notification(now_ms, notification_msg):
86 # relevant_resource_instances = build_resource_instances(notification_msg, now_ms)
87 relevant_service_artifacts = handle_relevant_artifacts(notification_msg, now_ms,
88 notification_msg['serviceArtifacts'])
89 # notification_msg['resources'] = relevant_resource_instances
90 notification_msg['serviceArtifacts'] = relevant_service_artifacts
91 return notification_msg
94 def build_resource_instances(notification_msg, now_ms):
95 relevant_resource_instances = []
96 resources = notification_msg['resources']
97 for resource in resources:
98 artifacts = resource['artifacts']
99 found_relevant_artifacts = handle_relevant_artifacts(notification_msg, now_ms, artifacts)
100 if found_relevant_artifacts:
101 resources['artifacts'] = found_relevant_artifacts
102 relevant_resource_instances.append(resources)
103 return relevant_resource_instances
106 def handle_relevant_artifacts(notification_msg, now_ms, artifacts):
107 relevant_artifacts = []
108 for artifact in artifacts:
109 artifact_type = artifact['artifactType']
110 is_artifact_relevant = artifact_type in ARTIFACT_TYPES_LIST
111 if is_artifact_relevant:
112 generated_from_uuid = artifact.get('generatedFromUUID', '')
113 if generated_from_uuid:
114 generated_from_artifact = None
115 for artifact_g in artifacts:
116 if generated_from_uuid == artifact_g['artifactUUID']:
117 generated_from_artifact = artifact_g
119 if generated_from_artifact:
120 is_artifact_relevant = generated_from_artifact['artifactType'] in ARTIFACT_TYPES_LIST
122 is_artifact_relevant = False
123 if is_artifact_relevant:
124 artifact = set_related_artifacts(artifact, notification_msg)
125 relevant_artifacts.append(artifact)
127 # notification_status = send_notification_status(now_ms, notification_msg['distributionID'], artifact, is_artifact_relevant)
128 # if notification_status != 'SUCCESS':
129 # logger.error("Error failed to send notification status to Dmaap.")
131 return relevant_artifacts
134 def set_related_artifacts(artifact, notification_msg):
135 related_artifacts_uuid = artifact.get('relatedArtifacts', '')
136 if related_artifacts_uuid:
137 related_artifacts = []
138 for artifact_uuid in related_artifacts_uuid:
139 related_artifacts.append(get_artifact_metadata(notification_msg, artifact_uuid))
140 artifact['relatedArtifactsInfo'] = related_artifacts
144 def get_artifact_metadata(notification_msg, uuid):
145 service_artifacts = notification_msg['serviceArtifacts']
147 for artifact in service_artifacts:
148 if artifact['artifactUUID'] == uuid:
151 resources = notification_msg['resources']
152 if (not ret) and resources:
153 for resource in resources:
154 artifacts = resource['artifacts']
155 for artifact in artifacts:
156 if artifact['artifactUUID'] == uuid:
164 def send_notification_status(status_topic, now_ms, distribution_id, artifact, is_artifact_relevant):
165 logger.info('start to send notification status')
167 if is_artifact_relevant:
168 notification_status = 'NOTIFIED'
170 notification_status = 'NOT_NOTIFIED'
172 'distributionID': distribution_id,
173 'consumerID': CONSUMER_ID,
175 'artifactURL': artifact['artifactURL'],
176 'status': notification_status
178 request_json = json.JSONEncoder().encode(request)
179 pub = BatchPublisherClient(DMAAP_MR_BASE_URL, status_topic, '', 'application/cambria')
180 logger.info('try to send notification status: %s', request_json)
183 pub.send('MyPartitionKey', request_json)
185 stuck = pub.close(10)
188 logger.info('send notification status success.')
190 logger.error('failed to send notification status, %s messages unsent', len(stuck))
191 except Exception as e:
192 logger.error('failed to send notification status.')
194 logger.error(traceback.format_exc())
199 def process_notification(msg):
200 logger.info('Receive a callback notification, nb of resources: %s', len(msg['resources']))
202 ns = sdc.get_asset(sdc.ASSETTYPE_SERVICES, msg['serviceUUID'])
203 # check if the related resources exist
204 resources = ns.get('resources', None)
206 resource_threads = []
208 for resource in resources:
209 if (resource['resoucreType'] == 'VF') and not VnfPackageModel.objects.filter(vnfPackageId=resource['resourceUUID']):
210 logger.debug("VF [%s] is not distributed.", resource['resourceUUID'])
211 # raise CatalogException("VF (%s) is not distributed." % resource['resourceUUID'])
212 logger.info("VF [%s] begin to distribute.", resource['resourceUUID'])
213 csar_id = resource['resourceUUID']
214 vim_ids = ignore_case_get(resource, "vimIds")
215 lab_vim_id = ignore_case_get(resource, "labVimId")
216 job_id = str(uuid.uuid4())
217 job_array.append(job_id)
218 resource_threads.append(sdc_vnf_package.NfDistributeThread(csar_id, vim_ids, lab_vim_id, job_id))
219 # sdc_vnf_package.NfDistributeThread(csar_id, vim_ids, lab_vim_id, job_id).start()
221 logger.debug("resource [%s] has been distributed", resource['resourceUUID'])
222 for resource_thread in resource_threads:
223 resource_thread.start()
224 for resource_thread in resource_threads:
225 resource_thread.join()
226 for jobID in job_array:
227 job_status = JobUtil.query_job_status(jobID)
228 if job_status[0].status == 'error':
229 raise CatalogException("VF resource fail to distributed.")
231 service_artifacts = msg['serviceArtifacts']
232 for artifact in service_artifacts:
233 if artifact['artifactType'] == 'TOSCA_CSAR':
234 csar_id = artifact['artifactUUID']
235 if not NSPackageModel.objects.filter(nsPackageId=artifact['artifactUUID']):
236 download_url = artifact['artifactURL']
237 localhost_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
238 ns_csar_base = os.path.join(localhost_dir, "csars", "ns")
239 local_path = os.path.join(ns_csar_base, msg['distributionID'])
240 file_name = artifact['artifactName']
241 # csar_version = artifact['artifactVersion']
243 # download csar package
244 local_file_name = sdc.download_artifacts(download_url, local_path, file_name)
245 if local_file_name.endswith(".csar") or local_file_name.endswith(".zip"):
246 artifact_vnf_file = fileutil.unzip_file(local_file_name, local_path,
247 "Artifacts/Deployment/OTHER/ns.csar")
248 if os.path.exists(artifact_vnf_file):
249 local_file_name = artifact_vnf_file
252 'userDefinedData': {}
255 nsd.create(data, csar_id)
256 nsd.parse_nsd_and_save(csar_id, local_file_name)
257 logger.debug("CSAR(%s) distriuted successfully.", csar_id)
259 logger.debug("CSAR(%s) has been distriuted", csar_id)
260 except CatalogException as e:
261 logger.error("Failed to download the resource")