X-Git-Url: https://gerrit.onap.org/r/gitweb?a=blobdiff_plain;f=azure%2Faria%2Faria-extension-cloudify%2Fsrc%2Faria%2Ftests%2Forchestrator%2Fworkflows%2Fbuiltin%2Ftest_execute_operation.py;fp=azure%2Faria%2Faria-extension-cloudify%2Fsrc%2Faria%2Ftests%2Forchestrator%2Fworkflows%2Fbuiltin%2Ftest_execute_operation.py;h=8713e3c575c5f53f5e65ada0f64a09a561e0bf3c;hb=7409dfb144cf2a06210400134d822a1393462b1f;hp=0000000000000000000000000000000000000000;hpb=9e65649dfff8f00dc0a0ef6b10d020ae0e2255ba;p=multicloud%2Fazure.git diff --git a/azure/aria/aria-extension-cloudify/src/aria/tests/orchestrator/workflows/builtin/test_execute_operation.py b/azure/aria/aria-extension-cloudify/src/aria/tests/orchestrator/workflows/builtin/test_execute_operation.py new file mode 100644 index 0000000..8713e3c --- /dev/null +++ b/azure/aria/aria-extension-cloudify/src/aria/tests/orchestrator/workflows/builtin/test_execute_operation.py @@ -0,0 +1,64 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import pytest + +from aria.orchestrator.workflows.api import task +from aria.orchestrator.workflows.builtin.execute_operation import execute_operation + +from tests import mock, storage + + +@pytest.fixture +def ctx(tmpdir): + context = mock.context.simple(str(tmpdir), inmemory=False) + yield context + storage.release_sqlite_storage(context.model) + + +def test_execute_operation(ctx): + node = ctx.model.node.get_by_name(mock.models.DEPENDENCY_NODE_NAME) + interface_name, operation_name = mock.operations.NODE_OPERATIONS_INSTALL[0] + interface = mock.models.create_interface( + ctx.service, + interface_name, + operation_name, + operation_kwargs=dict(function='test') + ) + node.interfaces[interface.name] = interface + ctx.model.node.update(node) + + execute_tasks = list( + task.WorkflowTask( + execute_operation, + ctx=ctx, + interface_name=interface_name, + operation_name=operation_name, + operation_kwargs={}, + allow_kwargs_override=False, + run_by_dependency_order=False, + type_names=[], + node_template_ids=[], + node_ids=[node.id] + ).topological_order() + ) + + assert len(execute_tasks) == 1 + assert getattr(execute_tasks[0].actor, '_wrapped', execute_tasks[0].actor) == node + assert execute_tasks[0].operation_name == operation_name + assert execute_tasks[0].interface_name == interface_name + + +# TODO: add more scenarios