Update python2 to python3
[vfc/nfvo/lcm.git] / lcm / workflows / graphflow / flow / manager.py
1 # Copyright 2018 ZTE Corporation.
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 #         http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14
15 import datetime
16 from lcm.workflows.graphflow import STARTED, PROCESSING, FINISHED, ERROR
17 import logging
18 import time
19
20 logger = logging.getLogger(__name__)
21
22
23 class TaskManager(object):
24
25     def __init__(self):
26         self.task_set = {}
27
28     def add_task(self, key, task, timeout=None):
29         self.task_set[key] = task
30         logger.debug("task_set %s" % self.task_set)
31
32     def update_task_status(self, key, status):
33         if key in self.task_set:
34             task = self.task_set[key]
35             task.update_task(status)
36
37     def update_task(self, key, task):
38         if key in self.task_set:
39             self.task_set[key] = task
40
41     def get_task(self, key):
42         if key in self.task_set:
43             return self.task_set[key]
44         else:
45             return None
46
47     def get_all_task(self):
48         return self.task_set
49
50     def is_all_task_finished(self, task_key_set=None):
51         states = []
52         if not task_key_set:
53             task_key_set = list(self.task_set.keys())
54         total = len(task_key_set)
55         for key in task_key_set:
56             if key in self.task_set:
57                 states.append(self.task_set[key].status)
58         if len([state for state in states if state == FINISHED]) == total:
59             return True
60         else:
61             for key in task_key_set:
62                 logger.debug("task key %s, status %s" % (key, self.task_set[key].status))
63             return False
64
65     def wait_tasks_done(self, task_key_set=None):
66         if task_key_set:
67             for key in task_key_set:
68                 if key in list(self.task_set.keys()):
69                     task = self.task_set[key]
70                     logger.debug("current wait task %s, endtime %s, status %s" % (task.key, task.endtime, task.status))
71                     while task.endtime >= datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S') and task.status in [STARTED, PROCESSING]:
72                         time.sleep(1)
73                     if task.status in [STARTED, PROCESSING]:
74                         task.status = ERROR
75                     logger.debug("wait task final status %s" % task.status)
76         else:
77             for task in list(self.task_set.values()):
78                 while task.endtime >= datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S') and task.status in [STARTED, PROCESSING]:
79                     time.sleep(1)
80                 if task.status in [STARTED, PROCESSING]:
81                     task.status = ERROR