Merge "vFW and vDNS support added to azure-plugin"
[multicloud/azure.git] / azure / aria / aria-extension-cloudify / src / aria / tests / orchestrator / workflows / executor / test_process_executor_concurrent_modifications.py
diff --git a/azure/aria/aria-extension-cloudify/src/aria/tests/orchestrator/workflows/executor/test_process_executor_concurrent_modifications.py b/azure/aria/aria-extension-cloudify/src/aria/tests/orchestrator/workflows/executor/test_process_executor_concurrent_modifications.py
new file mode 100644 (file)
index 0000000..86a2edf
--- /dev/null
@@ -0,0 +1,167 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+import time
+
+import fasteners
+import pytest
+
+from aria.orchestrator import events
+from aria.orchestrator.workflows.exceptions import ExecutorException
+from aria.orchestrator.workflows import api
+from aria.orchestrator.workflows.executor import process
+from aria.orchestrator import workflow, operation
+
+import tests
+from tests.orchestrator.context import execute as execute_workflow
+from tests.orchestrator.workflows.helpers import events_collector
+from tests import mock
+from tests import storage
+from tests import helpers
+
+
+@pytest.fixture
+def dataholder(tmpdir):
+    dataholder_path = str(tmpdir.join('dataholder'))
+    holder = helpers.FilesystemDataHolder(dataholder_path)
+    return holder
+
+
+def test_concurrent_modification_on_task_succeeded(context, executor, lock_files, dataholder):
+    _test(context, executor, lock_files, _test_task_succeeded, dataholder, expected_failure=False)
+
+
+@operation
+def _test_task_succeeded(ctx, lock_files, key, first_value, second_value, holder_path):
+    _concurrent_update(lock_files, ctx.node, key, first_value, second_value, holder_path)
+
+
+def test_concurrent_modification_on_task_failed(context, executor, lock_files, dataholder):
+    _test(context, executor, lock_files, _test_task_failed, dataholder, expected_failure=True)
+
+
+@operation
+def _test_task_failed(ctx, lock_files, key, first_value, second_value, holder_path):
+    first = _concurrent_update(lock_files, ctx.node, key, first_value, second_value, holder_path)
+    if not first:
+        raise RuntimeError('MESSAGE')
+
+
+def _test(context, executor, lock_files, func, dataholder, expected_failure):
+    def _node(ctx):
+        return ctx.model.node.get_by_name(mock.models.DEPENDENCY_NODE_NAME)
+
+    interface_name, operation_name = mock.operations.NODE_OPERATIONS_INSTALL[0]
+
+    key = 'key'
+    first_value = 'value1'
+    second_value = 'value2'
+    arguments = {
+        'lock_files': lock_files,
+        'key': key,
+        'first_value': first_value,
+        'second_value': second_value,
+        'holder_path': dataholder.path
+    }
+
+    node = _node(context)
+    interface = mock.models.create_interface(
+        node.service,
+        interface_name,
+        operation_name,
+        operation_kwargs=dict(function='{0}.{1}'.format(__name__, func.__name__),
+                              arguments=arguments)
+    )
+    node.interfaces[interface.name] = interface
+    context.model.node.update(node)
+
+    @workflow
+    def mock_workflow(graph, **_):
+        graph.add_tasks(
+            api.task.OperationTask(
+                node,
+                interface_name=interface_name,
+                operation_name=operation_name,
+                arguments=arguments),
+            api.task.OperationTask(
+                node,
+                interface_name=interface_name,
+                operation_name=operation_name,
+                arguments=arguments)
+        )
+
+    signal = events.on_failure_task_signal
+    with events_collector(signal) as collected:
+        try:
+            execute_workflow(mock_workflow, context, executor)
+        except ExecutorException:
+            pass
+
+    props = _node(context).attributes
+    assert dataholder['invocations'] == 2
+    assert props[key].value == dataholder[key]
+
+    exceptions = [event['kwargs']['exception'] for event in collected.get(signal, [])]
+    if expected_failure:
+        assert exceptions
+
+
+@pytest.fixture
+def executor():
+    result = process.ProcessExecutor(python_path=[tests.ROOT_DIR])
+    try:
+        yield result
+    finally:
+        result.close()
+
+
+@pytest.fixture
+def context(tmpdir):
+    result = mock.context.simple(str(tmpdir))
+    yield result
+    storage.release_sqlite_storage(result.model)
+
+
+@pytest.fixture
+def lock_files(tmpdir):
+    return str(tmpdir.join('first_lock_file')), str(tmpdir.join('second_lock_file'))
+
+
+def _concurrent_update(lock_files, node, key, first_value, second_value, holder_path):
+    holder = helpers.FilesystemDataHolder(holder_path)
+    locker1 = fasteners.InterProcessLock(lock_files[0])
+    locker2 = fasteners.InterProcessLock(lock_files[1])
+
+    first = locker1.acquire(blocking=False)
+
+    if first:
+        # Give chance for both processes to acquire locks
+        while locker2.acquire(blocking=False):
+            locker2.release()
+            time.sleep(0.1)
+    else:
+        locker2.acquire()
+
+    node.attributes[key] = first_value if first else second_value
+    holder['key'] = first_value if first else second_value
+    holder.setdefault('invocations', 0)
+    holder['invocations'] += 1
+
+    if first:
+        locker1.release()
+    else:
+        with locker1:
+            locker2.release()
+
+    return first