1 # Copyright 2019 CMCC Technologies Co., Ltd.
8 from threading import Thread
10 from apscheduler.scheduler import Scheduler
12 from catalog.pub.Dmaap_lib.dmaap.consumer import ConsumerClient
13 from catalog.pub.Dmaap_lib.dmaap.identity import IdentityClient
14 from catalog.pub.Dmaap_lib.dmaap.publisher import BatchPublisherClient
15 from catalog.pub.config.config import CONSUMER_GROUP, CONSUMER_ID, POLLING_INTERVAL, DMAAP_MR_BASE_URL
16 from catalog.pub.msapi import sdc
18 logger = logging.getLogger(__name__)
20 ARTIFACT_TYPES_LIST = ["TOSCA_TEMPLATE", "TOSCA_CSAR"]
23 class SDCController(Thread):
25 super(SDCController, self).__init__()
26 self.identity = IdentityClient(DMAAP_MR_BASE_URL)
27 self.scheduler = Scheduler(standalone=True)
28 self.notification_topic = ''
29 self.status_topic = ''
32 @self.scheduler.interval_schedule(seconds=POLLING_INTERVAL)
34 self.fetch_notification()
38 description = 'nfvo catalog key for' + CONSUMER_ID
39 key = self.identity.create_apikey('', description)
40 topics = sdc.register_for_topics(key['apiKey'])
41 self.notification_topic = topics['distrNotificationTopicName']
42 self.status_topic = topics['distrStatusTopicName']
43 self.consumer = ConsumerClient(DMAAP_MR_BASE_URL, self.notification_topic, CONSUMER_GROUP, CONSUMER_ID)
44 self.consumer.set_api_credentials(key['apiKey'], key['apiSecret'])
45 self.scheduler.start()
46 except Exception as e:
47 logger.error('start sdc controller failed.')
49 logger.error(traceback.format_exc())
51 def fetch_notification(self):
53 logger.info('start to fetch message from dmaap.')
54 now_ms = int(time.time() * 1000)
55 notification_msgs = self.consumer.fetch()
56 logger.info('Receive a notification from dmaap: %s', notification_msgs)
57 for notification_msg in notification_msgs:
58 notification_callback = build_callback_notification(now_ms, notification_msg)
59 if is_activate_callback(notification_callback):
60 process_notification(notification_callback)
61 except Exception as e:
62 logger.error('fetch message from dmaap failed.')
64 logger.error(traceback.format_exc())
67 def is_activate_callback(notification_callback):
68 has_relevant_artifacts_in_resource = False
69 has_relevant_artifacts_in_service = False
70 if notification_callback['resources']:
71 has_relevant_artifacts_in_resource = True
72 if notification_callback['serviceArtifacts']:
73 has_relevant_artifacts_in_service = True
74 return has_relevant_artifacts_in_resource or has_relevant_artifacts_in_service
77 def build_callback_notification(now_ms, notification_msg):
78 # relevant_resource_instances = build_resource_instances(notification_msg, now_ms)
79 relevant_service_artifacts = handle_relevant_artifacts(notification_msg, now_ms,
80 notification_msg['serviceArtifacts'])
81 # notification_msg['resources'] = relevant_resource_instances
82 notification_msg['serviceArtifacts'] = relevant_service_artifacts
83 return notification_msg
86 def build_resource_instances(notification_msg, now_ms):
87 relevant_resource_instances = []
88 resources = notification_msg['resources']
89 for resource in resources:
90 artifacts = resource['artifacts']
91 found_relevant_artifacts = handle_relevant_artifacts(notification_msg, now_ms, artifacts)
92 if found_relevant_artifacts:
93 resources['artifacts'] = found_relevant_artifacts
94 relevant_resource_instances.append(resources)
95 return relevant_resource_instances
98 def handle_relevant_artifacts(notification_msg, now_ms, artifacts):
99 relevant_artifacts = []
100 for artifact in artifacts:
101 artifact_type = artifact['artifactType']
102 is_artifact_relevant = artifact_type in ARTIFACT_TYPES_LIST
103 if is_artifact_relevant:
104 generated_from_uuid = artifact.get('generatedFromUUID', '')
105 if generated_from_uuid:
106 generated_from_artifact = None
107 for artifact_g in artifacts:
108 if generated_from_uuid == artifact_g['artifactUUID']:
109 generated_from_artifact = artifact_g
111 if generated_from_artifact:
112 is_artifact_relevant = generated_from_artifact['artifactType'] in ARTIFACT_TYPES_LIST
114 is_artifact_relevant = False
115 if is_artifact_relevant:
116 artifact = set_related_artifacts(artifact, notification_msg)
117 relevant_artifacts.append(artifact)
119 # notification_status = send_notification_status(now_ms, notification_msg['distributionID'], artifact, is_artifact_relevant)
120 # if notification_status != 'SUCCESS':
121 # logger.error("Error failed to send notification status to Dmaap.")
123 return relevant_artifacts
126 def set_related_artifacts(artifact, notification_msg):
127 related_artifacts_uuid = artifact.get('relatedArtifacts', '')
128 if related_artifacts_uuid:
129 related_artifacts = []
130 for artifact_uuid in related_artifacts_uuid:
131 related_artifacts.append(get_artifact_metadata(notification_msg, artifact_uuid))
132 artifact['relatedArtifactsInfo'] = related_artifacts
136 def get_artifact_metadata(notification_msg, uuid):
137 service_artifacts = notification_msg['serviceArtifacts']
139 for artifact in service_artifacts:
140 if artifact['artifactUUID'] == uuid:
143 resources = notification_msg['resources']
144 if (not ret) and resources:
145 for resource in resources:
146 artifacts = resource['artifacts']
147 for artifact in artifacts:
148 if artifact['artifactUUID'] == uuid:
156 def send_notification_status(status_topic, now_ms, distribution_id, artifact, is_artifact_relevant):
157 logger.info('start to send notification status')
159 if is_artifact_relevant:
160 notification_status = 'NOTIFIED'
162 notification_status = 'NOT_NOTIFIED'
164 'distributionID': distribution_id,
165 'consumerID': CONSUMER_ID,
167 'artifactURL': artifact['artifactURL'],
168 'status': notification_status
170 request_json = json.JSONEncoder().encode(request)
171 pub = BatchPublisherClient(DMAAP_MR_BASE_URL, status_topic, '', 'application/cambria')
172 logger.info('try to send notification status: %s', request_json)
175 pub.send('MyPartitionKey', request_json)
177 stuck = pub.close(10)
180 logger.info('send notification status success.')
182 logger.error('failed to send notification status, %s messages unsent', len(stuck))
183 except Exception as e:
184 logger.error('failed to send notification status.')
186 logger.error(traceback.format_exc())
191 def process_notification(msg):
192 logger.info('Receive a callback notification, nb of resources: %s', len(msg['resources']))
193 service_artifacts = msg['serviceArtifacts']
194 for artifact in service_artifacts:
195 if artifact['artifactType'] == 'TOSCA_CSAR':
196 csar_id = artifact['artifactUUID']
197 download_url = artifact['artifactURL']
198 localhost_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
199 ns_csar_base = os.path.join(localhost_dir, "csars", "ns")
200 local_path = os.path.join(ns_csar_base, msg['distributionID'])
201 file_name = artifact['artifactName']
202 csar_version = artifact['artifactVersion']
203 sdc.download_artifacts(download_url, local_path, file_name)
204 # call ns package upload
206 'nsPackageId': csar_id,
207 'nsPackageVersion': csar_version,
208 'csarName': file_name,
209 'csarDir': local_path
212 # NsPackageParser(data, jobid).start()
213 logger.debug(data, jobid)