add async task in workflow 51/70451/1
authormaopengzhang <zhang.maopeng1@zte.com.cn>
Mon, 15 Oct 2018 08:37:30 +0000 (16:37 +0800)
committermaopengzhang <zhang.maopeng1@zte.com.cn>
Mon, 15 Oct 2018 08:37:30 +0000 (16:37 +0800)
add async task in workflow to support NS

Change-Id: I91d463c3461e7970c96c42c77bed471043fc76e8
Issue-ID: VFC-1041
Signed-off-by: maopengzhang <zhang.maopeng1@zte.com.cn>
lcm/workflows/graphflow/task/async_rest_task.py [new file with mode: 0644]
lcm/workflows/graphflow/task/async_task.py [new file with mode: 0644]

diff --git a/lcm/workflows/graphflow/task/async_rest_task.py b/lcm/workflows/graphflow/task/async_rest_task.py
new file mode 100644 (file)
index 0000000..ac76d7b
--- /dev/null
@@ -0,0 +1,40 @@
+# Copyright 2018 ZTE Corporation.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#         http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import logging
+from lcm.workflows.graphflow.task.async_task import AsyncTask
+
+logger = logging.getLogger(__name__)
+
+
+class ASyncRestTask(AsyncTask):
+    STATUS_OK = (HTTP_200_OK, HTTP_201_CREATED, HTTP_204_NO_CONTENT, HTTP_202_ACCEPTED) = ('200', '201', '204', '202')
+    HTTP_METHOD = (POST, GET, PUT, DELETE) = ("POST", "GET", "PUT", "DELETE")
+
+    def __init__(self, *args):
+        super(ASyncRestTask, self).__init__(*args)
+        self.url = self.input.get(self.URL, "")
+        self.method = self.input.get(self.METHOD, "")
+        self.content = self.input.get(self.CONTENT, "")
+
+    def run(self):
+        status, resp_content = self.call_rest(self.url, self.method, self.content)
+        if status not in self.STATUS_OK:
+            status = self.ERROR
+        else:
+            status = self.PROCESSING
+        return status, resp_content
+
+    def call_rest(self, url, method, content=None):
+        pass
diff --git a/lcm/workflows/graphflow/task/async_task.py b/lcm/workflows/graphflow/task/async_task.py
new file mode 100644 (file)
index 0000000..65a1274
--- /dev/null
@@ -0,0 +1,65 @@
+# Copyright 2018 ZTE Corporation.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#         http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import logging
+import time
+import datetime
+from threading import Thread
+from lcm.workflows.graphflow.task.task import Task
+
+logger = logging.getLogger(__name__)
+
+
+class AsyncTask(Task):
+
+    def __init__(self, *args):
+        super(AsyncTask, self).__init__(*args)
+
+    def execute(self):
+        logger.debug("start task: %s", self.key)
+        status, output = self.run()
+        self.update_task(status, output)
+        if status == self.PROCESSING:
+            WatchTask(self).start()
+
+    def run(self):
+        pass
+
+    def get_task_status(self):
+        status = self.get_ext_status()
+        return status if status else self.status
+
+    def update_task_status(self, status):
+        self.status = status
+
+    def get_ext_status(self):
+        return None
+
+
+class WatchTask(Thread):
+
+    def __init__(self, task):
+        Thread.__init__(self)
+        self.task = task
+        self.timeout = task.timeout
+        self.endtime = (datetime.datetime.now() + datetime.timedelta(seconds=self.timeout)).strftime(self.task.TIME_FORMAT)
+
+    def run(self):
+        status = ""
+        while status not in [self.task.FINISHED, self.task.ERROR] and self.endtime >= datetime.datetime.now().strftime(self.task.TIME_FORMAT):
+            status = self.task.get_task_status()
+            logger.debug("task %s, status %s", self.task.key, status)
+            time.sleep(1)
+        status = self.task.ERROR if status != self.task.FINISHED else self.task.FINISHED
+        self.task.update_task_status(status)