1 # Copyright 2018 ZTE Corporation.
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
7 # http://www.apache.org/licenses/LICENSE-2.0
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
18 from threading import Thread
19 from lcm.workflows.graphflow.task.task import Task
# Module-level logger, named after this module via __name__.
21 logger = logging.getLogger(__name__)
# Task subclass that runs its work and, when the run reports PROCESSING,
# starts a WatchTask thread to follow the task afterwards.
# NOTE(review): this listing has lost its indentation and appears to be missing
# lines (a method header and two method bodies are not visible below); restore
# them from the original file before relying on this class.
24 class AsyncTask(Task):
# Forwards every positional argument unchanged to the parent constructor.
26 def __init__(self, *args):
27 super(AsyncTask, self).__init__(*args)
# NOTE(review): the statements below read as the body of a separate method whose
# `def` line is not visible here - confirm its name against the original file.
# They log the task key, call self.run() for a (status, output) pair, record
# that pair through self.update_task(), and start a WatchTask thread only when
# the returned status equals self.PROCESSING.
30 logger.debug("start task: %s", self.key)
31 status, output = self.run()
32 self.update_task(status, output)
33 if status == self.PROCESSING:
34 WatchTask(self).start()
# Returns the value of get_ext_status() when it is truthy; otherwise falls back
# to self.status.
39 def get_task_status(self):
40 status = self.get_ext_status()
41 return status if status else self.status
# WatchTask calls this with the final status it settled on (FINISHED or ERROR).
# NOTE(review): the body of this method is not visible in this listing.
43 def update_task_status(self, status):
# Supplies the status that get_task_status() prefers over self.status.
# NOTE(review): the body of this method is not visible in this listing.
46 def get_ext_status(self):
# Thread that polls a task's status until it is FINISHED or ERROR, or until the
# task's timeout has passed, and then reports the final status back to the task.
# NOTE(review): this listing has lost its indentation and appears to be missing
# lines (the Thread initialisation, the assignment of self.task, the header of
# the polling method and the initial value of `status` are not visible below).
50 class WatchTask(Thread):
52 def __init__(self, task):
# The timeout is taken from the task and interpreted as seconds (it is passed
# to timedelta(seconds=...)). The deadline is now + timeout, stored as a string
# formatted with the task's TIME_FORMAT.
55 self.timeout = task.timeout
56 self.endtime = (datetime.datetime.now() + datetime.timedelta(seconds=self.timeout)).strftime(self.task.TIME_FORMAT)
# Polling loop: keeps asking the task for its status while the status is neither
# FINISHED nor ERROR and the deadline has not passed.
# NOTE(review): the deadline check compares two formatted strings, so it is only
# chronological if TIME_FORMAT sorts lexicographically (e.g. year-first,
# zero-padded fields) - TODO confirm, or compare datetime objects instead.
60 while status not in [self.task.FINISHED, self.task.ERROR] and self.endtime >= datetime.datetime.now().strftime(self.task.TIME_FORMAT):
61 status = self.task.get_task_status()
62 logger.debug("task %s, status %s", self.task.key, status)
# NOTE(review): no delay between polls is visible in this listing; if the
# original has none, the loop above polls continuously - confirm.
# Any status other than FINISHED at this point (including a timeout) is
# reported to the task as ERROR.
64 status = self.task.ERROR if status != self.task.FINISHED else self.task.FINISHED
65 self.task.update_task_status(status)