1 # Copyright 2019 CMCC Technologies Co., Ltd.
7 from threading import Thread
9 from apscheduler.schedulers.background import BackgroundScheduler
11 from catalog.pub.database.models import VnfPackageModel
12 from catalog.pub.exceptions import CatalogException
13 from catalog.packages.biz import sdc_vnf_package, sdc_ns_package
14 from catalog.pub.utils.jobutil import JobUtil
15 from catalog.pub.utils.values import ignore_case_get
17 from catalog.pub.Dmaap_lib.dmaap.consumer import ConsumerClient
18 from catalog.pub.Dmaap_lib.dmaap.identity import IdentityClient
19 from catalog.pub.Dmaap_lib.dmaap.publisher import BatchPublisherClient
20 from catalog.pub.config.config import CONSUMER_GROUP, CONSUMER_ID, POLLING_INTERVAL, DMAAP_MR_BASE_URL
21 from catalog.pub.msapi import sdc
23 logger = logging.getLogger(__name__)
25 ARTIFACT_TYPES_LIST = ["TOSCA_TEMPLATE", "TOSCA_CSAR"]
28 class SDCController(Thread):
30 super(SDCController, self).__init__()
31 self.identity = IdentityClient(DMAAP_MR_BASE_URL)
32 self.scheduler = BackgroundScheduler(job_defaults={'max_instances': 3})
33 self.notification_topic = ''
34 self.status_topic = ''
37 @self.scheduler.scheduled_job('interval', seconds=POLLING_INTERVAL)
39 self.fetch_notification()
43 description = 'nfvo catalog key for' + CONSUMER_ID
44 key = self.identity.create_apikey('', description)
45 topics = sdc.register_for_topics(key['apiKey'])
46 self.notification_topic = topics['distrNotificationTopicName']
47 self.status_topic = topics['distrStatusTopicName']
48 self.consumer = ConsumerClient(DMAAP_MR_BASE_URL, self.notification_topic, CONSUMER_GROUP, CONSUMER_ID)
49 self.consumer.set_api_credentials(key['apiKey'], key['apiSecret'])
50 self.scheduler.start()
51 except Exception as e:
52 logger.error('start sdc controller failed.')
54 logger.error(traceback.format_exc())
56 def fetch_notification(self):
58 logger.info('start to fetch message from dmaap.')
59 now_ms = int(time.time() * 1000)
60 notification_msgs = self.consumer.fetch()
61 logger.info('Receive a notification from dmaap: %s', notification_msgs)
62 for notification_msg in notification_msgs:
63 notification_callback = build_callback_notification(now_ms, notification_msg)
64 if is_activate_callback(notification_callback):
65 process_notification(notification_callback)
66 except Exception as e:
67 logger.error('fetch message from dmaap failed.')
69 logger.error(traceback.format_exc())
72 def is_activate_callback(notification_callback):
73 has_relevant_artifacts_in_resource = False
74 has_relevant_artifacts_in_service = False
75 if notification_callback['resources']:
76 has_relevant_artifacts_in_resource = True
77 if notification_callback['serviceArtifacts']:
78 has_relevant_artifacts_in_service = True
79 return has_relevant_artifacts_in_resource or has_relevant_artifacts_in_service
82 def build_callback_notification(now_ms, notification_msg):
83 # relevant_resource_instances = build_resource_instances(notification_msg, now_ms)
84 relevant_service_artifacts = handle_relevant_artifacts(notification_msg, now_ms,
85 notification_msg['serviceArtifacts'])
86 # notification_msg['resources'] = relevant_resource_instances
87 notification_msg['serviceArtifacts'] = relevant_service_artifacts
88 return notification_msg
91 def build_resource_instances(notification_msg, now_ms):
92 relevant_resource_instances = []
93 resources = notification_msg['resources']
94 for resource in resources:
95 artifacts = resource['artifacts']
96 found_relevant_artifacts = handle_relevant_artifacts(notification_msg, now_ms, artifacts)
97 if found_relevant_artifacts:
98 resources['artifacts'] = found_relevant_artifacts
99 relevant_resource_instances.append(resources)
100 return relevant_resource_instances
103 def handle_relevant_artifacts(notification_msg, now_ms, artifacts):
104 relevant_artifacts = []
105 for artifact in artifacts:
106 artifact_type = artifact['artifactType']
107 is_artifact_relevant = artifact_type in ARTIFACT_TYPES_LIST
108 if is_artifact_relevant:
109 generated_from_uuid = artifact.get('generatedFromUUID', '')
110 if generated_from_uuid:
111 generated_from_artifact = None
112 for artifact_g in artifacts:
113 if generated_from_uuid == artifact_g['artifactUUID']:
114 generated_from_artifact = artifact_g
116 if generated_from_artifact:
117 is_artifact_relevant = generated_from_artifact['artifactType'] in ARTIFACT_TYPES_LIST
119 is_artifact_relevant = False
120 if is_artifact_relevant:
121 artifact = set_related_artifacts(artifact, notification_msg)
122 relevant_artifacts.append(artifact)
124 # notification_status = send_notification_status(now_ms, notification_msg['distributionID'], artifact, is_artifact_relevant)
125 # if notification_status != 'SUCCESS':
126 # logger.error("Error failed to send notification status to Dmaap.")
128 return relevant_artifacts
131 def set_related_artifacts(artifact, notification_msg):
132 related_artifacts_uuid = artifact.get('relatedArtifacts', '')
133 if related_artifacts_uuid:
134 related_artifacts = []
135 for artifact_uuid in related_artifacts_uuid:
136 related_artifacts.append(get_artifact_metadata(notification_msg, artifact_uuid))
137 artifact['relatedArtifactsInfo'] = related_artifacts
141 def get_artifact_metadata(notification_msg, uuid):
142 service_artifacts = notification_msg['serviceArtifacts']
144 for artifact in service_artifacts:
145 if artifact['artifactUUID'] == uuid:
148 resources = notification_msg['resources']
149 if (not ret) and resources:
150 for resource in resources:
151 artifacts = resource['artifacts']
152 for artifact in artifacts:
153 if artifact['artifactUUID'] == uuid:
161 def send_notification_status(status_topic, now_ms, distribution_id, artifact, is_artifact_relevant):
162 logger.info('start to send notification status')
164 if is_artifact_relevant:
165 notification_status = 'NOTIFIED'
167 notification_status = 'NOT_NOTIFIED'
169 'distributionID': distribution_id,
170 'consumerID': CONSUMER_ID,
172 'artifactURL': artifact['artifactURL'],
173 'status': notification_status
175 request_json = json.JSONEncoder().encode(request)
176 pub = BatchPublisherClient(DMAAP_MR_BASE_URL, status_topic, '', 'application/cambria')
177 logger.info('try to send notification status: %s', request_json)
180 pub.send('MyPartitionKey', request_json)
182 stuck = pub.close(10)
185 logger.info('send notification status success.')
187 logger.error('failed to send notification status, %s messages unsent', len(stuck))
188 except Exception as e:
189 logger.error('failed to send notification status.')
191 logger.error(traceback.format_exc())
196 def process_notification(msg):
197 logger.info('Receive a callback notification, nb of resources: %s', len(msg['resources']))
199 ns = sdc.get_asset(sdc.ASSETTYPE_SERVICES, msg['serviceUUID'])
200 # check if the related resources exist
201 resources = ns.get('resources', None)
203 resource_threads = []
205 for resource in resources:
206 if (resource['resoucreType'] == 'VF') and not VnfPackageModel.objects.filter(vnfPackageId=resource['resourceUUID']):
207 logger.debug("VF [%s] is not distributed.", resource['resourceUUID'])
208 # raise CatalogException("VF (%s) is not distributed." % resource['resourceUUID'])
209 logger.info("VF [%s] begin to distribute.", resource['resourceUUID'])
210 csar_id = resource['resourceUUID']
211 vim_ids = ignore_case_get(resource, "vimIds")
212 lab_vim_id = ignore_case_get(resource, "labVimId")
213 job_id = str(uuid.uuid4())
214 job_array.append(job_id)
215 resource_threads.append(sdc_vnf_package.NfDistributeThread(csar_id, vim_ids, lab_vim_id, job_id))
216 # sdc_vnf_package.NfDistributeThread(csar_id, vim_ids, lab_vim_id, job_id).start()
218 logger.debug("resource [%s] has been distributed", resource['resourceUUID'])
219 for resource_thread in resource_threads:
220 resource_thread.start()
221 for resource_thread in resource_threads:
222 resource_thread.join()
223 for jobID in job_array:
224 job_status = JobUtil.query_job_status(jobID)
225 if job_status[0].status == 'error':
226 raise CatalogException("VF resource fail to distributed.")
227 csar_id = msg['serviceUUID']
228 sdc_ns_package.ns_on_distribute(csar_id)
229 logger.debug("Csar package [%s] has been destributed successfully", csar_id)
230 except CatalogException as e:
231 logger.error("Failed to download the resource")