Merge "add graph flow"
authorFu Jinhua <fu.jinhua@zte.com.cn>
Mon, 15 Oct 2018 11:50:48 +0000 (11:50 +0000)
committerGerrit Code Review <gerrit@onap.org>
Mon, 15 Oct 2018 11:50:48 +0000 (11:50 +0000)
lcm/workflows/graphflow/flow/__init__.py [new file with mode: 0644]
lcm/workflows/graphflow/flow/flow.py [new file with mode: 0644]
lcm/workflows/graphflow/flow/graph.py [new file with mode: 0644]
lcm/workflows/graphflow/flow/load.py [new file with mode: 0644]
lcm/workflows/graphflow/flow/manager.py [new file with mode: 0644]

diff --git a/lcm/workflows/graphflow/flow/__init__.py b/lcm/workflows/graphflow/flow/__init__.py
new file mode 100644 (file)
index 0000000..342c2a8
--- /dev/null
@@ -0,0 +1,13 @@
+# Copyright 2018 ZTE Corporation.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#         http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
diff --git a/lcm/workflows/graphflow/flow/flow.py b/lcm/workflows/graphflow/flow/flow.py
new file mode 100644 (file)
index 0000000..1c5d09b
--- /dev/null
@@ -0,0 +1,79 @@
+# Copyright 2018 ZTE Corporation.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#         http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import logging
+import threading
+import json
+from threading import Thread
+from lcm.workflows.graphflow.flow.graph import Graph
+from lcm.workflows.graphflow.flow.load import load_class_from_config
+from lcm.workflows.graphflow.flow.manager import TaskManager
+
+logger = logging.getLogger(__name__)
+
+
+def _execute_task(exec_class):
+    logger.debug("graph task class %s" % exec_class)
+    exec_class.execute()
+
+
+def create_instance(class_key, class_set, *args):
+    if class_key in class_set:
+        import_class = class_set[class_key]
+        return import_class(*args)
+    else:
+        return None
+
+
+class GraphFlow(Thread):
+    def __init__(self, graph, task_para_dict, config):
+        Thread.__init__(self)
+        self._graph = Graph(graph)
+        self._task_para_dict = task_para_dict
+        self._imp_class_set = load_class_from_config(config)
+        self.task_manager = TaskManager()
+
+    def run(self):
+        logger.debug("GraphFlow begin. graph:%s, task_para_dict:%s", self._graph, json.dumps(self._task_para_dict))
+        self.sort_nodes = self._graph.topo_sort()
+        for node in self.sort_nodes:
+            pre_nodes = self._graph.get_pre_nodes(node)
+            logger.debug("current node %s, pre_nodes %s" % (node, pre_nodes))
+            if len(pre_nodes) > 0:
+                self.task_manager.wait_tasks_done(pre_nodes)
+                if self.task_manager.is_all_task_finished(pre_nodes):
+                    self.create_task(node)
+                    logger.debug("GraphFlow create node %s", node)
+                else:
+                    logger.debug("GraphFlow, end, error")
+                    break
+            else:
+                self.create_task(node)
+                logger.debug("GraphFlow create node %s", node)
+        logger.debug("GraphFlow, end")
+
+    def create_task(self, node):
+        task_para = self._task_para_dict[node]
+        task_para["key"] = node
+        task_para["status"] = "started"
+        task_para["manager"] = self.task_manager
+        if "type" in task_para:
+            class_key = task_para["type"]
+            exec_task = create_instance(class_key, self._imp_class_set, task_para)
+            self.task_manager.add_task(node, exec_task)
+            thread_task = threading.Thread(target=_execute_task, args=(exec_task,))
+            thread_task.start()
+            return True
+        else:
+            return False
diff --git a/lcm/workflows/graphflow/flow/graph.py b/lcm/workflows/graphflow/flow/graph.py
new file mode 100644 (file)
index 0000000..334eea6
--- /dev/null
@@ -0,0 +1,73 @@
+# Copyright 2018 ZTE Corporation.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#         http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import logging
+from collections import deque
+from collections import OrderedDict
+
+logger = logging.getLogger(__name__)
+
+
+class Graph(object):
+
+    def __init__(self, graph_dict=None):
+        self.graph = OrderedDict()
+        if graph_dict:
+            for node, dep_nodes in graph_dict.iteritems():
+                self.add_node(node, dep_nodes)
+
+    def add_node(self, node, dep_nodes):
+        if node not in self.graph:
+            self.graph[node] = set()
+        if isinstance(dep_nodes, list):
+            for dep_node in dep_nodes:
+                if dep_node not in self.graph:
+                    self.graph[dep_node] = set()
+                if dep_node not in self.graph[node]:
+                    self.graph[node].add(dep_node)
+
+    def get_pre_nodes(self, node):
+        return [k for k in self.graph if node in self.graph[k]]
+
+    def topo_sort(self):
+        degree = {}
+        for node in self.graph:
+            degree[node] = 0
+        for node in self.graph:
+            for dependent in self.graph[node]:
+                degree[dependent] += 1
+        queue = deque()
+        for node in degree:
+            if degree[node] == 0:
+                queue.appendleft(node)
+        sort_list = []
+        while queue:
+            node = queue.pop()
+            sort_list.append(node)
+            for dependent in self.graph[node]:
+                degree[dependent] -= 1
+                if degree[dependent] == 0:
+                    queue.appendleft(dependent)
+        if len(sort_list) == len(self.graph):
+            return sort_list
+        else:
+            return None
+
+    def to_dict(self):
+        dict = {}
+        for node, dependents in self.graph.iteritems():
+            dict[node] = []
+            for dep in dependents:
+                dict[node].append(dep)
+        return dict
diff --git a/lcm/workflows/graphflow/flow/load.py b/lcm/workflows/graphflow/flow/load.py
new file mode 100644 (file)
index 0000000..757be89
--- /dev/null
@@ -0,0 +1,46 @@
+# Copyright 2018 ZTE Corporation.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#         http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import importlib
+import logging
+
+
+logger = logging.getLogger(__name__)
+
+
+def load_module(imp_module):
+    try:
+        imp_module = importlib.import_module(imp_module)
+    except Exception:
+        logger.debug("load_module error: %s", imp_module)
+        imp_module = None
+    return imp_module
+
+
+def load_class(imp_module, imp_class):
+    try:
+        cls = getattr(imp_module, imp_class)
+    except Exception:
+        logger.debug("load_class error: %s", imp_class)
+        cls = None
+    return cls
+
+
+def load_class_from_config(config):
+    class_set = {}
+    for k, v in config.iteritems():
+        imp_module = load_module(v["module"])
+        cls = load_class(imp_module, v["class"])
+        class_set[k] = cls
+    return class_set
diff --git a/lcm/workflows/graphflow/flow/manager.py b/lcm/workflows/graphflow/flow/manager.py
new file mode 100644 (file)
index 0000000..f0c2cd6
--- /dev/null
@@ -0,0 +1,81 @@
+# Copyright 2018 ZTE Corporation.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#         http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import datetime
+from lcm.workflows.graphflow import STARTED, PROCESSING, FINISHED, ERROR
+import logging
+import time
+
+logger = logging.getLogger(__name__)
+
+
+class TaskManager(object):
+
+    def __init__(self):
+        self.task_set = {}
+
+    def add_task(self, key, task, timeout=None):
+        self.task_set[key] = task
+        logger.debug("task_set %s" % self.task_set)
+
+    def update_task_status(self, key, status):
+        if key in self.task_set:
+            task = self.task_set[key]
+            task.update_task(status)
+
+    def update_task(self, key, task):
+        if key in self.task_set:
+            self.task_set[key] = task
+
+    def get_task(self, key):
+        if key in self.task_set:
+            return self.task_set[key]
+        else:
+            return None
+
+    def get_all_task(self):
+        return self.task_set
+
+    def is_all_task_finished(self, task_key_set=None):
+        states = []
+        if not task_key_set:
+            task_key_set = self.task_set.keys()
+        total = len(task_key_set)
+        for key in task_key_set:
+            if key in self.task_set:
+                states.append(self.task_set[key].status)
+        if len([state for state in states if state == FINISHED]) == total:
+            return True
+        else:
+            for key in task_key_set:
+                logger.debug("task key %s, status %s" % (key, self.task_set[key].status))
+            return False
+
+    def wait_tasks_done(self, task_key_set=None):
+        if task_key_set:
+            for key in task_key_set:
+                if key in self.task_set.keys():
+                    task = self.task_set[key]
+                    logger.debug("current wait task %s, endtime %s, status %s" % (task.key, task.endtime, task.status))
+                    while task.endtime >= datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S') and task.status in [STARTED, PROCESSING]:
+                        time.sleep(1)
+                    if task.status in [STARTED, PROCESSING]:
+                        task.status = ERROR
+                    logger.debug("wait task final status %s" % task.status)
+        else:
+            for task in self.task_set.itervalues():
+                while task.endtime >= datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S') and task.status in [STARTED, PROCESSING]:
+                    time.sleep(1)
+                if task.status in [STARTED, PROCESSING]:
+                    task.status = ERROR