1 # Copyright 2018 ZTE Corporation.
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
7 # http://www.apache.org/licenses/LICENSE-2.0
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
16 from lcm.workflows.graphflow import STARTED, PROCESSING, FINISHED, ERROR
20 logger = logging.getLogger(__name__)
23 class TaskManager(object):
28 def add_task(self, key, task, timeout=None):
29 self.task_set[key] = task
30 logger.debug("task_set %s" % self.task_set)
32 def update_task_status(self, key, status):
33 if key in self.task_set:
34 task = self.task_set[key]
35 task.update_task(status)
37 def update_task(self, key, task):
38 if key in self.task_set:
39 self.task_set[key] = task
41 def get_task(self, key):
42 if key in self.task_set:
43 return self.task_set[key]
47 def get_all_task(self):
50 def is_all_task_finished(self, task_key_set=None):
53 task_key_set = list(self.task_set.keys())
54 total = len(task_key_set)
55 for key in task_key_set:
56 if key in self.task_set:
57 states.append(self.task_set[key].status)
58 if len([state for state in states if state == FINISHED]) == total:
61 for key in task_key_set:
62 logger.debug("task key %s, status %s" % (key, self.task_set[key].status))
65 def wait_tasks_done(self, task_key_set=None):
67 for key in task_key_set:
68 if key in list(self.task_set.keys()):
69 task = self.task_set[key]
70 logger.debug("current wait task %s, endtime %s, status %s" % (task.key, task.endtime, task.status))
71 while task.endtime >= datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S') and task.status in [STARTED, PROCESSING]:
73 if task.status in [STARTED, PROCESSING]:
75 logger.debug("wait task final status %s" % task.status)
77 for task in list(self.task_set.values()):
78 while task.endtime >= datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S') and task.status in [STARTED, PROCESSING]:
80 if task.status in [STARTED, PROCESSING]: