0c52e3220771333f2c991ea85c54585ece3ae344
[multicloud/azure.git] / azure / aria / aria-extension-cloudify / src / aria / aria / orchestrator / workflow_runner.py
1 # Licensed to the Apache Software Foundation (ASF) under one or more
2 # contributor license agreements.  See the NOTICE file distributed with
3 # this work for additional information regarding copyright ownership.
4 # The ASF licenses this file to You under the Apache License, Version 2.0
5 # (the "License"); you may not use this file except in compliance with
6 # the License.  You may obtain a copy of the License at
7 #
8 #     http://www.apache.org/licenses/LICENSE-2.0
9 #
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
15
16 """
17 Running workflows.
18 """
19
20 import os
21 import sys
22 from datetime import datetime
23
24 from . import exceptions
25 from .context.workflow import WorkflowContext
26 from .workflows import builtin
27 from .workflows.core import engine, graph_compiler
28 from .workflows.executor.process import ProcessExecutor
29 from ..modeling import models
30 from ..modeling import utils as modeling_utils
31 from ..utils.imports import import_fullname
32
# Default for the ``task_max_attempts`` argument of ``WorkflowRunner``: the maximum
# number of attempts made for each failing task.
DEFAULT_TASK_MAX_ATTEMPTS = 30
# Default for the ``task_retry_interval`` argument of ``WorkflowRunner``: the interval
# between retry attempts of a failing task (presumably in seconds -- TODO confirm).
DEFAULT_TASK_RETRY_INTERVAL = 30
35
36
class WorkflowRunner(object):
    """
    Manages a single workflow execution on a given service.

    The runner stores an execution model for the workflow, builds the workflow context and
    an engine around the chosen executor, and exposes :meth:`execute` and :meth:`cancel`
    to drive that engine.
    """

    def __init__(self, model_storage, resource_storage, plugin_manager,
                 execution_id=None, retry_failed_tasks=False,
                 service_id=None, workflow_name=None, inputs=None, executor=None,
                 task_max_attempts=DEFAULT_TASK_MAX_ATTEMPTS,
                 task_retry_interval=DEFAULT_TASK_RETRY_INTERVAL):
        """
        Manages a single workflow execution on a given service.

        :param model_storage: model storage API ("MAPI")
        :param resource_storage: resource storage API ("RAPI")
        :param plugin_manager: plugin manager
        :param execution_id: ID under which the execution model is stored; when given, the
         engine is asked to run in resuming mode
        :param retry_failed_tasks: passed to the engine as ``retry_failed`` on execute
        :param service_id: service ID
        :param workflow_name: workflow name
        :param inputs: key-value dict of inputs for the execution
        :param executor: executor for tasks; defaults to a
         :class:`~aria.orchestrator.workflows.executor.process.ProcessExecutor` instance
        :param task_max_attempts: maximum attempts of repeating each failing task
        :param task_retry_interval: retry interval between retry attempts of a failing task
        :raises InvalidWorkflowRunnerParams: if neither an execution ID nor both a workflow
         name and a service ID are provided
        :raises UndeclaredWorkflowError: if the workflow is neither declared in the service
         nor a built-in workflow
        :raises ActiveExecutionsError: if the service already has an active execution
        """

        # The exception used to be constructed but never raised, so invalid parameter
        # combinations slipped through and failed later with unrelated errors.
        if not (execution_id or (workflow_name and service_id)):
            raise exceptions.InvalidWorkflowRunnerParams(
                "Either provide execution id in order to resume a workflow or workflow name "
                "and service id with inputs")

        self._is_resume = execution_id is not None
        self._retry_failed_tasks = retry_failed_tasks

        self._model_storage = model_storage
        self._resource_storage = resource_storage

        # the IDs are stored rather than the models themselves, so this module could be used
        # by several threads without raising errors on model objects shared between threads
        self._service_id = service_id
        self._workflow_name = workflow_name
        self._validate_workflow_exists_for_service()

        # The execution model is created exactly once on either path.
        if self._is_resume:
            # NOTE(review): despite the "resume" naming, the execution model is created
            # (not loaded) under the supplied ID -- confirm this is what callers expect.
            self._execution_id = execution_id
            self._create_execution_model(inputs, execution_id)
        else:
            # No ID supplied: use whatever ID the stored model ends up with.
            self._execution_id = self._create_execution_model(inputs).id

        # Use the stored IDs so that a freshly created execution is visible to the context
        # (the raw ``execution_id`` argument is None on that path).
        self._workflow_context = WorkflowContext(
            name=self.__class__.__name__,
            model_storage=self._model_storage,
            resource_storage=resource_storage,
            service_id=self._service_id,
            execution_id=self._execution_id,
            workflow_name=self._workflow_name,
            task_max_attempts=task_max_attempts,
            task_retry_interval=task_retry_interval)

        # Set default executor and kwargs
        executor = executor or ProcessExecutor(plugin_manager=plugin_manager)

        # The engine looks executors up by their class.
        self._engine = engine.Engine(executors={executor.__class__: executor})

    @property
    def execution_id(self):
        """
        ID of the execution managed by this runner.
        """
        return self._execution_id

    @property
    def execution(self):
        """
        Execution model, fetched from the model storage on every access.
        """
        return self._model_storage.execution.get(self._execution_id)

    @property
    def service(self):
        """
        Service model, fetched from the model storage on every access.
        """
        return self._model_storage.service.get(self._service_id)

    def execute(self):
        """
        Runs the workflow through the engine, passing the resume and retry-failed flags
        chosen at construction time.
        """
        self._engine.execute(ctx=self._workflow_context,
                             resuming=self._is_resume,
                             retry_failed=self._retry_failed_tasks)

    def cancel(self):
        """
        Asks the engine to cancel the execution.
        """
        self._engine.cancel_execution(ctx=self._workflow_context)

    def _create_execution_model(self, inputs, execution_id=None):
        """
        Builds an execution model for the workflow and stores it in the model storage.

        :param inputs: key-value dict of inputs supplied for the execution (may be None)
        :param execution_id: explicit ID to assign to the execution; when None the ID is
         left unset on the model before it is stored
        :return: the stored execution model
        :raises ActiveExecutionsError: if the service already has an active execution
        """
        service = self.service
        execution = models.Execution(
            created_at=datetime.utcnow(),
            service=service,
            workflow_name=self._workflow_name,
            inputs={})

        if self._workflow_name in builtin.BUILTIN_WORKFLOWS:
            workflow_inputs = dict()  # built-in workflows don't have any inputs
        else:
            workflow_inputs = service.workflows[self._workflow_name].inputs

        # Undeclared inputs are deliberately tolerated here; only required inputs are checked.
        modeling_utils.validate_required_inputs_are_supplied(declared_inputs=workflow_inputs,
                                                             supplied_inputs=inputs or {})
        execution.inputs = modeling_utils.merge_parameter_values(
            inputs, workflow_inputs, model_cls=models.Input)
        if execution_id is not None:
            execution.id = execution_id
        # TODO: these two following calls should execute atomically
        self._validate_no_active_executions(execution)
        self._model_storage.execution.put(execution)
        return execution

    def _validate_workflow_exists_for_service(self):
        """
        Ensures the workflow is either declared in the service or is a built-in workflow.

        :raises UndeclaredWorkflowError: if it is neither
        """
        service = self.service
        if self._workflow_name not in service.workflows and \
                self._workflow_name not in builtin.BUILTIN_WORKFLOWS:
            raise exceptions.UndeclaredWorkflowError(
                'No workflow policy {0} declared in service {1}'
                .format(self._workflow_name, service.name))

    def _validate_no_active_executions(self, execution):
        """
        Ensures the service has no execution that reports itself as active.

        :param execution: the execution about to be stored (currently not inspected)
        :raises ActiveExecutionsError: if an active execution exists for the service
        """
        service = self.service
        active_executions = [e for e in service.executions if e.is_active()]
        if active_executions:
            raise exceptions.ActiveExecutionsError(
                "Can't start execution; Service {0} has an active execution with ID {1}"
                .format(service.name, active_executions[0].id))

    def _get_workflow_fn(self):
        """
        Resolves the callable implementing the workflow.

        Built-in workflows are imported from the built-in workflows package; custom ones are
        imported by the function path declared on the service's workflow.

        :return: the imported workflow function
        :raises WorkflowImplementationNotFoundError: if a custom workflow function cannot be
         imported
        """
        if self._workflow_name in builtin.BUILTIN_WORKFLOWS:
            return import_fullname('{0}.{1}'.format(builtin.BUILTIN_WORKFLOWS_PATH_PREFIX,
                                                    self._workflow_name))

        service = self.service
        workflow = service.workflows[self._workflow_name]

        # TODO: Custom workflow support needs improvement, currently this code uses internal
        # knowledge of the resource storage; Instead, workflows should probably be loaded
        # in a similar manner to operation plugins. Also consider passing to import_fullname
        # as paths instead of appending to sys path.
        service_template_resources_path = os.path.join(
            self._resource_storage.service_template.base_path,
            str(service.service_template.id))
        # Avoid piling up duplicate entries when the function is resolved repeatedly.
        if service_template_resources_path not in sys.path:
            sys.path.append(service_template_resources_path)

        try:
            workflow_fn = import_fullname(workflow.function)
        except ImportError:
            raise exceptions.WorkflowImplementationNotFoundError(
                'Could not find workflow {0} function at {1}'.format(
                    self._workflow_name, workflow.function))

        return workflow_fn