1 # Licensed to the Apache Software Foundation (ASF) under one or more
2 # contributor license agreements. See the NOTICE file distributed with
3 # this work for additional information regarding copyright ownership.
4 # The ASF licenses this file to You under the Apache License, Version 2.0
5 # (the "License"); you may not use this file except in compliance with
6 # the License. You may obtain a copy of the License at
8 # http://www.apache.org/licenses/LICENSE-2.0
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
17 ARIA modeling orchestration module
20 # pylint: disable=no-self-argument, no-member, abstract-method
21 from datetime import datetime
23 from sqlalchemy import (
34 from sqlalchemy.ext.declarative import declared_attr
36 from ..orchestrator.exceptions import (TaskAbortException, TaskRetryException)
40 types as modeling_types
44 class ExecutionBase(mixins.ModelMixin):
49 __tablename__ = 'execution'
51 __private_fields__ = ('service_fk',
54 SUCCEEDED = 'succeeded'
56 CANCELLED = 'cancelled'
59 CANCELLING = 'cancelling'
61 STATES = (SUCCEEDED, FAILED, CANCELLED, PENDING, STARTED, CANCELLING)
62 END_STATES = (SUCCEEDED, FAILED, CANCELLED)
65 PENDING: (STARTED, CANCELLED),
66 STARTED: END_STATES + (CANCELLING,),
67 CANCELLING: END_STATES,
73 # region one_to_many relationships
80 :type: {:obj:`basestring`: :class:`Input`}
82 return relationship.one_to_many(cls, 'input', dict_key='name')
89 :type: [:class:`Task`]
91 return relationship.one_to_many(cls, 'task')
96 Log messages for the execution (including log messages for its tasks).
100 return relationship.one_to_many(cls, 'log')
104 # region many_to_one relationships
111 :type: :class:`Service`
113 return relationship.many_to_one(cls, 'service')
117 # region association proxies
120 def service_name(cls):
121 return relationship.association_proxy('service', cls.name_column_name())
124 def service_template(cls):
125 return relationship.association_proxy('service', 'service_template')
128 def service_template_name(cls):
129 return relationship.association_proxy('service', 'service_template_name')
133 # region foreign keys
137 return relationship.foreign_key('service')
141 created_at = Column(DateTime, index=True, doc="""
144 :type: :class:`~datetime.datetime`
147 started_at = Column(DateTime, nullable=True, index=True, doc="""
150 :type: :class:`~datetime.datetime`
153 ended_at = Column(DateTime, nullable=True, index=True, doc="""
156 :type: :class:`~datetime.datetime`
159 error = Column(Text, nullable=True, doc="""
162 :type: :obj:`basestring`
165 status = Column(Enum(*STATES, name='execution_status'), default=PENDING, doc="""
168 :type: :obj:`basestring`
171 workflow_name = Column(Text, doc="""
174 :type: :obj:`basestring`
177 @orm.validates('status')
178 def validate_status(self, key, value):
179 """Validation function that verifies execution status transitions are OK"""
181 current_status = getattr(self, key)
182 except AttributeError:
184 valid_transitions = self.VALID_TRANSITIONS.get(current_status, [])
185 if all([current_status is not None,
186 current_status != value,
187 value not in valid_transitions]):
188 raise ValueError('Cannot change execution status from {current} to {new}'.format(
189 current=current_status,
194 return self.status in self.END_STATES
197 return not self.has_ended() and self.status != self.PENDING
200 return '<{0} id=`{1}` (status={2})>'.format(
201 self.__class__.__name__,
202 getattr(self, self.name_column_name()),
207 class TaskBase(mixins.ModelMixin):
209 Represents the smallest unit of stateful execution in ARIA. The task state includes inputs,
210 outputs, as well as an atomic status, ensuring that the task can only be running once at any
213 The Python :attr:`function` is usually provided by an associated :class:`Plugin`. The
214 :attr:`arguments` of the function should be set according to the specific signature of the
217 Tasks may be "one shot" or may be configured to run repeatedly in the case of failure.
219 Tasks are often based on :class:`Operation`, and thus act on either a :class:`Node` or a
220 :class:`Relationship`, however this is not required.
223 __tablename__ = 'task'
225 __private_fields__ = ('node_fk',
230 START_WORKFLOW = 'start_workflow'
231 END_WORKFLOW = 'end_workflow'
232 START_SUBWROFKLOW = 'start_subworkflow'
233 END_SUBWORKFLOW = 'end_subworkflow'
235 CONDITIONAL = 'conditional'
247 RETRYING = 'retrying'
260 INFINITE_RETRIES = -1
262 # region one_to_many relationships
269 :type: [:class:`Log`]
271 return relationship.one_to_many(cls, 'log')
276 Arguments sent to the Python :attr:`function``.
278 :type: {:obj:`basestring`: :class:`Argument`}
280 return relationship.one_to_many(cls, 'argument', dict_key='name')
284 # region many_one relationships
289 Containing execution.
291 :type: :class:`Execution`
293 return relationship.many_to_one(cls, 'execution')
298 Node actor (can be ``None``).
302 return relationship.many_to_one(cls, 'node')
305 def relationship(cls):
307 Relationship actor (can be ``None``).
309 :type: :class:`Relationship`
311 return relationship.many_to_one(cls, 'relationship')
318 :type: :class:`Plugin`
320 return relationship.many_to_one(cls, 'plugin')
324 # region association proxies
328 return relationship.association_proxy('node', cls.name_column_name())
331 def relationship_name(cls):
332 return relationship.association_proxy('relationship', cls.name_column_name())
335 def execution_name(cls):
336 return relationship.association_proxy('execution', cls.name_column_name())
340 # region foreign keys
343 def execution_fk(cls):
344 return relationship.foreign_key('execution', nullable=True)
348 return relationship.foreign_key('node', nullable=True)
351 def relationship_fk(cls):
352 return relationship.foreign_key('relationship', nullable=True)
356 return relationship.foreign_key('plugin', nullable=True)
360 status = Column(Enum(*STATES, name='status'), default=PENDING, doc="""
361 Current atomic status ('pending', 'retrying', 'sent', 'started', 'success', 'failed').
363 :type: :obj:`basestring`
366 due_at = Column(DateTime, nullable=False, index=True, default=datetime.utcnow(), doc="""
367 Timestamp to start the task.
369 :type: :class:`~datetime.datetime`
372 started_at = Column(DateTime, default=None, doc="""
375 :type: :class:`~datetime.datetime`
378 ended_at = Column(DateTime, default=None, doc="""
381 :type: :class:`~datetime.datetime`
384 attempts_count = Column(Integer, default=1, doc="""
385 How many attempts occurred.
387 :type: :class:`~datetime.datetime`
390 function = Column(String, doc="""
391 Full path to Python function.
393 :type: :obj:`basestring`
396 max_attempts = Column(Integer, default=1, doc="""
397 Maximum number of attempts allowed in case of task failure.
402 retry_interval = Column(Float, default=0, doc="""
403 Interval between task retry attemps (in seconds).
408 ignore_failure = Column(Boolean, default=False, doc="""
409 Set to ``True`` to ignore failures.
414 interface_name = Column(String, doc="""
415 Name of interface on node or relationship.
417 :type: :obj:`basestring`
420 operation_name = Column(String, doc="""
421 Name of operation in interface on node or relationship.
423 :type: :obj:`basestring`
426 _api_id = Column(String)
427 _executor = Column(PickleType)
428 _context_cls = Column(PickleType)
429 _stub_type = Column(Enum(*STUB_TYPES))
434 Actor of the task (node or relationship).
436 return self.node or self.relationship
438 @orm.validates('max_attempts')
439 def validate_max_attempts(self, _, value): # pylint: disable=no-self-use
441 Validates that max attempts is either -1 or a positive number.
443 if value < 1 and value != TaskBase.INFINITE_RETRIES:
444 raise ValueError('Max attempts can be either -1 (infinite) or any positive number. '
445 'Got {value}'.format(value=value))
449 def abort(message=None):
450 raise TaskAbortException(message)
453 def retry(message=None, retry_interval=None):
454 raise TaskRetryException(message, retry_interval=retry_interval)
457 def dependencies(cls):
458 return relationship.many_to_many(cls, self=True)
461 return self.status in (self.SUCCESS, self.FAILED)
463 def is_waiting(self):
465 return not self.has_ended()
467 return self.status in (self.PENDING, self.RETRYING)
470 def from_api_task(cls, api_task, executor, **kwargs):
471 instantiation_kwargs = {}
473 if hasattr(api_task.actor, 'outbound_relationships'):
474 instantiation_kwargs['node'] = api_task.actor
475 elif hasattr(api_task.actor, 'source_node'):
476 instantiation_kwargs['relationship'] = api_task.actor
478 raise RuntimeError('No operation context could be created for {actor.model_cls}'
479 .format(actor=api_task.actor))
481 instantiation_kwargs.update(
483 'name': api_task.name,
484 'status': cls.PENDING,
485 'max_attempts': api_task.max_attempts,
486 'retry_interval': api_task.retry_interval,
487 'ignore_failure': api_task.ignore_failure,
488 'execution': api_task._workflow_context.execution,
489 'interface_name': api_task.interface_name,
490 'operation_name': api_task.operation_name,
492 # Only non-stub tasks have these fields
493 'plugin': api_task.plugin,
494 'function': api_task.function,
495 'arguments': api_task.arguments,
496 '_context_cls': api_task._context_cls,
497 '_executor': executor,
501 instantiation_kwargs.update(**kwargs)
503 return cls(**instantiation_kwargs)
506 class LogBase(mixins.ModelMixin):
511 __tablename__ = 'log'
513 __private_fields__ = ('execution_fk',
516 # region many_to_one relationships
521 Containing execution.
523 :type: :class:`Execution`
525 return relationship.many_to_one(cls, 'execution')
530 Containing task (can be ``None``).
534 return relationship.many_to_one(cls, 'task')
538 # region foreign keys
541 def execution_fk(cls):
542 return relationship.foreign_key('execution')
546 return relationship.foreign_key('task', nullable=True)
550 level = Column(String, doc="""
553 :type: :obj:`basestring`
556 msg = Column(String, doc="""
559 :type: :obj:`basestring`
562 created_at = Column(DateTime, index=True, doc="""
565 :type: :class:`~datetime.datetime`
568 traceback = Column(Text, doc="""
569 Error traceback in case of failure.
571 :type: :class:`~datetime.datetime`
578 name = (self.task.actor if self.task else self.execution).name
579 return '{name}: {self.msg}'.format(name=name, self=self)
582 class PluginBase(mixins.ModelMixin):
586 Plugins are usually packaged as `wagons <https://github.com/cloudify-cosmo/wagon>`__, which
587 are archives of one or more `wheels <https://packaging.python.org/distributing/#wheels>`__.
588 Most of these fields are indeed extracted from the installed wagon's metadata.
591 __tablename__ = 'plugin'
593 # region one_to_many relationships
600 :type: [:class:`Task`]
602 return relationship.one_to_many(cls, 'task')
606 archive_name = Column(Text, nullable=False, index=True, doc="""
607 Filename (not the full path) of the wagon's archive, often with a ``.wgn`` extension.
609 :type: :obj:`basestring`
612 distribution = Column(Text, doc="""
613 Name of the operating system on which the wagon was installed (e.g. ``ubuntu``).
615 :type: :obj:`basestring`
618 distribution_release = Column(Text, doc="""
619 Release of the operating system on which the wagon was installed (e.g. ``trusty``).
621 :type: :obj:`basestring`
624 distribution_version = Column(Text, doc="""
625 Version of the operating system on which the wagon was installed (e.g. ``14.04``).
627 :type: :obj:`basestring`
630 package_name = Column(Text, nullable=False, index=True, doc="""
631 Primary Python package name used when the wagon was installed, which is one of the wheels in the
632 wagon (e.g. ``cloudify-script-plugin``).
634 :type: :obj:`basestring`
637 package_source = Column(Text, doc="""
638 Full install string for the primary Python package name used when the wagon was installed (e.g.
639 ``cloudify-script-plugin==1.2``).
641 :type: :obj:`basestring`
644 package_version = Column(Text, doc="""
645 Version for the primary Python package name used when the wagon was installed (e.g. ``1.2``).
647 :type: :obj:`basestring`
650 supported_platform = Column(Text, doc="""
651 If the wheels are *all* pure Python then this would be "any", otherwise it would be the
652 installed platform name (e.g. ``linux_x86_64``).
654 :type: :obj:`basestring`
657 supported_py_versions = Column(modeling_types.StrictList(basestring), doc="""
658 Python versions supported by all the wheels (e.g. ``["py26", "py27"]``)
660 :type: [:obj:`basestring`]
663 wheels = Column(modeling_types.StrictList(basestring), nullable=False, doc="""
664 Filenames of the wheels archived in the wagon, often with a ``.whl`` extension.
666 :type: [:obj:`basestring`]
669 uploaded_at = Column(DateTime, nullable=False, index=True, doc="""
670 Timestamp for when the wagon was installed.
672 :type: :class:`~datetime.datetime`
676 class ArgumentBase(mixins.ParameterMixin):
678 Python function argument parameter.
681 __tablename__ = 'argument'
683 # region many_to_one relationships
688 Containing task (can be ``None``);
692 return relationship.many_to_one(cls, 'task')
697 Containing operation (can be ``None``);
699 :type: :class:`Operation`
701 return relationship.many_to_one(cls, 'operation')
705 # region foreign keys
709 return relationship.foreign_key('task', nullable=True)
712 def operation_fk(cls):
713 return relationship.foreign_key('operation', nullable=True)