From 1a33c98a346d9657acb600ea888297820d66a258 Mon Sep 17 00:00:00 2001 From: maopengzhang Date: Mon, 15 Oct 2018 16:37:30 +0800 Subject: [PATCH] add async task in workflow add async task in workflow to support NS Change-Id: I91d463c3461e7970c96c42c77bed471043fc76e8 Issue-ID: VFC-1041 Signed-off-by: maopengzhang --- lcm/workflows/graphflow/task/async_rest_task.py | 40 +++++++++++++++ lcm/workflows/graphflow/task/async_task.py | 65 +++++++++++++++++++++++++ 2 files changed, 105 insertions(+) create mode 100644 lcm/workflows/graphflow/task/async_rest_task.py create mode 100644 lcm/workflows/graphflow/task/async_task.py diff --git a/lcm/workflows/graphflow/task/async_rest_task.py b/lcm/workflows/graphflow/task/async_rest_task.py new file mode 100644 index 00000000..ac76d7bf --- /dev/null +++ b/lcm/workflows/graphflow/task/async_rest_task.py @@ -0,0 +1,40 @@ +# Copyright 2018 ZTE Corporation. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import logging +from lcm.workflows.graphflow.task.async_task import AsyncTask + +logger = logging.getLogger(__name__) + + +class ASyncRestTask(AsyncTask): + STATUS_OK = (HTTP_200_OK, HTTP_201_CREATED, HTTP_204_NO_CONTENT, HTTP_202_ACCEPTED) = ('200', '201', '204', '202') + HTTP_METHOD = (POST, GET, PUT, DELETE) = ("POST", "GET", "PUT", "DELETE") + + def __init__(self, *args): + super(ASyncRestTask, self).__init__(*args) + self.url = self.input.get(self.URL, "") + self.method = self.input.get(self.METHOD, "") + self.content = self.input.get(self.CONTENT, "") + + def run(self): + status, resp_content = self.call_rest(self.url, self.method, self.content) + if status not in self.STATUS_OK: + status = self.ERROR + else: + status = self.PROCESSING + return status, resp_content + + def call_rest(self, url, method, content=None): + pass diff --git a/lcm/workflows/graphflow/task/async_task.py b/lcm/workflows/graphflow/task/async_task.py new file mode 100644 index 00000000..65a1274f --- /dev/null +++ b/lcm/workflows/graphflow/task/async_task.py @@ -0,0 +1,65 @@ +# Copyright 2018 ZTE Corporation. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import logging +import time +import datetime +from threading import Thread +from lcm.workflows.graphflow.task.task import Task + +logger = logging.getLogger(__name__) + + +class AsyncTask(Task): + + def __init__(self, *args): + super(AsyncTask, self).__init__(*args) + + def execute(self): + logger.debug("start task: %s", self.key) + status, output = self.run() + self.update_task(status, output) + if status == self.PROCESSING: + WatchTask(self).start() + + def run(self): + pass + + def get_task_status(self): + status = self.get_ext_status() + return status if status else self.status + + def update_task_status(self, status): + self.status = status + + def get_ext_status(self): + return None + + +class WatchTask(Thread): + + def __init__(self, task): + Thread.__init__(self) + self.task = task + self.timeout = task.timeout + self.endtime = (datetime.datetime.now() + datetime.timedelta(seconds=self.timeout)).strftime(self.task.TIME_FORMAT) + + def run(self): + status = "" + while status not in [self.task.FINISHED, self.task.ERROR] and self.endtime >= datetime.datetime.now().strftime(self.task.TIME_FORMAT): + status = self.task.get_task_status() + logger.debug("task %s, status %s", self.task.key, status) + time.sleep(1) + status = self.task.ERROR if status != self.task.FINISHED else self.task.FINISHED + self.task.update_task_status(status) -- 2.16.6