Merge "Initial delivery of helm charts to deploy mod2 components. Resolved all the...
[dcaegen2/platform.git] / oti / event-handler / otihandler / dti_processor.py
index 970e020..5802233 100644 (file)
@@ -36,51 +36,51 @@ lock = Lock()
 K8S_CLUSTER_PROXY_NODE_PORT = '30132'
 
 
-def notify_docker(args_tuple):
-    """
-    event notification executor inside a process pool to communicate with docker container
-    interacts with docker client library
-    """
-    (dti_event, db_access, ack_item) = args_tuple
-    try:
-        dcae_service_action = dti_event.get('dcae_service_action')
-        component_scn = ack_item.service_component
-        deployment_id = ack_item.deployment_id
-        container_id = ack_item.container_id
-        docker_host = ack_item.docker_host
-        reconfig_script = ack_item.reconfig_script
-        container_type = 'docker'
-    except Exception as e:
-        return (
-            "ERROR", "dti_processor.notify_docker processing args got exception {}: {!s}".format(type(e).__name__, e))
-    what = ""
-    try:
-        what = "{} in {} container {} on {} that was deployed by {}".format(
-            reconfig_script, container_type, container_id, docker_host, deployment_id)
-        if dcae_service_action == 'add':
-            add_action = {"dcae_service_action": "deploy"}
-            dti_event.update(add_action)
-
-        if dcae_service_action == 'delete':
-            add_action = {"dcae_service_action": "undeploy"}
-            dti_event.update(add_action)
-
-        # dkr = DockerClient(docker_host, reauth=False)
-        result = ''
-        # result = dkr.notify_for_reconfiguration(container_id, [ reconfig_script, "dti", json.dumps(dti_event) ])
-        if dti_event.get('dcae_service_action') == 'undeploy':
-            # delete from dti_event_ack table
-            try:
-                db_access.deleteDomainObject(ack_item)
-            except Exception as e:
-                msg = "trying to delete event ack record, got exception {}: {!s}".format(type(e).__name__, e)
-                DTIProcessor.logger.warn(msg)
-                return (component_scn, "exception {}: {!s} running {}".format(type(e).__name__, e, what))
-            else:
-                return (component_scn, "ran {}, got: {!s}".format(what, result))
-
-    except Exception as e:
-        return (component_scn, "exception {}: {!s} running {}".format(type(e).__name__, e, what))
+def notify_docker(args_tuple):
+    """
+    event notification executor inside a process pool to communicate with docker container
+    interacts with docker client library
+    """
+    (dti_event, db_access, ack_item) = args_tuple
+    try:
+        dcae_service_action = dti_event.get('dcae_service_action')
+        component_scn = ack_item.service_component
+        deployment_id = ack_item.deployment_id
+        container_id = ack_item.container_id
+        docker_host = ack_item.docker_host
+        reconfig_script = ack_item.reconfig_script
+        container_type = 'docker'
+    except Exception as e:
+        return (
+#             "ERROR", "dti_processor.notify_docker() processing args got exception {}: {!s}".format(type(e).__name__, e))
+    what = ""
+    try:
+        what = "{} in {} container {} on {} that was deployed by {}".format(
+            reconfig_script, container_type, container_id, docker_host, deployment_id)
+        if dcae_service_action == 'add':
+            add_action = {"dcae_service_action": "deploy"}
+            dti_event.update(add_action)
+# 
+        if dcae_service_action == 'delete':
+            add_action = {"dcae_service_action": "undeploy"}
+            dti_event.update(add_action)
+# 
+        # dkr = DockerClient(docker_host, reauth=False)
+        result = ''
+        # result = dkr.notify_for_reconfiguration(container_id, [ reconfig_script, "dti", json.dumps(dti_event) ])
+        if dti_event.get('dcae_service_action') == 'undeploy':
+            # delete from dti_event_ack table
+            try:
+                db_access.deleteDomainObject(ack_item)
+            except Exception as e:
+                msg = "trying to delete event ack record, got exception {}: {!s}".format(type(e).__name__, e)
+#                 DTIProcessor.logger.warning(msg)
+                return (component_scn, "exception {}: {!s} running {}".format(type(e).__name__, e, what))
+            else:
+                return (component_scn, "ran {}, got: {!s}".format(what, result))
+# 
+    except Exception as e:
+        return (component_scn, "exception {}: {!s} running {}".format(type(e).__name__, e, what))
 
 
 def notify_svc(args_tuple):
@@ -103,7 +103,7 @@ def notify_svc(args_tuple):
         reconfig_script = res_tuple[7]
         container_type = res_tuple[8]
     except Exception as e:
-        return ("ERROR", "oti_processor.notify processing args got exception {}: {!s}".format(type(e).__name__, e))
+        return ("ERROR", "dti_processor.notify_svc() processing args got exception {}: {!s}".format(type(e).__name__, e))
 
     what = ""
     if container_type == "docker":
@@ -136,7 +136,7 @@ def notify_svc(args_tuple):
                     db_access.saveDomainObject(upd_evt_ack)
                 except Exception as e:
                     msg = "trying to update event ack record, got exception {}: {!s}".format(type(e).__name__, e)
-                    DTIProcessor.logger.warn(msg)
+                    DTIProcessor.logger.warning(msg)
                     return (component_scn, "exception {}: {!s} running {}".format(type(e).__name__, e, what))
             else:
                 DTIProcessor.logger.debug("running {}".format(what))
@@ -152,39 +152,39 @@ def notify_svc(args_tuple):
                         db_access.saveDomainObject(add_evt_ack)
                     except Exception as e:
                         msg = "trying to store event ack record, got exception {}: {!s}".format(type(e).__name__, e)
-                        DTIProcessor.logger.warn(msg)
+                        DTIProcessor.logger.warning(msg)
                         return (component_scn, "exception {}: {!s} running {}".format(type(e).__name__, e, what))
                 else:
                     # remove from dtih_event_ack if present
-                    if curr_evt is not None:
+                    if curr_evt
                         try:
                             del_evt_ack = db_access.query_event_info_docker(curr_evt, component_scn, deployment_id,
                                                                             container_id)
                             db_access.deleteDomainObject(del_evt_ack)
                         except Exception as e:
                             msg = "trying to delete event ack record for docker service, got exception {}: {!s}".format(type(e).__name__, e)
-                            DTIProcessor.logger.warn(msg)
+                            DTIProcessor.logger.warning(msg)
                             return (component_scn, "exception {}: {!s} running {}".format(type(e).__name__, e, what))
         except Exception as e:
             return (component_scn, "exception {}: {!s} running {}".format(type(e).__name__, e, what))
 
         return (component_scn, "ran {}, got: {!s}".format(what, result))
     elif container_type == "k8s":
-        DTIProcessor.logger.debug("DTIProcessor.notify_svc() handling k8s component")
+        DTIProcessor.logger.debug("dti_processor.notify_svc() handling k8s component")
         # if action is 'update', check if k8s pod info exists already for this event in app db
         if dcae_service_action == 'add':
-            DTIProcessor.logger.debug("DTIProcessor.notify_svc() in k8s for add action")
+            DTIProcessor.logger.debug("dti_processor.notify_svc() in k8s for add action")
             return notify_k8s((dti_event, db_access, curr_evt, res_tuple))
         elif dcae_service_action == 'update':
             # handle update for pods being tracked and handle add for new pods
             k8s_scn_result = db_access.query_event_data_k8s_pod(curr_evt, component_scn)
-            if k8s_scn_result is not None:
+            if k8s_scn_result
                 # update
-                DTIProcessor.logger.debug("DTIProcessor.notify_svc() in k8s for update action")
+                DTIProcessor.logger.debug("dti_processor.notify_svc() in k8s for update action")
                 return notify_k8s_pod((dti_event, db_access, k8s_scn_result))
             else:
                 # add
-                DTIProcessor.logger.debug("DTIProcessor.notify_svc(), convert update to add action in k8s ")
+                DTIProcessor.logger.debug("dti_processor.notify_svc(), convert update to add action in k8s ")
                 add_action = {"dcae_service_action": "add"}
                 dti_event.update(add_action)
                 return notify_k8s((dti_event, db_access, curr_evt, res_tuple))
@@ -248,7 +248,7 @@ def notify_k8s(args_tuple):
         return (component_scn, "ran {}, got: {!s}".format(what, event_ack_info))
     except Exception as e:
         msg = "trying to store event ack record, got exception {}: {!s}".format(type(e).__name__, e)
-        DTIProcessor.logger.warn(msg)
+        DTIProcessor.logger.warning(msg)
         return (component_scn, "exception {}: {!s} running {}".format(type(e).__name__, e, what))
 
 
@@ -304,7 +304,7 @@ def notify_pods(args_tuple):
                     notify_response_arr.append((pod_id, "ran {}, got: {!s}".format(what, event_ack_info)))
     except Exception as e:
         with lock:
-            notify_response_arr.append (("ERROR", "dti_processor.notify() processing args got exception {}: {!s}".format(type(e).__name__, e)))
+            notify_response_arr.append (("ERROR", "dti_processor.notify_pods() processing args got exception {}: {!s}".format(type(e).__name__, e)))
 
 def notify_k8s_pod(args_tuple):
     """
@@ -339,7 +339,7 @@ def notify_k8s_pod(args_tuple):
                 db_access.deleteDomainObject(ack_item)
             except Exception as e:
                 msg = "trying to delete event ack record, got exception {}: {!s}".format(type(e).__name__, e)
-                DTIProcessor.logger.warn(msg)
+                DTIProcessor.logger.warning(msg)
                 return (component_scn, "exception {}: {!s} running {}".format(type(e).__name__, e, what))
         else:
             try:
@@ -347,7 +347,7 @@ def notify_k8s_pod(args_tuple):
                 db_access.saveDomainObject(ack_item)
             except Exception as e:
                 msg = "trying to update event ack record, got exception {}: {!s}".format(type(e).__name__, e)
-                DTIProcessor.logger.warn(msg)
+                DTIProcessor.logger.warning(msg)
                 return (component_scn, "exception {}: {!s} running {}".format(type(e).__name__, e, what))
 
     return (component_scn, "ran {}, got: {!s}".format(what, response.json()))
@@ -448,12 +448,12 @@ class DTIProcessor(object):
             result = ConsulClient.delete_key(key)
         except Exception as e:
             msg = "trying to delete Consul history key {}, got exception {}: {!s}".format(key, type(e).__name__, e)
-            DTIProcessor.logger.warn(msg)
+            DTIProcessor.logger.warning(msg)
             self._result['WARNING'] = msg
         else:
             if not result:
                 msg = "VNF instance {} was not in Consul dti_events historical folder".format(self.target_name)
-                DTIProcessor.logger.warn(msg)
+                DTIProcessor.logger.warning(msg)
                 self._result['WARNING'] = msg
 
     def deploy(self):
@@ -467,7 +467,7 @@ class DTIProcessor(object):
             result = ConsulClient.store_kvs({dep_key: self.event})
         except Exception as e:
             msg = "trying to store Consul history key {}, got exception {}: {!s}".format(key, type(e).__name__, e)
-            DTIProcessor.logger.warn(msg)
+            DTIProcessor.logger.warning(msg)
             self._result['WARNING'] = msg
 
     def add(self):
@@ -484,7 +484,7 @@ class DTIProcessor(object):
             self.db_access.saveDomainObject(self.prim_db_event)
         except Exception as e:
             msg = "trying to store event, got exception {}: {!s}".format(type(e).__name__, e.args)
-            DTIProcessor.logger.warn(msg)
+            DTIProcessor.logger.warning(msg)
             self._result['ERROR'] = msg
             raise Exception(msg)
         else:
@@ -546,7 +546,7 @@ class DTIProcessor(object):
                                                                               dcae_service_location=self.event_clli))
                                                  ))
         except Exception as e:
-            msg = "DTIProcessor._add() running pool.map() got exception {}: {!s}".format(type(e).__name__, e)
+            msg = "DTIProcessor.add_replay() running pool.map() got exception {}: {!s}".format(type(e).__name__, e)
             DTIProcessor.logger.error(msg)
             self._result['ERROR'] = msg
         return res_dict
@@ -564,7 +564,7 @@ class DTIProcessor(object):
                 try:
                     msg = "processing delete event for {}/{} to relate with any docker hosts".format(
                         self.target_type, self.target_name)
-                    DTIProcessor.logger.warn(msg)
+                    DTIProcessor.logger.warning(msg)
                     res_dict_docker = dict(self.docker_pool.map(notify_svc,
                                                                 ((self.event, self.db_access, self.prim_db_event, tp)
                                                                  for tp
@@ -581,8 +581,8 @@ class DTIProcessor(object):
                 try:
                     msg = "processing delete event for {}/{} to relate with any k8s hosts".format(
                         self.target_type, self.target_name)
-                    DTIProcessor.logger.warn(msg)
-                    if self.prim_db_event is not None:
+                    DTIProcessor.logger.warning(msg)
+                    if self.prim_db_event
                         result = self.db_access.query_event_data_k8s(self.target_type, self.target_name)
                         res_dict_k8s = dict(self.k8s_pool.map(notify_k8s_pod, (
                             ((self.event, self.db_access, ack_item) for ack_item in result))))
@@ -592,21 +592,21 @@ class DTIProcessor(object):
                     self._result['ERROR'] = msg
 
             try:
-                if self.prim_db_event is not None:
+                if self.prim_db_event
                     self.db_access.deleteDomainObject(self.prim_db_event)
             except Exception as e:
                 msg = "trying to delete event from database, got exception {}: {!s}".format(type(e).__name__, e.args)
-                DTIProcessor.logger.warn(msg)
+                DTIProcessor.logger.warning(msg)
                 self._result['ERROR'] = msg
         except Exception as e:
                 msg = "trying to process delete event, got exception {}: {!s}".format(type(e).__name__, e.args)
-                DTIProcessor.logger.warn(msg)
+                DTIProcessor.logger.warning(msg)
                 self._result['ERROR'] = msg
 
-        if res_dict_k8s is not None:
+        if res_dict_k8s
             utils.update_dict(res_dict, res_dict_k8s)
 
-        if res_dict_docker is not None:
+        if res_dict_docker
             utils.update_dict(res_dict, res_dict_docker)
 
         return res_dict
@@ -622,14 +622,14 @@ class DTIProcessor(object):
         if self.is_notify:
             try:
                 self.prim_db_event = self.db_access.query_event_item(self.target_type, self.target_name)
-                if self.prim_db_event is not None:
+                if self.prim_db_event
                     self.db_access.update_event_item(self.event, self.target_type, self.target_name)
                     result = self.db_access.query_event_data(self.target_type, self.target_name)
                     if len(result) == 0:
                         msg = "processing update event for {}/{}, but event distribution info is not found in database, " \
                               "replaying this event to cluster if required". \
                             format(self.target_type, self.target_name)
-                        DTIProcessor.logger.warn(msg)
+                        DTIProcessor.logger.warning(msg)
                         self._result['WARNING'] = msg
                         res_dict = self.add_replay()
                     else:
@@ -654,7 +654,7 @@ class DTIProcessor(object):
                     # event is new for the handler
                     msg = "processing update event for {}/{}, but current event info is not found in database, " \
                           "executing add event".format(self.target_type, self.target_name)
-                    DTIProcessor.logger.warn(msg)
+                    DTIProcessor.logger.warning(msg)
                     self._result['WARNING'] = msg
                     res_dict = self.add()
             except Exception as e:
@@ -662,10 +662,10 @@ class DTIProcessor(object):
                 DTIProcessor.logger.error(msg)
                 self._result['ERROR'] = msg
 
-        if res_dict_k8s is not None:
+        if res_dict_k8s
             utils.update_dict(res_dict, res_dict_k8s)
 
-        if res_dict_docker is not None:
+        if res_dict_docker
             utils.update_dict(res_dict, res_dict_docker)
 
         return res_dict
@@ -678,7 +678,7 @@ class DTIProcessor(object):
         res_dict = {}
         try:
             self.prim_db_event = self.db_access.query_event_item(self.target_type, self.target_name)
-            if self.prim_db_event is not None:
+            if self.prim_db_event
                 self.db_access.update_event_item(self.event, self.target_type, self.target_name)
             else:
                 self.prim_db_event = Event(event=self.event, target_name=self.target_name, target_type=self.target_type,
@@ -686,7 +686,7 @@ class DTIProcessor(object):
                 self.db_access.saveDomainObject(self.prim_db_event)
         except Exception as e:
             msg = "trying to store notify event, got exception {}: {!s}".format(type(e).__name__, e.args)
-            DTIProcessor.logger.warn(msg)
+            DTIProcessor.logger.warning(msg)
             self._result['ERROR'] = msg
 
         try:
@@ -696,7 +696,7 @@ class DTIProcessor(object):
                 res_dict[k] = v
         except Exception as e:
             msg = "trying to run notify event, got exception {}: {!s}".format(type(e).__name__, e.args)
-            DTIProcessor.logger.warn(msg)
+            DTIProcessor.logger.warning(msg)
             self._result['WARNING'] = msg
 
         return res_dict
@@ -785,7 +785,7 @@ class DTIProcessor(object):
                     pass
 
             try:
-                supported_types = ConsulClient.get_value(service_name + ":dti")
+                supported_types = ConsulClient.get_value(service_name + ":oti")
             except:
                 return r_dict
             else: