1 # ================================================================================
2 # Copyright (c) 2019-2020 AT&T Intellectual Property. All rights reserved.
3 # ================================================================================
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at
8 # http://www.apache.org/licenses/LICENSE-2.0
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
15 # ============LICENSE_END=========================================================
17 """OTI Event processor for handling all the event types"""
22 from multiprocessing.dummy import Pool as ThreadPool
23 from threading import Lock
27 from otihandler import utils
28 from otihandler.cfy_client import CfyClient
29 from otihandler.consul_client import ConsulClient
30 from otihandler.dbclient.apis import EventDbAccess
31 from otihandler.dbclient.models import Event, EventAck
32 from otihandler.docker_client import DockerClient
# Module-level accumulator of (pod_id, message) results appended by
# notify_pods(); NOTE(review): appended from pool worker threads without a
# lock and never cleared here — confirm its lifecycle with callers.
34 notify_response_arr = []
# Fixed nodePort of the DTIH proxy service on each k8s cluster (also
# duplicated as a class attribute on DTIProcessor below).
36 K8S_CLUSTER_PROXY_NODE_PORT = '30132'
39 # def notify_docker(args_tuple):
41 # event notification executor inside a process pool to communicate with docker container
42 # interacts with docker client library
44 # (dti_event, db_access, ack_item) = args_tuple
46 # dcae_service_action = dti_event.get('dcae_service_action')
47 # component_scn = ack_item.service_component
48 # deployment_id = ack_item.deployment_id
49 # container_id = ack_item.container_id
50 # docker_host = ack_item.docker_host
51 # reconfig_script = ack_item.reconfig_script
52 # container_type = 'docker'
53 # except Exception as e:
55 # "ERROR", "dti_processor.notify_docker() processing args got exception {}: {!s}".format(type(e).__name__, e))
58 # what = "{} in {} container {} on {} that was deployed by {}".format(
59 # reconfig_script, container_type, container_id, docker_host, deployment_id)
60 # if dcae_service_action == 'add':
61 # add_action = {"dcae_service_action": "deploy"}
62 # dti_event.update(add_action)
64 # if dcae_service_action == 'delete':
65 # add_action = {"dcae_service_action": "undeploy"}
66 # dti_event.update(add_action)
68 # # dkr = DockerClient(docker_host, reauth=False)
70 # # result = dkr.notify_for_reconfiguration(container_id, [ reconfig_script, "dti", json.dumps(dti_event) ])
71 # if dti_event.get('dcae_service_action') == 'undeploy':
72 # # delete from dti_event_ack table
74 # db_access.deleteDomainObject(ack_item)
75 # except Exception as e:
76 # msg = "trying to delete event ack record, got exception {}: {!s}".format(type(e).__name__, e)
77 # DTIProcessor.logger.warning(msg)
78 # return (component_scn, "exception {}: {!s} running {}".format(type(e).__name__, e, what))
80 # return (component_scn, "ran {}, got: {!s}".format(what, result))
82 # except Exception as e:
83 # return (component_scn, "exception {}: {!s} running {}".format(type(e).__name__, e, what))
86 def notify_svc(args_tuple):
# Thread-pool worker: deliver one DTI add/update/delete event to a single
# deployed component, either a docker container (exec of its reconfig script
# via DockerClient) or a k8s component (delegated to notify_k8s /
# notify_k8s_pod). Returns a (service_component_name, message) pair that the
# caller folds into a result dict.
# NOTE(review): this copy of the source is garbled — original line numbers are
# fused into each line and several lines ("try:" headers, "else:" branches,
# call continuations) are missing, so the control flow below is incomplete.
88 add/update/delete event handler
89 event notification executor inside a process pool to communicate with docker container and k8s services
90 interacts with docker client library
91 interacts with k8s node port services using REST client
# args_tuple = (original event dict, EventDbAccess, Event DB row, component
# tuple). The event is deep-copied so the per-target action rewrites below do
# not mutate the caller's shared dict.
93 (orig_dti_event, db_access, curr_evt, res_tuple) = args_tuple
94 dti_event = copy.deepcopy(orig_dti_event)
96 dcae_service_action = dti_event.get('dcae_service_action').lower()
# res_tuple indices (presumably from CfyClient().iter_components()):
# 0=service component name, 1=deployment id, 2=container id, 3=node id,
# 6=docker host, 7=reconfig script, 8=container type — TODO confirm.
98 component_scn = res_tuple[0]
99 deployment_id = res_tuple[1]
100 container_id = res_tuple[2]
101 node_id = res_tuple[3]
102 docker_host = res_tuple[6]
103 reconfig_script = res_tuple[7]
104 container_type = res_tuple[8]
# Unpacking failures are reported as an ("ERROR", message) result instead of
# raising, so pool.map() can still assemble its result dict.
105 except Exception as e:
106 return ("ERROR", "dti_processor.notify_svc() processing args got exception {}: {!s}".format(type(e).__name__, e))
109 if container_type == "docker":
110 # exec reconfigure.sh in docker container
112 what = "{} in {} container {} on {} that was deployed by {} node {}".format(
113 reconfig_script, container_type, container_id, docker_host, deployment_id, node_id)
# Map the external event verbs onto the verbs the reconfig script expects:
# add -> deploy, delete -> undeploy.
114 if dcae_service_action == 'add':
115 add_action = {"dcae_service_action": "deploy"}
116 dti_event.update(add_action)
118 if dcae_service_action == 'delete':
119 add_action = {"dcae_service_action": "undeploy"}
120 dti_event.update(add_action)
122 dkr = DockerClient(docker_host, reauth=False)
# 'update' is implemented as undeploy followed by deploy inside the container,
# after which the existing ack row is re-saved with action 'update'.
124 if dti_event.get('dcae_service_action') == 'update':
126 DTIProcessor.logger.debug("update 1 - running undeploy {}".format(what))
127 dti_event.update({"dcae_service_action": "undeploy"})
128 result = dkr.notify_for_reconfiguration(container_id, [reconfig_script, "dti", json.dumps(dti_event)])
129 DTIProcessor.logger.debug("update 2 - running deploy {}".format(what))
130 dti_event.update({"dcae_service_action": "deploy"})
131 result = dkr.notify_for_reconfiguration(container_id, [reconfig_script, "dti", json.dumps(dti_event)])
133 upd_evt_ack = db_access.query_event_info_docker(curr_evt, component_scn, deployment_id,
135 upd_evt_ack.update_action('update')
136 db_access.saveDomainObject(upd_evt_ack)
137 except Exception as e:
138 msg = "trying to update event ack record, got exception {}: {!s}".format(type(e).__name__, e)
139 DTIProcessor.logger.warning(msg)
140 return (component_scn, "exception {}: {!s} running {}".format(type(e).__name__, e, what))
# Non-'update' path: run the reconfig script once, then either record the
# acknowledgement (deploy) or remove it (undeploy) in the dtih_event_ack table.
142 DTIProcessor.logger.debug("running {}".format(what))
143 result = dkr.notify_for_reconfiguration(container_id, [reconfig_script, "dti", json.dumps(dti_event)])
144 if dti_event.get('dcae_service_action') == 'deploy':
145 # add into dti_event_ack table
147 add_evt_ack = EventAck(service_component=component_scn, deployment_id=deployment_id,
148 container_type='docker', docker_host=docker_host,
149 container_id=container_id, reconfig_script=reconfig_script,
152 db_access.saveDomainObject(add_evt_ack)
153 except Exception as e:
154 msg = "trying to store event ack record, got exception {}: {!s}".format(type(e).__name__, e)
155 DTIProcessor.logger.warning(msg)
156 return (component_scn, "exception {}: {!s} running {}".format(type(e).__name__, e, what))
158 # remove from dtih_event_ack if present
161 del_evt_ack = db_access.query_event_info_docker(curr_evt, component_scn, deployment_id,
163 db_access.deleteDomainObject(del_evt_ack)
164 except Exception as e:
165 msg = "trying to delete event ack record for docker service, got exception {}: {!s}".format(type(e).__name__, e)
166 DTIProcessor.logger.warning(msg)
167 return (component_scn, "exception {}: {!s} running {}".format(type(e).__name__, e, what))
168 except Exception as e:
169 return (component_scn, "exception {}: {!s} running {}".format(type(e).__name__, e, what))
171 return (component_scn, "ran {}, got: {!s}".format(what, result))
172 elif container_type == "k8s":
173 DTIProcessor.logger.debug("dti_processor.notify_svc() handling k8s component")
174 # if action is 'update', check if k8s pod info exists already for this event in app db
175 if dcae_service_action == 'add':
176 DTIProcessor.logger.debug("dti_processor.notify_svc() in k8s for add action")
177 return notify_k8s((dti_event, db_access, curr_evt, res_tuple))
178 elif dcae_service_action == 'update':
179 # handle update for pods being tracked and handle add for new pods
180 k8s_scn_result = db_access.query_event_data_k8s_pod(curr_evt, component_scn)
# Pod already tracked in the app DB -> proxy an in-place update to that pod...
183 DTIProcessor.logger.debug("dti_processor.notify_svc() in k8s for update action")
184 return notify_k8s_pod((dti_event, db_access, k8s_scn_result))
# ...otherwise (presumably an "else:" dropped from this copy) replay as 'add'.
187 DTIProcessor.logger.debug("dti_processor.notify_svc(), convert update to add action in k8s ")
188 add_action = {"dcae_service_action": "add"}
189 dti_event.update(add_action)
190 return notify_k8s((dti_event, db_access, curr_evt, res_tuple))
193 def notify_k8s(args_tuple):
# Thread-pool worker for the k8s 'add' path: PUT the DTI event to the
# component's statefulset nodeport service, parse the JSON acknowledgement,
# and persist an EventAck row describing the pod that accepted the event.
# Returns a (service_component_name, message) pair.
# NOTE(review): source garbled — line numbers fused into lines; "try:" headers
# appear to have been dropped around the requests/DB calls below.
196 event notification executor inside a process pool to communicate with k8s statefulset nodeport service
197 uses REST API client to call k8s services
199 (dti_event, db_access, curr_evt, res_tuple) = args_tuple
# res_tuple extends the component tuple with 9=service address, 10=service
# port for the nodeport endpoint — TODO confirm against iter_components().
200 component_scn = res_tuple[0]
201 deployment_id = res_tuple[1]
202 node_id = res_tuple[3]
203 container_type = res_tuple[8]
204 service_address = res_tuple[9]
205 service_port = res_tuple[10]
206 what = "{} in {} deployment {} that was deployed by {} node {}".format(
207 "add", container_type, "statefulset", deployment_id, node_id)
208 # call scn node port service REST API
209 svc_nodeport_url = "https://{}:{}".format(service_address, service_port)
# Network/HTTP failures (including non-2xx via raise_for_status) are turned
# into a (scn, message) result instead of propagating.
211 DTIProcessor.logger.debug("running {}".format(what))
212 response = requests.put(svc_nodeport_url, json=dti_event, timeout=50)
213 response.raise_for_status()
214 except requests.exceptions.RequestException as e:
215 msg = "collector nodeport service({}) threw exception {}: {!s}".format(
216 svc_nodeport_url, type(e).__name__, e)
217 DTIProcessor.logger.error(msg)
218 return (component_scn, "exception {}: {!s} running {}".format(type(e).__name__, e, what))
220 event_ack_info = response.json()
221 except Exception as e:
222 msg = "collector nodeport service({}) threw exception {}: {!s}".format(
223 svc_nodeport_url, type(e).__name__, e)
224 DTIProcessor.logger.error(msg)
225 return (component_scn, "exception {}: {!s} running {}".format(type(e).__name__, e, what))
227 if not event_ack_info:
228 msg = "collector nodeport service returned bad data"
229 DTIProcessor.logger.error(msg)
230 return (component_scn, "collector nodeport service returned bad data")
# The collector's acknowledgement identifies where it runs; these keys are
# what the collector is expected to return — verify against its API.
232 namespace = event_ack_info.get("KubeNamespace")
233 svc_name = event_ack_info.get("KubeServiceName")
234 svc_port = event_ack_info.get("KubeServicePort")
235 proxy_fqdn = event_ack_info.get("KubeProxyFqdn")
236 cluster_fqdn = event_ack_info.get("KubeClusterFqdn")
237 pod_name = event_ack_info.get("KubePod")
# Statefulset name = pod name minus its trailing "-<ordinal>" suffix; raises
# ValueError if the pod name has no '-' (not handled here).
238 statefulset = pod_name[0:pod_name.rindex('-')]
240 what = "{} in {} deployment {} in namespace {} that was deployed by {} node {}".format(
241 "add", container_type, statefulset, namespace, deployment_id, node_id)
# Persist the acknowledgement so later update/delete events can find this pod.
243 add_evt_ack = EventAck(k8s_namespace=namespace, k8s_service_name=svc_name, deployment_id=deployment_id,
244 k8s_service_port=svc_port, k8s_cluster_fqdn=cluster_fqdn, k8s_proxy_fqdn=proxy_fqdn,
245 k8s_pod_id=pod_name, event=curr_evt, action='add', container_type='k8s',
246 service_component=component_scn)
247 db_access.saveDomainObject(add_evt_ack)
248 return (component_scn, "ran {}, got: {!s}".format(what, event_ack_info))
249 except Exception as e:
250 msg = "trying to store event ack record, got exception {}: {!s}".format(type(e).__name__, e)
251 DTIProcessor.logger.warning(msg)
252 return (component_scn, "exception {}: {!s} running {}".format(type(e).__name__, e, what))
255 def notify_pods(args_tuple):
# Thread-pool worker for cluster-failover notification: PUT the event to every
# replica of a statefulset through the cluster's DTIH proxy nodeport.
# Unlike the other notify_* workers this returns nothing; results are appended
# to the module-global notify_response_arr (unsynchronized — see note there).
# NOTE(review): source garbled — "try:" headers and some continuation lines
# are missing from this copy.
258 event notification executor inside a process pool to communicate with k8s DTIH proxy nodeport service
259 uses REST API client to call k8s services
262 (dti_event, res_tuple) = args_tuple
# res_tuple here comes from CfyClient().query_k8_components():
# 0=cluster fqdn, 1=namespace, 2=service name, 3=replica count, 4=service
# port — TODO confirm ordering.
264 cluster = res_tuple[0]
265 port = K8S_CLUSTER_PROXY_NODE_PORT
266 namespace = res_tuple[1]
267 svc_name = res_tuple[2]
268 svc_port = res_tuple[4]
269 replicas = res_tuple[3]
# Statefulset pods are addressed by their predictable ordinal names
# ("sts-<service>-<n>"); notify each replica in turn.
271 for replica in range(replicas):
272 pod_id = "sts-{}-{}".format(svc_name, replica)
273 item_pod_url = "https://{}:{}/{}/{}?service_name={}&service_port={}".format(cluster, port, namespace,
276 what = "{} for pod id {} in cluster {} and namespace {}".format("notify", pod_id, cluster, namespace)
278 DTIProcessor.logger.debug("running {}".format(what))
279 response = requests.put(item_pod_url, json=dti_event, timeout=50)
280 response.raise_for_status()
281 except requests.exceptions.RequestException as e:
282 msg = "stateful set proxy service({}) threw exception {}: {!s}".format(
283 item_pod_url, type(e).__name__, e)
284 DTIProcessor.logger.error(msg)
286 notify_response_arr.append((pod_id, "exception {}: {!s} running {}".format(type(e).__name__, e, what)))
289 event_ack_info = response.json()
290 except Exception as e:
291 msg = "stateful set proxy service({}) threw exception {}: {!s}".format(
292 item_pod_url, type(e).__name__, e)
293 DTIProcessor.logger.error(msg)
295 notify_response_arr.append((pod_id, "exception {}: {!s} running {}".format(type(e).__name__, e, what)))
# An empty/None JSON body is recorded as "no acknowledgement" for this pod.
297 if not event_ack_info:
298 msg = "stateful set proxy service returned bad data"
299 DTIProcessor.logger.error(msg)
301 notify_response_arr.append ((pod_id, "no acknowledgement - running {}".format(what)))
304 notify_response_arr.append((pod_id, "ran {}, got: {!s}".format(what, event_ack_info)))
# Argument-unpacking failures are recorded under the "ERROR" pseudo-pod key.
305 except Exception as e:
307 notify_response_arr.append (("ERROR", "dti_processor.notify_pods() processing args got exception {}: {!s}".format(type(e).__name__, e)))
309 def notify_k8s_pod(args_tuple):
# Thread-pool worker for update/delete against a k8s pod that is already
# tracked by an EventAck row: PUT the event to the pod through the cluster's
# DTIH proxy nodeport, then delete (on 'delete') or refresh (otherwise) the
# ack row. Returns a (service_component_name, message) pair.
# NOTE(review): source garbled — "try:" headers around the HTTP and DB calls
# are missing from this copy.
312 event notification executor inside a process pool to communicate with k8s DTIH proxy service
313 uses REST API client to call k8s services
317 (dti_event, db_access, ack_item) = args_tuple
318 # call ingress proxy to dispatch delete event
320 action = dti_event.get('dcae_service_action')
321 what = "{} to {} ID {} in namespace {} that was deployed in cluster {}".format(
322 action, 'k8s pod', ack_item.k8s_pod_id, ack_item.k8s_namespace, ack_item.k8s_cluster_fqdn)
# Target URL is built entirely from the persisted EventAck row; the proxy
# routes by namespace/pod with the service name+port as query params.
324 DTIProcessor.logger.debug("running {}".format(what))
325 item_pod_url = "https://{}:{}/{}/{}?service_name={}&service_port={}".format(
326 ack_item.k8s_proxy_fqdn, K8S_CLUSTER_PROXY_NODE_PORT, ack_item.k8s_namespace,
327 ack_item.k8s_pod_id, ack_item.k8s_service_name, ack_item.k8s_service_port)
328 component_scn = ack_item.service_component
329 response = requests.put(item_pod_url, json=dti_event, timeout=50)
330 response.raise_for_status()
# HTTP failure is reported as the result message; note the DB bookkeeping
# below is then skipped, so the ack row is left untouched.
331 except requests.exceptions.RequestException as e:
332 msg = "exception occured, stateful set proxy service({}) threw {}: {!s}".format(
333 item_pod_url, type(e).__name__, e)
334 DTIProcessor.logger.error(msg)
335 return (component_scn, "ran {}, got: {!s}".format(what, msg))
# 'delete' drops the tracking row; any other action re-saves it as 'update'.
337 if action == 'delete':
339 db_access.deleteDomainObject(ack_item)
340 except Exception as e:
341 msg = "trying to delete event ack record, got exception {}: {!s}".format(type(e).__name__, e)
342 DTIProcessor.logger.warning(msg)
343 return (component_scn, "exception {}: {!s} running {}".format(type(e).__name__, e, what))
346 ack_item.update_action('update')
347 db_access.saveDomainObject(ack_item)
348 except Exception as e:
349 msg = "trying to update event ack record, got exception {}: {!s}".format(type(e).__name__, e)
350 DTIProcessor.logger.warning(msg)
351 return (component_scn, "exception {}: {!s} running {}".format(type(e).__name__, e, what))
353 return (component_scn, "ran {}, got: {!s}".format(what, response.json()))
356 class DTIProcessor(object):
358 Main event processing class that encapsulates all the logic of this handler application!
359 An instance of this class is created per incoming client request.
361 Generates input data by querying platform services - Cloudify, Consul, PostgreSQL
363 It creates a pool of worker processes using a multiprocessing Pool class instance.
364 Tasks are offloaded to the worker processes that exist in the pool.
365 The input data is distributed across processes of the Pool object to enable parallel execution of
366 event notification function across multiple input values (data parallelism).
369 logger = logging.getLogger("oti_handler.dti_processor")
370 K8S_CLUSTER_PROXY_NODE_PORT = '30132'
375 def __init__(self, dti_event, send_notification=True):
# Construct the processor AND synchronously run the whole event flow:
# parse the event, build two 8-worker thread pools (docker and k8s fan-out),
# dispatch to the action-named handler via dispatcher(), then close/join the
# pools and merge the handler's result dict into self._result.
# NOTE(review): source garbled — "try:" headers and the initialization of
# self._result are missing from this copy; errors are accumulated under
# self._result['ERROR'] rather than raised.
377 self.event = dti_event
378 self.is_notify = send_notification
# Event envelope fields; action/type are lower-cased for dispatch and lookup.
379 self.action = dti_event.get('dcae_service_action').lower()
380 self.target_name = dti_event.get('dcae_target_name')
381 self.target_type = dti_event.get('dcae_target_type', '').lower()
382 self.event_clli = dti_event.get('dcae_service_location')
# ThreadPool (multiprocessing.dummy) = thread-backed Pool; suits the I/O-bound
# HTTP/docker notifications done by the workers.
385 self.docker_pool = ThreadPool(8)
386 self.k8s_pool = ThreadPool(8)
387 except Exception as e:
388 msg = "DTIProcessor.__init__() creating ThreadPool got exception {}: {!s}".format(type(e).__name__, e)
389 DTIProcessor.logger.error(msg)
390 self._result['ERROR'] = msg
393 self.db_access = EventDbAccess()
394 self.prim_db_event = None
# dispatcher() resolves self.action ('add'/'update'/'delete'/...) to the
# same-named method and runs it, returning its per-component result dict.
396 res_dict = self.dispatcher()
401 self.docker_pool.close()
402 self.k8s_pool.close()
403 except Exception as e:
404 msg = "DTIProcessor.__init__() running pool.close() got exception {}: {!s}".format(type(e).__name__,
406 DTIProcessor.logger.error(msg)
407 self._result['ERROR'] = msg
409 self.docker_pool.join()
411 except Exception as e:
412 msg = "DTIProcessor.__init__() running pool.join() got exception {}: {!s}".format(type(e).__name__,
414 DTIProcessor.logger.error(msg)
415 self._result['ERROR'] = msg
417 # if not send_notification:
418 # DTIProcessor._logger.debug("DTIProcessor.__init__() not notifying DCAE-Controller components")
# Merge per-component results into the aggregate result read by get_result().
423 utils.update_dict(self._result, res_dict)
424 except Exception as e:
425 msg = "DTIProcessor.__init__() running utils.update_dict() got exception {}: {!s}".format(
427 DTIProcessor.logger.error(msg)
428 self._result['ERROR'] = msg
430 DTIProcessor.logger.debug("DTIProcessor.__init__() done notifying new DCAE-Controller components")
432 def dispatcher(self):
433 """ dispatch method to execute specific method based on event type """
# Resolve the event action string to the same-named instance method
# ('add', 'update', 'delete', ...), falling back to a stub for unknown
# actions. NOTE(review): the line that invokes `method` (presumably
# "return method()") is missing from this copy of the source.
435 arg = str(self.action)
436 method = getattr(self, arg, lambda: "Invalid action")
441 delete event from consul KV store, this functionality will be retired as events are stored
442 in the PostgreSQL OTI database
446 # update Consul KV store with DTI Event - storing them in a folder for all components
447 key = "{}/{}/{}".format("dti_events", self.target_type, self.target_name)
448 result = ConsulClient.delete_key(key)
449 except Exception as e:
450 msg = "trying to delete Consul history key {}, got exception {}: {!s}".format(key, type(e).__name__, e)
451 DTIProcessor.logger.warning(msg)
452 self._result['WARNING'] = msg
455 msg = "VNF instance {} was not in Consul dti_events historical folder".format(self.target_name)
456 DTIProcessor.logger.warning(msg)
457 self._result['WARNING'] = msg
461 add event to consul KV store, this functionality will be retired as events are stored
462 in the PostgreSQL OTI database
464 dep_key = "{}/{}/{}".format("dti_events", self.target_type, self.target_name)
466 # update Consul KV store with DTI Event - storing them in a folder for all components
467 result = ConsulClient.store_kvs({dep_key: self.event})
468 except Exception as e:
469 msg = "trying to store Consul history key {}, got exception {}: {!s}".format(key, type(e).__name__, e)
470 DTIProcessor.logger.warning(msg)
471 self._result['WARNING'] = msg
475 process DTI event that contains a new VNF instance that has to be configured in the collector microservices
479 msg = "processing add event for {}/{}".format(self.target_type, self.target_name)
480 DTIProcessor.logger.debug(msg)
481 # insert add event into dtih_event table
482 self.prim_db_event = Event(event=self.event, target_name=self.target_name, target_type=self.target_type,
483 location_clli=self.event_clli)
484 self.db_access.saveDomainObject(self.prim_db_event)
485 except Exception as e:
486 msg = "trying to store event, got exception {}: {!s}".format(type(e).__name__, e.args)
487 DTIProcessor.logger.warning(msg)
488 self._result['ERROR'] = msg
493 # force the action to add, to avoid bad things later
494 add_action = {"dcae_service_action": "add"}
495 self.event.update(add_action)
498 "scn1_k8s", "k8s_deployment_id1", "k8s_container_id1", "k8s_node_id1", "k8s_node_instance_id1",
499 "node_instance_state", "k8s_host", "dti_reconfig_script", "k8s",
500 "dcae-d1.idns.cip.corp.com", "30996")
501 mock_tp12 = ("scn1_docker", "docker_deployment_id1", "docker_container_id1", "docker_node_id1",
502 "docker_node_instance_id1",
503 "node_instance_state", "docker_host", "dti_reconfig_script", "docker",
504 "dcae-d1.idns.cip.corp.com", "30996")
506 # tpl_arr.append(mock_tp11)
507 # tpl_arr.append(mock_tp12)
508 # res_dict = dict(self.docker_pool.map(notify_svc, (((self.event, self.db_access, self.prim_db_event, tp) for tp in tpl_arr))))
509 res_dict = dict(self.docker_pool.map(notify_svc,
510 ((self.event, self.db_access, self.prim_db_event, tp) for tp in
511 CfyClient().iter_components(self.target_type,
512 dcae_service_location=self.event_clli))
514 except Exception as e:
515 msg = "DTIProcessor.__init__() running pool.map() got exception {}: {!s}".format(type(e).__name__,
517 DTIProcessor.logger.error(msg)
518 self._result['ERROR'] = msg
521 def add_replay(self):
# Replay an 'update' event as an 'add': used when an update arrives but no
# acknowledgement rows exist in the app DB, so the components must be
# (re)configured from scratch. Mirrors add(): rewrites the action in the
# event, then fans out notify_svc over all matching deployed components via
# the docker thread pool. Returns the dict built from pool.map results.
# NOTE(review): "try:" headers and the pool.map closing parens are missing
# from this copy; mock_tp11/mock_tp12 below are leftover test fixtures that
# are never used (only referenced by the commented-out lines).
523 convert an update event flow and replay as an add event type since the event acknowledgement is missing
524 from application database
528 # force the action to add, to avoid bad things later
529 add_action = {"dcae_service_action": "add"}
530 self.event.update(add_action)
532 mock_tp11 = ("scn1_k8s", "k8s_deployment_id1", "k8s_container_id1", "k8s_node_id1", "k8s_node_instance_id1",
533 "node_instance_state", "k8s_host", "dti_reconfig_script", "k8s",
534 "dcae-d1.idns.cip.corp.com", "30996")
535 mock_tp12 = ("scn1_docker", "docker_deployment_id1", "docker_container_id1", "docker_node_id1",
536 "docker_node_instance_id1",
537 "node_instance_state", "docker_host", "dti_reconfig_script", "docker",
538 "dcae-d1.idns.cip.corp.com", "30996")
540 # tpl_arr.append(mock_tp11)
541 # tpl_arr.append(mock_tp12)
542 # res_dict = dict(self.pool.map(notify_svc, (((self.event, self.db_access, self.prim_db_event, tp) for tp in tpl_arr))))
# One notify_svc task per component discovered by Cloudify for this target
# type/location; each returns a (scn, message) pair collected into res_dict.
543 res_dict = dict(self.docker_pool.map(notify_svc,
544 ((self.event, self.db_access, self.prim_db_event, tp) for tp in
545 CfyClient().iter_components(self.target_type,
546 dcae_service_location=self.event_clli))
548 except Exception as e:
549 msg = "DTIProcessor.add_replay() running pool.map() got exception {}: {!s}".format(type(e).__name__, e)
550 DTIProcessor.logger.error(msg)
551 self._result['ERROR'] = msg
556 process DTI event that indicates a VNF instance has to be removed from the collector microservices
562 self.prim_db_event = self.db_access.query_event_item(self.target_type, self.target_name)
565 msg = "processing delete event for {}/{} to relate with any docker hosts".format(
566 self.target_type, self.target_name)
567 DTIProcessor.logger.warning(msg)
568 res_dict_docker = dict(self.docker_pool.map(notify_svc,
569 ((self.event, self.db_access, self.prim_db_event, tp)
571 in CfyClient().iter_components_for_docker(
573 dcae_service_location=self.event_clli))
575 except Exception as e:
576 msg = "DTIProcessor.delete() running docker_pool.map() got exception {}: {!s}".format(type(e).__name__,
578 DTIProcessor.logger.error(msg)
579 self._result['ERROR'] = msg
582 msg = "processing delete event for {}/{} to relate with any k8s hosts".format(
583 self.target_type, self.target_name)
584 DTIProcessor.logger.warning(msg)
585 if self.prim_db_event:
586 result = self.db_access.query_event_data_k8s(self.target_type, self.target_name)
587 res_dict_k8s = dict(self.k8s_pool.map(notify_k8s_pod, (
588 ((self.event, self.db_access, ack_item) for ack_item in result))))
589 except Exception as e:
590 msg = "DTIProcessor.delete() running k8s_pool.map() got exception {}: {!s}".format(type(e).__name__, e)
591 DTIProcessor.logger.error(msg)
592 self._result['ERROR'] = msg
595 if self.prim_db_event:
596 self.db_access.deleteDomainObject(self.prim_db_event)
597 except Exception as e:
598 msg = "trying to delete event from database, got exception {}: {!s}".format(type(e).__name__, e.args)
599 DTIProcessor.logger.warning(msg)
600 self._result['ERROR'] = msg
601 except Exception as e:
602 msg = "trying to process delete event, got exception {}: {!s}".format(type(e).__name__, e.args)
603 DTIProcessor.logger.warning(msg)
604 self._result['ERROR'] = msg
607 utils.update_dict(res_dict, res_dict_k8s)
610 utils.update_dict(res_dict, res_dict_docker)
616 process DTI event that indicates VNF instance has to be updated in the collector microservices
624 self.prim_db_event = self.db_access.query_event_item(self.target_type, self.target_name)
625 if self.prim_db_event:
626 self.db_access.update_event_item(self.event, self.target_type, self.target_name)
627 result = self.db_access.query_event_data(self.target_type, self.target_name)
629 msg = "processing update event for {}/{}, but event distribution info is not found in database, " \
630 "replaying this event to cluster if required". \
631 format(self.target_type, self.target_name)
632 DTIProcessor.logger.warning(msg)
633 self._result['WARNING'] = msg
634 res_dict = self.add_replay()
636 msg = "DTIProcessor.update() handle update flow for {}/{}, for k8s rediscover scn list and" \
637 "identify new vs update cases".format(self.target_type, self.target_name)
638 DTIProcessor.logger.debug(msg)
640 tpl_arr = CfyClient().iter_components(self.target_type,
641 dcae_service_location=self.event_clli)
642 res_dict_docker = dict(self.docker_pool.map(notify_svc,
644 self.event, self.db_access,
648 except Exception as e:
649 msg = "DTIProcessor.update() running docker_pool.map() got exception {}: {!s}".format(
651 DTIProcessor.logger.error(msg)
652 self._result['ERROR'] = msg
654 # event is new for the handler
655 msg = "processing update event for {}/{}, but current event info is not found in database, " \
656 "executing add event".format(self.target_type, self.target_name)
657 DTIProcessor.logger.warning(msg)
658 self._result['WARNING'] = msg
659 res_dict = self.add()
660 except Exception as e:
661 msg = "DTIProcessor.update() got exception {}: {!s}".format(type(e).__name__, e)
662 DTIProcessor.logger.error(msg)
663 self._result['ERROR'] = msg
666 utils.update_dict(res_dict, res_dict_k8s)
669 utils.update_dict(res_dict, res_dict_docker)
675 event handler to notify all the pods in the kubernetes cluster whose FQDN is present in the incoming event
676 This notification is meant for the cluster failover.
680 self.prim_db_event = self.db_access.query_event_item(self.target_type, self.target_name)
681 if self.prim_db_event:
682 self.db_access.update_event_item(self.event, self.target_type, self.target_name)
684 self.prim_db_event = Event(event=self.event, target_name=self.target_name, target_type=self.target_type,
685 location_clli=self.event_clli)
686 self.db_access.saveDomainObject(self.prim_db_event)
687 except Exception as e:
688 msg = "trying to store notify event, got exception {}: {!s}".format(type(e).__name__, e.args)
689 DTIProcessor.logger.warning(msg)
690 self._result['ERROR'] = msg
693 self.k8s_pool.map(notify_pods, ((self.event, tp) for tp in
694 CfyClient().query_k8_components(self.target_name)))
695 for k, v in notify_response_arr:
697 except Exception as e:
698 msg = "trying to run notify event, got exception {}: {!s}".format(type(e).__name__, e.args)
699 DTIProcessor.logger.warning(msg)
700 self._result['WARNING'] = msg
704 def get_result(self):
708 def get_k8_raw_events(cls, pod, cluster, namespace):
# Classmethod: look up all DTI events delivered to one k8s statefulset pod
# (identified by pod id + cluster FQDN + namespace) and regroup them as
# {target_type: {target_name: event_dict}}.
# NOTE(review): source garbled — the initialization of outer_dict, the inner
# "for evnt in results:" loop header, and the final "return outer_dict" are
# missing from this copy.
710 Get DTI events for a k8 stateful set pod container
713 k8s stateful set pod ID that was configured with a specific set of DTI Events
714 :param cluster: required
715 k8s cluster FQDN where the mS was deployed
716 :param namespace: required
717 k8s namespace where the stateful set was deployed in that namespace
719 Dictionary of DTI event(s).
720 DTI events will be keyed by vnf_type, sub-keyed by vnf_id.
722 db_access = EventDbAccess()
723 results = db_access.query_raw_k8_events(cluster, pod, namespace)
# First pass: collect the distinct target types present in the result set.
725 target_types = set([])
728 for evnt_item in results:
729 target_types.add(evnt_item.target_type)
# Second pass (per type): gather that type's events keyed by target name.
# O(types * events), acceptable for the small per-pod result sets queried here.
731 for targ_type in target_types:
732 inner_name_evt_dict = {}
734 if targ_type == evnt.target_type:
735 inner_name_evt_dict[evnt.target_name] = evnt.event
737 outer_dict[targ_type] = inner_name_evt_dict
742 def get_docker_raw_events(cls, service_name, service_location):
744 Get DTI events for docker container.
748 service_name : string
749 required. The service component name assigned by dockerplugin to the component that is unique to the
750 cloudify node instance and used in its Consul key(s).
751 service_location : string
752 optional. allows multiple values separated by commas. Filters DTI events with dcae_service_location
754 If service_location is not provided, then defaults to dockerhost or k8s cluster master node service Consul
755 TAGs if service_name is provided,
756 otherwise results are not location filtered.
761 Dictionary of DTI event(s).
762 DTI events will be keyed by vnf_type, sub-keyed by vnf_id.
770 want_locs = service_location.split(',')
774 if not want_locs: # default to TAGs of container's dockerhost or k8s cluster master node
776 node_name = ConsulClient.lookup_service(service_name)[0].get("Node")
778 services = ConsulClient.lookup_node(node_name).get("Services")
780 for node_svc in list(services.keys()):
781 if "-component-dockerhost-" in node_svc:
782 want_locs = services[node_svc].get("Tags", [])
788 supported_types = ConsulClient.get_value(service_name + ":oti")
793 supported_types = [t_type.lower() for t_type in list(supported_types.keys())]
794 give_types = supported_types
795 if not give_types or (len(give_types) == 1 and give_types[0] == ''):
798 db_access = EventDbAccess()
799 results = db_access.query_raw_docker_events(give_types, want_locs)
801 target_types = set([])
804 for evnt_item in results:
805 target_types.add(evnt_item.target_type)
807 for targ_type in target_types:
808 inner_name_evt_dict = {}
810 if targ_type == evnt.target_type:
811 inner_name_evt_dict[evnt.target_name] = evnt.event
813 outer_dict[targ_type] = inner_name_evt_dict