Merge "Initial delivery of helm charts to deploy mod2 components. Resolved all the...
[dcaegen2/platform.git] / oti / event-handler / otihandler / dti_processor.py
1 # ================================================================================
2 # Copyright (c) 2019-2020 AT&T Intellectual Property. All rights reserved.
3 # ================================================================================
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at
7 #
8 #      http://www.apache.org/licenses/LICENSE-2.0
9 #
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
15 # ============LICENSE_END=========================================================
16
17 """OTI Event processor for handling all the event types"""
18
19 import copy
20 import json
21 import logging
22 from multiprocessing.dummy import Pool as ThreadPool
23 from threading import Lock
24
25 import requests
26
27 from otihandler import utils
28 from otihandler.cfy_client import CfyClient
29 from otihandler.consul_client import ConsulClient
30 from otihandler.dbclient.apis import EventDbAccess
31 from otihandler.dbclient.models import Event, EventAck
32 from otihandler.docker_client import DockerClient
33
34 notify_response_arr = []
35 lock = Lock()
36 K8S_CLUSTER_PROXY_NODE_PORT = '30132'
37
38
39 # def notify_docker(args_tuple):
40 #     """
41 #     event notification executor inside a process pool to communicate with docker container
42 #     interacts with docker client library
43 #     """
44 #     (dti_event, db_access, ack_item) = args_tuple
45 #     try:
46 #         dcae_service_action = dti_event.get('dcae_service_action')
47 #         component_scn = ack_item.service_component
48 #         deployment_id = ack_item.deployment_id
49 #         container_id = ack_item.container_id
50 #         docker_host = ack_item.docker_host
51 #         reconfig_script = ack_item.reconfig_script
52 #         container_type = 'docker'
53 #     except Exception as e:
54 #         return (
55 #             "ERROR", "dti_processor.notify_docker() processing args got exception {}: {!s}".format(type(e).__name__, e))
56 #     what = ""
57 #     try:
58 #         what = "{} in {} container {} on {} that was deployed by {}".format(
59 #             reconfig_script, container_type, container_id, docker_host, deployment_id)
60 #         if dcae_service_action == 'add':
61 #             add_action = {"dcae_service_action": "deploy"}
62 #             dti_event.update(add_action)
63
64 #         if dcae_service_action == 'delete':
65 #             add_action = {"dcae_service_action": "undeploy"}
66 #             dti_event.update(add_action)
67
68 #         # dkr = DockerClient(docker_host, reauth=False)
69 #         result = ''
70 #         # result = dkr.notify_for_reconfiguration(container_id, [ reconfig_script, "dti", json.dumps(dti_event) ])
71 #         if dti_event.get('dcae_service_action') == 'undeploy':
72 #             # delete from dti_event_ack table
73 #             try:
74 #                 db_access.deleteDomainObject(ack_item)
75 #             except Exception as e:
76 #                 msg = "trying to delete event ack record, got exception {}: {!s}".format(type(e).__name__, e)
77 #                 DTIProcessor.logger.warning(msg)
78 #                 return (component_scn, "exception {}: {!s} running {}".format(type(e).__name__, e, what))
79 #             else:
80 #                 return (component_scn, "ran {}, got: {!s}".format(what, result))
81
82 #     except Exception as e:
83 #         return (component_scn, "exception {}: {!s} running {}".format(type(e).__name__, e, what))
84
85
86 def notify_svc(args_tuple):
87     """
88     add/update/delete event handler
89     event notification executor inside a process pool to communicate with docker container and k8s services
90     interacts with docker client library
91     interacts with k8s node port services using REST client
92     """
93     (orig_dti_event, db_access, curr_evt, res_tuple) = args_tuple
94     dti_event = copy.deepcopy(orig_dti_event)
95     try:
96         dcae_service_action = dti_event.get('dcae_service_action').lower()
97
98         component_scn = res_tuple[0]
99         deployment_id = res_tuple[1]
100         container_id = res_tuple[2]
101         node_id = res_tuple[3]
102         docker_host = res_tuple[6]
103         reconfig_script = res_tuple[7]
104         container_type = res_tuple[8]
105     except Exception as e:
106         return ("ERROR", "dti_processor.notify_svc() processing args got exception {}: {!s}".format(type(e).__name__, e))
107
108     what = ""
109     if container_type == "docker":
110         # exec reconfigure.sh in docker container
111         try:
112             what = "{} in {} container {} on {} that was deployed by {} node {}".format(
113                 reconfig_script, container_type, container_id, docker_host, deployment_id, node_id)
114             if dcae_service_action == 'add':
115                 add_action = {"dcae_service_action": "deploy"}
116                 dti_event.update(add_action)
117
118             if dcae_service_action == 'delete':
119                 add_action = {"dcae_service_action": "undeploy"}
120                 dti_event.update(add_action)
121
122             dkr = DockerClient(docker_host, reauth=False)
123             result = ''
124             if dti_event.get('dcae_service_action') == 'update':
125                 # undeploy + deploy
126                 DTIProcessor.logger.debug("update 1 - running undeploy {}".format(what))
127                 dti_event.update({"dcae_service_action": "undeploy"})
128                 result = dkr.notify_for_reconfiguration(container_id, [reconfig_script, "dti", json.dumps(dti_event)])
129                 DTIProcessor.logger.debug("update 2 - running deploy {}".format(what))
130                 dti_event.update({"dcae_service_action": "deploy"})
131                 result = dkr.notify_for_reconfiguration(container_id, [reconfig_script, "dti", json.dumps(dti_event)])
132                 try:
133                     upd_evt_ack = db_access.query_event_info_docker(curr_evt, component_scn, deployment_id,
134                                                                     container_id)
135                     upd_evt_ack.update_action('update')
136                     db_access.saveDomainObject(upd_evt_ack)
137                 except Exception as e:
138                     msg = "trying to update event ack record, got exception {}: {!s}".format(type(e).__name__, e)
139                     DTIProcessor.logger.warning(msg)
140                     return (component_scn, "exception {}: {!s} running {}".format(type(e).__name__, e, what))
141             else:
142                 DTIProcessor.logger.debug("running {}".format(what))
143                 result = dkr.notify_for_reconfiguration(container_id, [reconfig_script, "dti", json.dumps(dti_event)])
144                 if dti_event.get('dcae_service_action') == 'deploy':
145                     # add into dti_event_ack table
146                     try:
147                         add_evt_ack = EventAck(service_component=component_scn, deployment_id=deployment_id,
148                                                container_type='docker', docker_host=docker_host,
149                                                container_id=container_id, reconfig_script=reconfig_script,
150                                                event=curr_evt,
151                                                action='add')
152                         db_access.saveDomainObject(add_evt_ack)
153                     except Exception as e:
154                         msg = "trying to store event ack record, got exception {}: {!s}".format(type(e).__name__, e)
155                         DTIProcessor.logger.warning(msg)
156                         return (component_scn, "exception {}: {!s} running {}".format(type(e).__name__, e, what))
157                 else:
158                     # remove from dtih_event_ack if present
159                     if curr_evt: 
160                         try:
161                             del_evt_ack = db_access.query_event_info_docker(curr_evt, component_scn, deployment_id,
162                                                                             container_id)
163                             db_access.deleteDomainObject(del_evt_ack)
164                         except Exception as e:
165                             msg = "trying to delete event ack record for docker service, got exception {}: {!s}".format(type(e).__name__, e)
166                             DTIProcessor.logger.warning(msg)
167                             return (component_scn, "exception {}: {!s} running {}".format(type(e).__name__, e, what))
168         except Exception as e:
169             return (component_scn, "exception {}: {!s} running {}".format(type(e).__name__, e, what))
170
171         return (component_scn, "ran {}, got: {!s}".format(what, result))
172     elif container_type == "k8s":
173         DTIProcessor.logger.debug("dti_processor.notify_svc() handling k8s component")
174         # if action is 'update', check if k8s pod info exists already for this event in app db
175         if dcae_service_action == 'add':
176             DTIProcessor.logger.debug("dti_processor.notify_svc() in k8s for add action")
177             return notify_k8s((dti_event, db_access, curr_evt, res_tuple))
178         elif dcae_service_action == 'update':
179             # handle update for pods being tracked and handle add for new pods
180             k8s_scn_result = db_access.query_event_data_k8s_pod(curr_evt, component_scn)
181             if k8s_scn_result: 
182                 # update
183                 DTIProcessor.logger.debug("dti_processor.notify_svc() in k8s for update action")
184                 return notify_k8s_pod((dti_event, db_access, k8s_scn_result))
185             else:
186                 # add
187                 DTIProcessor.logger.debug("dti_processor.notify_svc(), convert update to add action in k8s ")
188                 add_action = {"dcae_service_action": "add"}
189                 dti_event.update(add_action)
190                 return notify_k8s((dti_event, db_access, curr_evt, res_tuple))
191
192
193 def notify_k8s(args_tuple):
194     """
195     add event handler
196     event notification executor inside a process pool to communicate with k8s statefulset nodeport service
197     uses REST API client to call k8s services
198     """
199     (dti_event, db_access, curr_evt, res_tuple) = args_tuple
200     component_scn = res_tuple[0]
201     deployment_id = res_tuple[1]
202     node_id = res_tuple[3]
203     container_type = res_tuple[8]
204     service_address = res_tuple[9]
205     service_port = res_tuple[10]
206     what = "{} in {} deployment {} that was deployed by {} node {}".format(
207         "add", container_type, "statefulset", deployment_id, node_id)
208     # call scn node port service REST API
209     svc_nodeport_url = "https://{}:{}".format(service_address, service_port)
210     try:
211         DTIProcessor.logger.debug("running {}".format(what))
212         response = requests.put(svc_nodeport_url, json=dti_event, timeout=50)
213         response.raise_for_status()
214     except requests.exceptions.RequestException as e:
215         msg = "collector nodeport service({}) threw exception {}: {!s}".format(
216             svc_nodeport_url, type(e).__name__, e)
217         DTIProcessor.logger.error(msg)
218         return (component_scn, "exception {}: {!s} running {}".format(type(e).__name__, e, what))
219     try:
220         event_ack_info = response.json()
221     except Exception as e:
222         msg = "collector nodeport service({}) threw exception {}: {!s}".format(
223             svc_nodeport_url, type(e).__name__, e)
224         DTIProcessor.logger.error(msg)
225         return (component_scn, "exception {}: {!s} running {}".format(type(e).__name__, e, what))
226
227     if not event_ack_info:
228         msg = "collector nodeport service returned bad data"
229         DTIProcessor.logger.error(msg)
230         return (component_scn, "collector nodeport service returned bad data")
231
232     namespace = event_ack_info.get("KubeNamespace")
233     svc_name = event_ack_info.get("KubeServiceName")
234     svc_port = event_ack_info.get("KubeServicePort")
235     proxy_fqdn = event_ack_info.get("KubeProxyFqdn")
236     cluster_fqdn = event_ack_info.get("KubeClusterFqdn")
237     pod_name = event_ack_info.get("KubePod")
238     statefulset = pod_name[0:pod_name.rindex('-')]
239
240     what = "{} in {} deployment {} in namespace {} that was deployed by {} node {}".format(
241         "add", container_type, statefulset, namespace, deployment_id, node_id)
242     try:
243         add_evt_ack = EventAck(k8s_namespace=namespace, k8s_service_name=svc_name, deployment_id=deployment_id,
244                                k8s_service_port=svc_port, k8s_cluster_fqdn=cluster_fqdn, k8s_proxy_fqdn=proxy_fqdn,
245                                k8s_pod_id=pod_name, event=curr_evt, action='add', container_type='k8s',
246                                service_component=component_scn)
247         db_access.saveDomainObject(add_evt_ack)
248         return (component_scn, "ran {}, got: {!s}".format(what, event_ack_info))
249     except Exception as e:
250         msg = "trying to store event ack record, got exception {}: {!s}".format(type(e).__name__, e)
251         DTIProcessor.logger.warning(msg)
252         return (component_scn, "exception {}: {!s} running {}".format(type(e).__name__, e, what))
253
254
255 def notify_pods(args_tuple):
256     """
257     notify event handler
258     event notification executor inside a process pool to communicate with k8s DTIH proxy nodeport service
259     uses REST API client to call k8s services
260     """
261     event_ack_info = ''
262     (dti_event, res_tuple) = args_tuple
263     try:
264         cluster = res_tuple[0]
265         port = K8S_CLUSTER_PROXY_NODE_PORT
266         namespace = res_tuple[1]
267         svc_name = res_tuple[2]
268         svc_port = res_tuple[4]
269         replicas = res_tuple[3]
270
271         for replica in range(replicas):
272             pod_id = "sts-{}-{}".format(svc_name, replica)
273             item_pod_url = "https://{}:{}/{}/{}?service_name={}&service_port={}".format(cluster, port, namespace,
274                                                                                     pod_id, svc_name,
275                                                                                     svc_port)
276             what = "{} for pod id {} in cluster {} and  namespace {}".format("notify", pod_id, cluster, namespace)
277             try:
278                 DTIProcessor.logger.debug("running {}".format(what))
279                 response = requests.put(item_pod_url, json=dti_event, timeout=50)
280                 response.raise_for_status()
281             except requests.exceptions.RequestException as e:
282                 msg = "stateful set proxy service({}) threw exception {}: {!s}".format(
283                     item_pod_url, type(e).__name__, e)
284                 DTIProcessor.logger.error(msg)
285                 with lock:
286                     notify_response_arr.append((pod_id, "exception {}: {!s} running {}".format(type(e).__name__, e, what)))
287             else:
288                 try:
289                     event_ack_info = response.json()
290                 except Exception as e:
291                     msg = "stateful set proxy service({}) threw exception {}: {!s}".format(
292                         item_pod_url, type(e).__name__, e)
293                     DTIProcessor.logger.error(msg)
294                     with lock:
295                         notify_response_arr.append((pod_id, "exception {}: {!s} running {}".format(type(e).__name__, e, what)))
296
297                 if not event_ack_info:
298                     msg = "stateful set proxy service returned bad data"
299                     DTIProcessor.logger.error(msg)
300                     with lock:
301                         notify_response_arr.append ((pod_id, "no acknowledgement - running {}".format(what)))
302
303                 with lock:
304                     notify_response_arr.append((pod_id, "ran {}, got: {!s}".format(what, event_ack_info)))
305     except Exception as e:
306         with lock:
307             notify_response_arr.append (("ERROR", "dti_processor.notify_pods() processing args got exception {}: {!s}".format(type(e).__name__, e)))
308
309 def notify_k8s_pod(args_tuple):
310     """
311     update event handler
312     event notification executor inside a process pool to communicate with k8s DTIH proxy service
313     uses REST API client to call k8s services
314     """
315     item_pod_url = ''
316     component_scn = ''
317     (dti_event, db_access, ack_item) = args_tuple
318     # call ingress proxy to dispatch delete event
319
320     action = dti_event.get('dcae_service_action')
321     what = "{} to {} ID {} in namespace {} that was deployed in cluster {}".format(
322         action, 'k8s pod', ack_item.k8s_pod_id, ack_item.k8s_namespace, ack_item.k8s_cluster_fqdn)
323     try:
324         DTIProcessor.logger.debug("running {}".format(what))
325         item_pod_url = "https://{}:{}/{}/{}?service_name={}&service_port={}".format(
326             ack_item.k8s_proxy_fqdn, K8S_CLUSTER_PROXY_NODE_PORT, ack_item.k8s_namespace,
327             ack_item.k8s_pod_id, ack_item.k8s_service_name, ack_item.k8s_service_port)
328         component_scn = ack_item.service_component
329         response = requests.put(item_pod_url, json=dti_event, timeout=50)
330         response.raise_for_status()
331     except requests.exceptions.RequestException as e:
332         msg = "exception occured, stateful set proxy service({}) threw {}: {!s}".format(
333             item_pod_url, type(e).__name__, e)
334         DTIProcessor.logger.error(msg)
335         return (component_scn, "ran {}, got: {!s}".format(what, msg))
336     else:
337         if action == 'delete':
338             try:
339                 db_access.deleteDomainObject(ack_item)
340             except Exception as e:
341                 msg = "trying to delete event ack record, got exception {}: {!s}".format(type(e).__name__, e)
342                 DTIProcessor.logger.warning(msg)
343                 return (component_scn, "exception {}: {!s} running {}".format(type(e).__name__, e, what))
344         else:
345             try:
346                 ack_item.update_action('update')
347                 db_access.saveDomainObject(ack_item)
348             except Exception as e:
349                 msg = "trying to update event ack record, got exception {}: {!s}".format(type(e).__name__, e)
350                 DTIProcessor.logger.warning(msg)
351                 return (component_scn, "exception {}: {!s} running {}".format(type(e).__name__, e, what))
352
353     return (component_scn, "ran {}, got: {!s}".format(what, response.json()))
354
355
356 class DTIProcessor(object):
357     """
358     Main event processing class that encapsulates all the logic of this handler application!
359     An instance of this class is created per incoming client request.
360
361     Generates input data by querying platform services - cloudify, consul, postgresSql
362
363     It creates a pool of worker processes using a multiprocessing Pool class instance.
364     Tasks are offloaded to the worker processes that exist in the pool.
365     The input data is distributed across processes of the Pool object to enable parallel execution of
366     event notification function across multiple input values (data parallelism).
367     """
368
369     logger = logging.getLogger("oti_handler.dti_processor")
370     K8S_CLUSTER_PROXY_NODE_PORT = '30132'
371     db_access = None
372     docker_pool = None
373     k8s_pool = None
374
375     def __init__(self, dti_event, send_notification=True):
376         self._result = {}
377         self.event = dti_event
378         self.is_notify = send_notification
379         self.action = dti_event.get('dcae_service_action').lower()
380         self.target_name = dti_event.get('dcae_target_name')
381         self.target_type = dti_event.get('dcae_target_type', '').lower()
382         self.event_clli = dti_event.get('dcae_service_location')
383         res_dict = None
384         try:
385             self.docker_pool = ThreadPool(8)
386             self.k8s_pool = ThreadPool(8)
387         except Exception as e:
388             msg = "DTIProcessor.__init__() creating ThreadPool got exception {}: {!s}".format(type(e).__name__, e)
389             DTIProcessor.logger.error(msg)
390             self._result['ERROR'] = msg
391             raise e
392         else:
393             self.db_access = EventDbAccess()
394             self.prim_db_event = None
395             try:
396                 res_dict = self.dispatcher()
397             except:
398                 raise
399             finally:
400                 try:
401                     self.docker_pool.close()
402                     self.k8s_pool.close()
403                 except Exception as e:
404                     msg = "DTIProcessor.__init__() running pool.close() got exception {}: {!s}".format(type(e).__name__,
405                                                                                                        e)
406                     DTIProcessor.logger.error(msg)
407                     self._result['ERROR'] = msg
408                 try:
409                     self.docker_pool.join()
410                     self.k8s_pool.join()
411                 except Exception as e:
412                     msg = "DTIProcessor.__init__() running pool.join() got exception {}: {!s}".format(type(e).__name__,
413                                                                                                       e)
414                     DTIProcessor.logger.error(msg)
415                     self._result['ERROR'] = msg
416
417             # if not send_notification:
418             #     DTIProcessor._logger.debug("DTIProcessor.__init__() not notifying DCAE-Controller components")
419             #     return
420
421             if res_dict:
422                 try:
423                     utils.update_dict(self._result, res_dict)
424                 except Exception as e:
425                     msg = "DTIProcessor.__init__() running utils.update_dict() got exception {}: {!s}".format(
426                         type(e).__name__, e)
427                     DTIProcessor.logger.error(msg)
428                     self._result['ERROR'] = msg
429
430         DTIProcessor.logger.debug("DTIProcessor.__init__() done notifying new DCAE-Controller components")
431
432     def dispatcher(self):
433         """ dispatch method to execute specific method based on event type """
434
435         arg = str(self.action)
436         method = getattr(self, arg, lambda: "Invalid action")
437         return method()
438
439     def undeploy(self):
440         """
441         delete event from consul KV store, this functionality will be retired as events are stored
442         in postgresSql oti database
443         """
444         global key
445         try:
446             # update Consul KV store with DTI Event - storing them in a folder for all components
447             key = "{}/{}/{}".format("dti_events", self.target_type, self.target_name)
448             result = ConsulClient.delete_key(key)
449         except Exception as e:
450             msg = "trying to delete Consul history key {}, got exception {}: {!s}".format(key, type(e).__name__, e)
451             DTIProcessor.logger.warning(msg)
452             self._result['WARNING'] = msg
453         else:
454             if not result:
455                 msg = "VNF instance {} was not in Consul dti_events historical folder".format(self.target_name)
456                 DTIProcessor.logger.warning(msg)
457                 self._result['WARNING'] = msg
458
459     def deploy(self):
460         """
461         add event to consul KV store, this functionality will be retired as events are stored
462         in postgresSql oti database
463         """
464         dep_key = "{}/{}/{}".format("dti_events", self.target_type, self.target_name)
465         try:
466             # update Consul KV store with DTI Event - storing them in a folder for all components
467             result = ConsulClient.store_kvs({dep_key: self.event})
468         except Exception as e:
469             msg = "trying to store Consul history key {}, got exception {}: {!s}".format(key, type(e).__name__, e)
470             DTIProcessor.logger.warning(msg)
471             self._result['WARNING'] = msg
472
473     def add(self):
474         """
475         process DTI event that contains a new VNF instance that has to be configured in the collector microservices
476         """
477         res_dict = None
478         try:
479             msg = "processing add event for {}/{}".format(self.target_type, self.target_name)
480             DTIProcessor.logger.debug(msg)
481             # insert add event into dtih_event table
482             self.prim_db_event = Event(event=self.event, target_name=self.target_name, target_type=self.target_type,
483                                        location_clli=self.event_clli)
484             self.db_access.saveDomainObject(self.prim_db_event)
485         except Exception as e:
486             msg = "trying to store event, got exception {}: {!s}".format(type(e).__name__, e.args)
487             DTIProcessor.logger.warning(msg)
488             self._result['ERROR'] = msg
489             raise Exception(msg)
490         else:
491             if self.is_notify:
492                 try:
493                     # force the action to add, to avoid bad things later
494                     add_action = {"dcae_service_action": "add"}
495                     self.event.update(add_action)
496                     # mock up data
497                     mock_tp11 = (
498                         "scn1_k8s", "k8s_deployment_id1", "k8s_container_id1", "k8s_node_id1", "k8s_node_instance_id1",
499                         "node_instance_state", "k8s_host", "dti_reconfig_script", "k8s",
500                         "dcae-d1.idns.cip.corp.com", "30996")
501                     mock_tp12 = ("scn1_docker", "docker_deployment_id1", "docker_container_id1", "docker_node_id1",
502                                  "docker_node_instance_id1",
503                                  "node_instance_state", "docker_host", "dti_reconfig_script", "docker",
504                                  "dcae-d1.idns.cip.corp.com", "30996")
505                     # tpl_arr = []
506                     # tpl_arr.append(mock_tp11)
507                     # tpl_arr.append(mock_tp12)
508                     # res_dict = dict(self.docker_pool.map(notify_svc, (((self.event, self.db_access, self.prim_db_event, tp) for tp in tpl_arr))))
509                     res_dict = dict(self.docker_pool.map(notify_svc,
510                                                          ((self.event, self.db_access, self.prim_db_event, tp) for tp in
511                                                           CfyClient().iter_components(self.target_type,
512                                                                                       dcae_service_location=self.event_clli))
513                                                          ))
514                 except Exception as e:
515                     msg = "DTIProcessor.__init__() running pool.map() got exception {}: {!s}".format(type(e).__name__,
516                                                                                                      e)
517                     DTIProcessor.logger.error(msg)
518                     self._result['ERROR'] = msg
519         return res_dict
520
521     def add_replay(self):
522         """
523         convert an update event flow and replay as an add event type since the event acknowledgement is missing
524         from application database
525         """
526         res_dict = None
527         try:
528             # force the action to add, to avoid bad things later
529             add_action = {"dcae_service_action": "add"}
530             self.event.update(add_action)
531             # mock up data
532             mock_tp11 = ("scn1_k8s", "k8s_deployment_id1", "k8s_container_id1", "k8s_node_id1", "k8s_node_instance_id1",
533                          "node_instance_state", "k8s_host", "dti_reconfig_script", "k8s",
534                          "dcae-d1.idns.cip.corp.com", "30996")
535             mock_tp12 = ("scn1_docker", "docker_deployment_id1", "docker_container_id1", "docker_node_id1",
536                          "docker_node_instance_id1",
537                          "node_instance_state", "docker_host", "dti_reconfig_script", "docker",
538                          "dcae-d1.idns.cip.corp.com", "30996")
539             # tpl_arr = []
540             # tpl_arr.append(mock_tp11)
541             # tpl_arr.append(mock_tp12)
542             # res_dict = dict(self.pool.map(notify_svc, (((self.event, self.db_access, self.prim_db_event, tp) for tp in tpl_arr))))
543             res_dict = dict(self.docker_pool.map(notify_svc,
544                                                  ((self.event, self.db_access, self.prim_db_event, tp) for tp in
545                                                   CfyClient().iter_components(self.target_type,
546                                                                               dcae_service_location=self.event_clli))
547                                                  ))
548         except Exception as e:
549             msg = "DTIProcessor.add_replay() running pool.map() got exception {}: {!s}".format(type(e).__name__, e)
550             DTIProcessor.logger.error(msg)
551             self._result['ERROR'] = msg
552         return res_dict
553
554     def delete(self):
555         """
556         process DTI event that indicates a VNF instance has to be removed from the collector microservices
557         """
558         res_dict = {}
559         res_dict_k8s = {}
560         res_dict_docker = {}
561         try:
562             self.prim_db_event = self.db_access.query_event_item(self.target_type, self.target_name)
563             if self.is_notify:
564                 try:
565                     msg = "processing delete event for {}/{} to relate with any docker hosts".format(
566                         self.target_type, self.target_name)
567                     DTIProcessor.logger.warning(msg)
568                     res_dict_docker = dict(self.docker_pool.map(notify_svc,
569                                                                 ((self.event, self.db_access, self.prim_db_event, tp)
570                                                                  for tp
571                                                                  in CfyClient().iter_components_for_docker(
572                                                                     self.target_type,
573                                                                     dcae_service_location=self.event_clli))
574                                                                 ))
575                 except Exception as e:
576                     msg = "DTIProcessor.delete() running docker_pool.map() got exception {}: {!s}".format(type(e).__name__,
577                                                                                                           e)
578                     DTIProcessor.logger.error(msg)
579                     self._result['ERROR'] = msg
580
581                 try:
582                     msg = "processing delete event for {}/{} to relate with any k8s hosts".format(
583                         self.target_type, self.target_name)
584                     DTIProcessor.logger.warning(msg)
585                     if self.prim_db_event: 
586                         result = self.db_access.query_event_data_k8s(self.target_type, self.target_name)
587                         res_dict_k8s = dict(self.k8s_pool.map(notify_k8s_pod, (
588                             ((self.event, self.db_access, ack_item) for ack_item in result))))
589                 except Exception as e:
590                     msg = "DTIProcessor.delete() running k8s_pool.map() got exception {}: {!s}".format(type(e).__name__, e)
591                     DTIProcessor.logger.error(msg)
592                     self._result['ERROR'] = msg
593
594             try:
595                 if self.prim_db_event: 
596                     self.db_access.deleteDomainObject(self.prim_db_event)
597             except Exception as e:
598                 msg = "trying to delete event from database, got exception {}: {!s}".format(type(e).__name__, e.args)
599                 DTIProcessor.logger.warning(msg)
600                 self._result['ERROR'] = msg
601         except Exception as e:
602                 msg = "trying to process delete event, got exception {}: {!s}".format(type(e).__name__, e.args)
603                 DTIProcessor.logger.warning(msg)
604                 self._result['ERROR'] = msg
605
606         if res_dict_k8s: 
607             utils.update_dict(res_dict, res_dict_k8s)
608
609         if res_dict_docker: 
610             utils.update_dict(res_dict, res_dict_docker)
611
612         return res_dict
613
614     def update(self):
615         """
616         process DTI event that indicates VNF instance has to be updated in the collector microservices
617         """
618         res_dict = {}
619         res_dict_k8s = {}
620         res_dict_docker = {}
621
622         if self.is_notify:
623             try:
624                 self.prim_db_event = self.db_access.query_event_item(self.target_type, self.target_name)
625                 if self.prim_db_event: 
626                     self.db_access.update_event_item(self.event, self.target_type, self.target_name)
627                     result = self.db_access.query_event_data(self.target_type, self.target_name)
628                     if len(result) == 0:
629                         msg = "processing update event for {}/{}, but event distribution info is not found in database, " \
630                               "replaying this event to cluster if required". \
631                             format(self.target_type, self.target_name)
632                         DTIProcessor.logger.warning(msg)
633                         self._result['WARNING'] = msg
634                         res_dict = self.add_replay()
635                     else:
636                         msg = "DTIProcessor.update() handle update flow for {}/{}, for k8s rediscover scn list and" \
637                               "identify new vs update cases".format(self.target_type, self.target_name)
638                         DTIProcessor.logger.debug(msg)
639                         try:
640                             tpl_arr = CfyClient().iter_components(self.target_type,
641                                                                   dcae_service_location=self.event_clli)
642                             res_dict_docker = dict(self.docker_pool.map(notify_svc,
643                                                                         ((
644                                                                             self.event, self.db_access,
645                                                                             self.prim_db_event,
646                                                                             tp)
647                                                                             for tp in tpl_arr)))
648                         except Exception as e:
649                             msg = "DTIProcessor.update() running docker_pool.map() got exception {}: {!s}".format(
650                                 type(e).__name__, e)
651                             DTIProcessor.logger.error(msg)
652                             self._result['ERROR'] = msg
653                 else:
654                     # event is new for the handler
655                     msg = "processing update event for {}/{}, but current event info is not found in database, " \
656                           "executing add event".format(self.target_type, self.target_name)
657                     DTIProcessor.logger.warning(msg)
658                     self._result['WARNING'] = msg
659                     res_dict = self.add()
660             except Exception as e:
661                 msg = "DTIProcessor.update() got exception {}: {!s}".format(type(e).__name__, e)
662                 DTIProcessor.logger.error(msg)
663                 self._result['ERROR'] = msg
664
665         if res_dict_k8s: 
666             utils.update_dict(res_dict, res_dict_k8s)
667
668         if res_dict_docker: 
669             utils.update_dict(res_dict, res_dict_docker)
670
671         return res_dict
672
673     def notify(self):
674         """
675         event handler to notify all the pods in the kubernetes cluster whose FQDN is present in the incoming event
676         This notification is meant for the cluster failover.
677         """
678         res_dict = {}
679         try:
680             self.prim_db_event = self.db_access.query_event_item(self.target_type, self.target_name)
681             if self.prim_db_event: 
682                 self.db_access.update_event_item(self.event, self.target_type, self.target_name)
683             else:
684                 self.prim_db_event = Event(event=self.event, target_name=self.target_name, target_type=self.target_type,
685                                            location_clli=self.event_clli)
686                 self.db_access.saveDomainObject(self.prim_db_event)
687         except Exception as e:
688             msg = "trying to store notify event, got exception {}: {!s}".format(type(e).__name__, e.args)
689             DTIProcessor.logger.warning(msg)
690             self._result['ERROR'] = msg
691
692         try:
693             self.k8s_pool.map(notify_pods, ((self.event, tp) for tp in
694                                             CfyClient().query_k8_components(self.target_name)))
695             for k, v in notify_response_arr:
696                 res_dict[k] = v
697         except Exception as e:
698             msg = "trying to run notify event, got exception {}: {!s}".format(type(e).__name__, e.args)
699             DTIProcessor.logger.warning(msg)
700             self._result['WARNING'] = msg
701
702         return res_dict
703
704     def get_result(self):
705         return self._result
706
707     @classmethod
708     def get_k8_raw_events(cls, pod, cluster, namespace):
709         """
710         Get DTI events for a k8 stateful set pod container
711
712         :param pod: required
713             k8s stateful set pod ID that was configured with a specific set of DTI Events
714         :param cluster: required
715             k8s cluster FQDN where the mS was deployed
716         :param namespace: required
717             k8s namespace where the stateful set was deployed in that namespace
718         :return:
719             Dictionary of DTI event(s).
720             DTI events will be keyed by vnf_type, sub-keyed by vnf_id.
721         """
722         db_access = EventDbAccess()
723         results = db_access.query_raw_k8_events(cluster, pod, namespace)
724
725         target_types = set([])
726         outer_dict = {}
727
728         for evnt_item in results:
729             target_types.add(evnt_item.target_type)
730
731         for targ_type in target_types:
732             inner_name_evt_dict = {}
733             for evnt in results:
734                 if targ_type == evnt.target_type:
735                     inner_name_evt_dict[evnt.target_name] = evnt.event
736
737             outer_dict[targ_type] = inner_name_evt_dict
738
739         return outer_dict
740
741     @classmethod
742     def get_docker_raw_events(cls, service_name, service_location):
743         """
744         Get DTI events for docker container.
745
746         Parameters
747         ----------
748         service_name : string
749             required.  The service component name assigned by dockerplugin to the component that is unique to the
750             cloudify node instance and used in its Consul key(s).
751         service_location : string
752             optional.  allows multiple values separated by commas.  Filters DTI events with dcae_service_location
753             in service_location.
754             If service_location is not provided, then defaults to dockerhost or k8s cluster master node service Consul
755             TAGs if service_name is provided,
756             otherwise results are not location filtered.
757
758         Returns
759         -------
760         dict
761             Dictionary of DTI event(s).
762             DTI events will be keyed by vnf_type, sub-keyed by vnf_id.
763
764         """
765
766         r_dict = {}
767
768         want_locs = []
769         if service_location:
770             want_locs = service_location.split(',')
771
772         give_types = []
773         if service_name:
774             if not want_locs:  # default to TAGs of container's dockerhost or k8s cluster master node
775                 try:
776                     node_name = ConsulClient.lookup_service(service_name)[0].get("Node")
777                     if node_name:
778                         services = ConsulClient.lookup_node(node_name).get("Services")
779                         if services:
780                             for node_svc in list(services.keys()):
781                                 if "-component-dockerhost-" in node_svc:
782                                     want_locs = services[node_svc].get("Tags", [])
783                                     break
784                 except:
785                     pass
786
787             try:
788                 supported_types = ConsulClient.get_value(service_name + ":oti")
789             except:
790                 return r_dict
791             else:
792                 if supported_types:
793                     supported_types = [t_type.lower() for t_type in list(supported_types.keys())]
794                     give_types = supported_types
795                 if not give_types or (len(give_types) == 1 and give_types[0] == ''):
796                     return r_dict
797
798         db_access = EventDbAccess()
799         results = db_access.query_raw_docker_events(give_types, want_locs)
800
801         target_types = set([])
802         outer_dict = {}
803
804         for evnt_item in results:
805             target_types.add(evnt_item.target_type)
806
807         for targ_type in target_types:
808             inner_name_evt_dict = {}
809             for evnt in results:
810                 if targ_type == evnt.target_type:
811                     inner_name_evt_dict[evnt.target_name] = evnt.event
812
813             outer_dict[targ_type] = inner_name_evt_dict
814
815         return outer_dict