Azure plugin not sending REST calls to Azure cloud
[multicloud/azure.git] azure/aria/aria-extension-cloudify/src/aria/aria/orchestrator/workflow_runner.py
index 0c52e32..eb4efeb 100644
@@ -30,6 +30,7 @@ from ..modeling import models
 from ..modeling import utils as modeling_utils
 from ..utils.imports import import_fullname
 
+
 DEFAULT_TASK_MAX_ATTEMPTS = 30
 DEFAULT_TASK_RETRY_INTERVAL = 30
 
@@ -68,30 +69,24 @@ class WorkflowRunner(object):
         self._resource_storage = resource_storage
 
         # the IDs are stored rather than the models themselves, so this module could be used
-        # by several threads without raising errors on model objects shared between threadsF
+        # by several threads without raising errors on model objects shared between threads
 
         if self._is_resume:
-            self._service_id = service_id
-            # self._service_id = self.execution.service.id
-            # self._workflow_name = model_storage.execution.get(self._execution_id).workflow_name
-            self._workflow_name = workflow_name
-            self._validate_workflow_exists_for_service()
             self._execution_id = execution_id
-
+            self._service_id = self.execution.service.id
+            self._workflow_name = model_storage.execution.get(self._execution_id).workflow_name
         else:
             self._service_id = service_id
             self._workflow_name = workflow_name
             self._validate_workflow_exists_for_service()
             self._execution_id = self._create_execution_model(inputs).id
 
-        self._create_execution_model(inputs, execution_id)
-
         self._workflow_context = WorkflowContext(
             name=self.__class__.__name__,
             model_storage=self._model_storage,
             resource_storage=resource_storage,
             service_id=service_id,
-            execution_id=execution_id,
+            execution_id=self._execution_id,
             workflow_name=self._workflow_name,
             task_max_attempts=task_max_attempts,
             task_retry_interval=task_retry_interval)
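
With this hunk, a resumed run needs only the id of the stored execution; the service id and
workflow name are read back from model storage, while a fresh run still supplies them and
creates the Execution model itself. A minimal caller sketch under that reading (the resume
flag and any constructor arguments not visible in this hunk are assumptions, not taken from
the diff):

    # Fresh run: service_id and workflow_name are required; the runner creates the Execution.
    runner = WorkflowRunner(
        model_storage=model_storage,
        resource_storage=resource_storage,
        plugin_manager=plugin_manager,
        service_id=service_id,
        workflow_name='install')

    # Resume: only the existing execution's id is needed; the service id and workflow name
    # are looked up from the stored Execution, as the new branch above does.
    resumed = WorkflowRunner(
        model_storage=model_storage,
        resource_storage=resource_storage,
        plugin_manager=plugin_manager,
        execution_id=previous_execution_id)   # how the resume flag is passed is an assumption
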
@@ -100,13 +95,13 @@ class WorkflowRunner(object):
         executor = executor or ProcessExecutor(plugin_manager=plugin_manager)
 
         # transforming the execution inputs to dict, to pass them to the workflow function
-        execution_inputs_dict = dict(inp.unwrapped for inp in self.execution.inputs.itervalues())
+        execution_inputs_dict = dict(inp.unwrapped for inp in self.execution.inputs.itervalues())
 
-        if not self._is_resume:
-            workflow_fn = self._get_workflow_fn()
-            self._tasks_graph = workflow_fn(ctx=self._workflow_context, **execution_inputs_dict)
-            compiler = graph_compiler.GraphCompiler(self._workflow_context, executor.__class__)
-            compiler.compile(self._tasks_graph)
+        if not self._is_resume:
+            workflow_fn = self._get_workflow_fn()
+            self._tasks_graph = workflow_fn(ctx=self._workflow_context, **execution_inputs_dict)
+            compiler = graph_compiler.GraphCompiler(self._workflow_context, executor.__class__)
+            compiler.compile(self._tasks_graph)
 
         self._engine = engine.Engine(executors={executor.__class__: executor})
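
A side note on the inputs line kept by this hunk: itervalues() ties the module to Python 2,
and unwrapped is inferred (from the dict() construction) to yield a (name, value) pair per
input; neither detail is spelled out in the diff. A version-agnostic equivalent, under that
same assumption:

    # Sketch only: .values() works on both Python 2 and 3.
    execution_inputs_dict = dict(inp.unwrapped for inp in self.execution.inputs.values())
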
 
@@ -116,7 +111,7 @@ class WorkflowRunner(object):
 
     @property
     def execution(self):
-        return self._model_storage.execution.get(self._execution_id)
+        return self._model_storage.execution.get(self.execution_id)
 
     @property
     def service(self):
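
The new return statement uses an execution_id property that this diff does not show;
presumably it simply exposes the private attribute set in the constructor, along the lines
of (assumed, not part of this change):

    @property
    def execution_id(self):
        # assumed companion property, not visible in this hunk
        return self._execution_id
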
@@ -130,7 +125,7 @@ class WorkflowRunner(object):
     def cancel(self):
         self._engine.cancel_execution(ctx=self._workflow_context)
 
-    def _create_execution_model(self, inputs, execution_id):
+    def _create_execution_model(self, inputs):
         execution = models.Execution(
             created_at=datetime.utcnow(),
             service=self.service,
@@ -148,7 +143,6 @@ class WorkflowRunner(object):
                                                              supplied_inputs=inputs or {})
         execution.inputs = modeling_utils.merge_parameter_values(
             inputs, workflow_inputs, model_cls=models.Input)
-        execution.id = execution_id
         # TODO: these two following calls should execute atomically
         self._validate_no_active_executions(execution)
         self._model_storage.execution.put(execution)
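
With the explicit execution.id assignment removed, the id is no longer dictated by the
caller; the constructor's self._create_execution_model(inputs).id reads it back after the
model is stored, on the assumption (not shown in this diff) that model storage assigns the
id when the execution is put. Roughly:

    # Sketch of the fresh-run flow after this change:
    execution = self._create_execution_model(inputs)   # validates, then puts into model storage
    self._execution_id = execution.id                  # id assigned by storage, not passed in
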
@@ -156,17 +150,17 @@ class WorkflowRunner(object):
 
     def _validate_workflow_exists_for_service(self):
         if self._workflow_name not in self.service.workflows and \
-                self._workflow_name not in builtin.BUILTIN_WORKFLOWS:
+                        self._workflow_name not in builtin.BUILTIN_WORKFLOWS:
             raise exceptions.UndeclaredWorkflowError(
                 'No workflow policy {0} declared in service {1}'
-                    .format(self._workflow_name, self.service.name))
+                .format(self._workflow_name, self.service.name))
 
     def _validate_no_active_executions(self, execution):
         active_executions = [e for e in self.service.executions if e.is_active()]
         if active_executions:
             raise exceptions.ActiveExecutionsError(
                 "Can't start execution; Service {0} has an active execution with ID {1}"
-                    .format(self.service.name, active_executions[0].id))
+                .format(self.service.name, active_executions[0].id))
 
     def _get_workflow_fn(self):
         if self._workflow_name in builtin.BUILTIN_WORKFLOWS: