add async task in workflow
[vfc/nfvo/lcm.git] / lcm / workflows / graphflow / task / async_task.py
1 # Copyright 2018 ZTE Corporation.
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 #         http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14
15 import logging
16 import time
17 import datetime
18 from threading import Thread
19 from lcm.workflows.graphflow.task.task import Task
20
21 logger = logging.getLogger(__name__)
22
23
24 class AsyncTask(Task):
25
26     def __init__(self, *args):
27         super(AsyncTask, self).__init__(*args)
28
29     def execute(self):
30         logger.debug("start task: %s", self.key)
31         status, output = self.run()
32         self.update_task(status, output)
33         if status == self.PROCESSING:
34             WatchTask(self).start()
35
36     def run(self):
37         pass
38
39     def get_task_status(self):
40         status = self.get_ext_status()
41         return status if status else self.status
42
43     def update_task_status(self, status):
44         self.status = status
45
46     def get_ext_status(self):
47         return None
48
49
50 class WatchTask(Thread):
51
52     def __init__(self, task):
53         Thread.__init__(self)
54         self.task = task
55         self.timeout = task.timeout
56         self.endtime = (datetime.datetime.now() + datetime.timedelta(seconds=self.timeout)).strftime(self.task.TIME_FORMAT)
57
58     def run(self):
59         status = ""
60         while status not in [self.task.FINISHED, self.task.ERROR] and self.endtime >= datetime.datetime.now().strftime(self.task.TIME_FORMAT):
61             status = self.task.get_task_status()
62             logger.debug("task %s, status %s", self.task.key, status)
63             time.sleep(1)
64         status = self.task.ERROR if status != self.task.FINISHED else self.task.FINISHED
65         self.task.update_task_status(status)