vFW and vDNS support added to azure-plugin
[multicloud/azure.git] / azure / aria / aria-extension-cloudify / src / aria / aria / orchestrator / workflow_runner.py
diff --git a/azure/aria/aria-extension-cloudify/src/aria/aria/orchestrator/workflow_runner.py b/azure/aria/aria-extension-cloudify/src/aria/aria/orchestrator/workflow_runner.py
new file mode 100644 (file)
index 0000000..0c52e32
--- /dev/null
@@ -0,0 +1,194 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""
+Running workflows.
+"""
+
+import os
+import sys
+from datetime import datetime
+
+from . import exceptions
+from .context.workflow import WorkflowContext
+from .workflows import builtin
+from .workflows.core import engine, graph_compiler
+from .workflows.executor.process import ProcessExecutor
+from ..modeling import models
+from ..modeling import utils as modeling_utils
+from ..utils.imports import import_fullname
+
+DEFAULT_TASK_MAX_ATTEMPTS = 30
+DEFAULT_TASK_RETRY_INTERVAL = 30
+
+
+class WorkflowRunner(object):
+
+    def __init__(self, model_storage, resource_storage, plugin_manager,
+                 execution_id=None, retry_failed_tasks=False,
+                 service_id=None, workflow_name=None, inputs=None, executor=None,
+                 task_max_attempts=DEFAULT_TASK_MAX_ATTEMPTS,
+                 task_retry_interval=DEFAULT_TASK_RETRY_INTERVAL):
+        """
+        Manages a single workflow execution on a given service.
+
+        :param workflow_name: workflow name
+        :param service_id: service ID
+        :param inputs: key-value dict of inputs for the execution
+        :param model_storage: model storage API ("MAPI")
+        :param resource_storage: resource storage API ("RAPI")
+        :param plugin_manager: plugin manager
+        :param executor: executor for tasks; defaults to a
+         :class:`~aria.orchestrator.workflows.executor.process.ProcessExecutor` instance
+        :param task_max_attempts: maximum attempts of repeating each failing task
+        :param task_retry_interval: retry interval between retry attempts of a failing task
+        """
+
+        if not (execution_id or (workflow_name and service_id)):
+            exceptions.InvalidWorkflowRunnerParams(
+                "Either provide execution id in order to resume a workflow or workflow name "
+                "and service id with inputs")
+
+        self._is_resume = execution_id is not None
+        self._retry_failed_tasks = retry_failed_tasks
+
+        self._model_storage = model_storage
+        self._resource_storage = resource_storage
+
+        # the IDs are stored rather than the models themselves, so this module could be used
+        # by several threads without raising errors on model objects shared between threadsF
+
+        if self._is_resume:
+            self._service_id = service_id
+            # self._service_id = self.execution.service.id
+            # self._workflow_name = model_storage.execution.get(self._execution_id).workflow_name
+            self._workflow_name = workflow_name
+            self._validate_workflow_exists_for_service()
+            self._execution_id = execution_id
+
+        else:
+            self._service_id = service_id
+            self._workflow_name = workflow_name
+            self._validate_workflow_exists_for_service()
+            self._execution_id = self._create_execution_model(inputs).id
+
+        self._create_execution_model(inputs, execution_id)
+
+        self._workflow_context = WorkflowContext(
+            name=self.__class__.__name__,
+            model_storage=self._model_storage,
+            resource_storage=resource_storage,
+            service_id=service_id,
+            execution_id=execution_id,
+            workflow_name=self._workflow_name,
+            task_max_attempts=task_max_attempts,
+            task_retry_interval=task_retry_interval)
+
+        # Set default executor and kwargs
+        executor = executor or ProcessExecutor(plugin_manager=plugin_manager)
+
+        # transforming the execution inputs to dict, to pass them to the workflow function
+        # execution_inputs_dict = dict(inp.unwrapped for inp in self.execution.inputs.itervalues())
+
+        # if not self._is_resume:
+        #     workflow_fn = self._get_workflow_fn()
+        #     self._tasks_graph = workflow_fn(ctx=self._workflow_context, **execution_inputs_dict)
+        #     compiler = graph_compiler.GraphCompiler(self._workflow_context, executor.__class__)
+        #     compiler.compile(self._tasks_graph)
+
+        self._engine = engine.Engine(executors={executor.__class__: executor})
+
+    @property
+    def execution_id(self):
+        return self._execution_id
+
+    @property
+    def execution(self):
+        return self._model_storage.execution.get(self._execution_id)
+
+    @property
+    def service(self):
+        return self._model_storage.service.get(self._service_id)
+
+    def execute(self):
+        self._engine.execute(ctx=self._workflow_context,
+                             resuming=self._is_resume,
+                             retry_failed=self._retry_failed_tasks)
+
+    def cancel(self):
+        self._engine.cancel_execution(ctx=self._workflow_context)
+
+    def _create_execution_model(self, inputs, execution_id):
+        execution = models.Execution(
+            created_at=datetime.utcnow(),
+            service=self.service,
+            workflow_name=self._workflow_name,
+            inputs={})
+
+        if self._workflow_name in builtin.BUILTIN_WORKFLOWS:
+            workflow_inputs = dict()  # built-in workflows don't have any inputs
+        else:
+            workflow_inputs = self.service.workflows[self._workflow_name].inputs
+
+        # modeling_utils.validate_no_undeclared_inputs(declared_inputs=workflow_inputs,
+        #                                              supplied_inputs=inputs or {})
+        modeling_utils.validate_required_inputs_are_supplied(declared_inputs=workflow_inputs,
+                                                             supplied_inputs=inputs or {})
+        execution.inputs = modeling_utils.merge_parameter_values(
+            inputs, workflow_inputs, model_cls=models.Input)
+        execution.id = execution_id
+        # TODO: these two following calls should execute atomically
+        self._validate_no_active_executions(execution)
+        self._model_storage.execution.put(execution)
+        return execution
+
+    def _validate_workflow_exists_for_service(self):
+        if self._workflow_name not in self.service.workflows and \
+                self._workflow_name not in builtin.BUILTIN_WORKFLOWS:
+            raise exceptions.UndeclaredWorkflowError(
+                'No workflow policy {0} declared in service {1}'
+                    .format(self._workflow_name, self.service.name))
+
+    def _validate_no_active_executions(self, execution):
+        active_executions = [e for e in self.service.executions if e.is_active()]
+        if active_executions:
+            raise exceptions.ActiveExecutionsError(
+                "Can't start execution; Service {0} has an active execution with ID {1}"
+                    .format(self.service.name, active_executions[0].id))
+
+    def _get_workflow_fn(self):
+        if self._workflow_name in builtin.BUILTIN_WORKFLOWS:
+            return import_fullname('{0}.{1}'.format(builtin.BUILTIN_WORKFLOWS_PATH_PREFIX,
+                                                    self._workflow_name))
+
+        workflow = self.service.workflows[self._workflow_name]
+
+        # TODO: Custom workflow support needs improvement, currently this code uses internal
+        # knowledge of the resource storage; Instead, workflows should probably be loaded
+        # in a similar manner to operation plugins. Also consider passing to import_fullname
+        # as paths instead of appending to sys path.
+        service_template_resources_path = os.path.join(
+            self._resource_storage.service_template.base_path,
+            str(self.service.service_template.id))
+        sys.path.append(service_template_resources_path)
+
+        try:
+            workflow_fn = import_fullname(workflow.function)
+        except ImportError:
+            raise exceptions.WorkflowImplementationNotFoundError(
+                'Could not find workflow {0} function at {1}'.format(
+                    self._workflow_name, workflow.function))
+
+        return workflow_fn