1 # Copyright 2019 CMCC Technologies Co., Ltd.
8 from threading import Thread
10 from apscheduler.scheduler import Scheduler
12 from catalog.pub.Dmaap_lib.dmaap.consumer import ConsumerClient
13 from catalog.pub.Dmaap_lib.dmaap.identity import IdentityClient
14 from catalog.pub.Dmaap_lib.dmaap.publisher import BatchPublisherClient
15 from catalog.pub.config.config import CONSUMER_GROUP, CONSUMER_ID, POLLING_INTERVAL, DMAAP_MR_IP, \
17 from catalog.pub.msapi import sdc
19 logger = logging.getLogger(__name__)
21 DMAAP_MR_BASE_URL = "https://%s:%s" % (DMAAP_MR_IP, DMAAP_MR_PORT)
22 ARTIFACT_TYPES_LIST = ["TOSCA_TEMPLATE", "TOSCA_CSAR"]
25 class SDCController(Thread):
27 super(SDCController, self).__init__()
28 self.identity = IdentityClient(DMAAP_MR_BASE_URL)
29 self.scheduler = Scheduler(standalone=True)
30 self.notification_topic = ''
31 self.status_topic = ''
34 @self.scheduler.interval_schedule(seconds=POLLING_INTERVAL)
36 self.fetch_notification()
40 description = 'nfvo catalog key for' + CONSUMER_ID
41 key = self.identity.create_apikey('', description)
42 topics = sdc.register_for_topics(key['apiKey'])
43 self.notification_topic = topics['distrNotificationTopicName']
44 self.status_topic = topics['distrStatusTopicName']
45 self.consumer = ConsumerClient(DMAAP_MR_BASE_URL, self.notification_topic, CONSUMER_GROUP, CONSUMER_ID)
46 self.consumer.set_api_credentials(key['apiKey'], key['apiSecret'])
47 self.scheduler.start()
48 except Exception as e:
49 logger.error('start sdc controller failed.')
50 logger.error(e.message)
51 logger.error(traceback.format_exc())
53 def fetch_notification(self):
55 logger.info('start to fetch message from dmaap.')
56 now_ms = int(time.time() * 1000)
57 notification_msgs = self.consumer.fetch()
58 logger.info('Receive a notification from dmaap: %s', notification_msgs)
59 for notification_msg in notification_msgs:
60 notification_callback = build_callback_notification(now_ms, notification_msg)
61 if is_activate_callback(notification_callback):
62 process_notification(notification_callback)
63 except Exception as e:
64 logger.error('fetch message from dmaap failed.')
65 logger.error(e.message)
66 logger.error(traceback.format_exc())
69 def is_activate_callback(notification_callback):
70 has_relevant_artifacts_in_resource = False
71 has_relevant_artifacts_in_service = False
72 if notification_callback['resources']:
73 has_relevant_artifacts_in_resource = True
74 if notification_callback['serviceArtifacts']:
75 has_relevant_artifacts_in_service = True
76 return has_relevant_artifacts_in_resource or has_relevant_artifacts_in_service
79 def build_callback_notification(now_ms, notification_msg):
80 # relevant_resource_instances = build_resource_instances(notification_msg, now_ms)
81 relevant_service_artifacts = handle_relevant_artifacts(notification_msg, now_ms,
82 notification_msg['serviceArtifacts'])
83 # notification_msg['resources'] = relevant_resource_instances
84 notification_msg['serviceArtifacts'] = relevant_service_artifacts
85 return notification_msg
88 def build_resource_instances(notification_msg, now_ms):
89 relevant_resource_instances = []
90 resources = notification_msg['resources']
91 for resource in resources:
92 artifacts = resource['artifacts']
93 found_relevant_artifacts = handle_relevant_artifacts(notification_msg, now_ms, artifacts)
94 if found_relevant_artifacts:
95 resources['artifacts'] = found_relevant_artifacts
96 relevant_resource_instances.append(resources)
97 return relevant_resource_instances
100 def handle_relevant_artifacts(notification_msg, now_ms, artifacts):
101 relevant_artifacts = []
102 for artifact in artifacts:
103 artifact_type = artifact['artifactType']
104 is_artifact_relevant = artifact_type in ARTIFACT_TYPES_LIST
105 if is_artifact_relevant:
106 generated_from_uuid = artifact.get('generatedFromUUID', '')
107 if generated_from_uuid:
108 generated_from_artifact = None
109 for artifact_g in artifacts:
110 if generated_from_uuid == artifact_g['artifactUUID']:
111 generated_from_artifact = artifact_g
113 if generated_from_artifact:
114 is_artifact_relevant = generated_from_artifact['artifactType'] in ARTIFACT_TYPES_LIST
116 is_artifact_relevant = False
117 if is_artifact_relevant:
118 artifact = set_related_artifacts(artifact, notification_msg)
119 relevant_artifacts.append(artifact)
121 # notification_status = send_notification_status(now_ms, notification_msg['distributionID'], artifact, is_artifact_relevant)
122 # if notification_status != 'SUCCESS':
123 # logger.error("Error failed to send notification status to Dmaap.")
125 return relevant_artifacts
128 def set_related_artifacts(artifact, notification_msg):
129 related_artifacts_uuid = artifact.get('relatedArtifacts', '')
130 if related_artifacts_uuid:
131 related_artifacts = []
132 for artifact_uuid in related_artifacts_uuid:
133 related_artifacts.append(get_artifact_metadata(notification_msg, artifact_uuid))
134 artifact['relatedArtifactsInfo'] = related_artifacts
138 def get_artifact_metadata(notification_msg, uuid):
139 service_artifacts = notification_msg['serviceArtifacts']
141 for artifact in service_artifacts:
142 if artifact['artifactUUID'] == uuid:
145 resources = notification_msg['resources']
146 if (not ret) and resources:
147 for resource in resources:
148 artifacts = resource['artifacts']
149 for artifact in artifacts:
150 if artifact['artifactUUID'] == uuid:
158 def send_notification_status(status_topic, now_ms, distribution_id, artifact, is_artifact_relevant):
159 logger.info('start to send notification status')
161 if is_artifact_relevant:
162 notification_status = 'NOTIFIED'
164 notification_status = 'NOT_NOTIFIED'
166 'distributionID': distribution_id,
167 'consumerID': CONSUMER_ID,
169 'artifactURL': artifact['artifactURL'],
170 'status': notification_status
172 request_json = json.JSONEncoder().encode(request)
173 pub = BatchPublisherClient(DMAAP_MR_BASE_URL, status_topic, '', 'application/cambria')
174 logger.info('try to send notification status: %s', request_json)
177 pub.send('MyPartitionKey', request_json)
179 stuck = pub.close(10)
182 logger.info('send notification status success.')
184 logger.error('failed to send notification status, %s messages unsent', len(stuck))
185 except Exception as e:
186 logger.error('failed to send notification status.')
187 logger.error(e.message)
188 logger.error(traceback.format_exc())
193 def process_notification(msg):
194 logger.info('Receive a callback notification, nb of resources: %s', len(msg['resources']))
195 service_artifacts = msg['serviceArtifacts']
196 for artifact in service_artifacts:
197 if artifact['artifactType'] == 'TOSCA_CSAR':
198 csar_id = artifact['artifactUUID']
199 download_url = artifact['artifactURL']
200 localhost_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
201 ns_csar_base = os.path.join(localhost_dir, "csars", "ns")
202 local_path = os.path.join(ns_csar_base, msg['distributionID'])
203 file_name = artifact['artifactName']
204 csar_version = artifact['artifactVersion']
205 sdc.download_artifacts(download_url, local_path, file_name)
206 # call ns package upload
208 'nsPackageId': csar_id,
209 'nsPackageVersion': csar_version,
210 'csarName': file_name,
211 'csarDir': local_path
214 # NsPackageParser(data, jobid).start()
215 logger.debug(data, jobid)