vFW and vDNS support added to azure-plugin
[multicloud/azure.git] / azure / aria / aria-extension-cloudify / src / aria / tests / orchestrator / execution_plugin / test_local.py
1 # Licensed to the Apache Software Foundation (ASF) under one or more
2 # contributor license agreements.  See the NOTICE file distributed with
3 # this work for additional information regarding copyright ownership.
4 # The ASF licenses this file to You under the Apache License, Version 2.0
5 # (the "License"); you may not use this file except in compliance with
6 # the License.  You may obtain a copy of the License at
7 #
8 #     http://www.apache.org/licenses/LICENSE-2.0
9 #
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
15
16 import json
17 import os
18
19 import pytest
20
21 from aria import workflow
22 from aria.orchestrator import events
23 from aria.orchestrator.workflows import api
24 from aria.orchestrator.workflows.exceptions import ExecutorException
25 from aria.orchestrator.exceptions import TaskAbortException, TaskRetryException
26 from aria.orchestrator.execution_plugin import operations
27 from aria.orchestrator.execution_plugin.exceptions import ProcessException
28 from aria.orchestrator.execution_plugin import local
29 from aria.orchestrator.execution_plugin import constants
30 from aria.orchestrator.workflows.executor import process
31 from aria.orchestrator.workflows.core import engine, graph_compiler
32
33 from tests import mock
34 from tests import storage
35 from tests.orchestrator.workflows.helpers import events_collector
36
37 IS_WINDOWS = os.name == 'nt'
38
39
40 class TestLocalRunScript(object):
41
42     def test_script_path_parameter(self, executor, workflow_context, tmpdir):
43         script_path = self._create_script(
44             tmpdir,
45             linux_script='''#! /bin/bash -e
46             ctx node attributes map key = value
47             ''',
48             windows_script='''
49             ctx node attributes map key = value
50         ''')
51         props = self._run(
52             executor, workflow_context,
53             script_path=script_path)
54         assert props['map'].value['key'] == 'value'
55
56     def test_process_env(self, executor, workflow_context, tmpdir):
57         script_path = self._create_script(
58             tmpdir,
59             linux_script='''#! /bin/bash -e
60             ctx node attributes map key1 = "$key1"
61             ctx node attributes map key2 = "$key2"
62             ''',
63             windows_script='''
64             ctx node attributes map key1 = %key1%
65             ctx node attributes map key2 = %key2%
66         ''')
67         props = self._run(
68             executor, workflow_context,
69             script_path=script_path,
70             process={
71                 'env': {
72                     'key1': 'value1',
73                     'key2': 'value2'
74                 }
75             })
76         p_map = props['map'].value
77         assert p_map['key1'] == 'value1'
78         assert p_map['key2'] == 'value2'
79
80     def test_process_cwd(self, executor, workflow_context, tmpdir):
81         script_path = self._create_script(
82             tmpdir,
83             linux_script='''#! /bin/bash -e
84             ctx node attributes map cwd = "$PWD"
85             ''',
86             windows_script='''
87             ctx node attributes map cwd = %CD%
88             ''')
89         tmpdir = str(tmpdir)
90         props = self._run(
91             executor, workflow_context,
92             script_path=script_path,
93             process={
94                 'cwd': tmpdir
95             })
96         p_map = props['map'].value
97         assert p_map['cwd'] == tmpdir
98
99     def test_process_command_prefix(self, executor, workflow_context, tmpdir):
100         use_ctx = 'ctx node attributes map key = value'
101         python_script = ['import subprocess',
102                          'subprocess.Popen("{0}".split(' ')).communicate()[0]'.format(use_ctx)]
103         python_script = '\n'.join(python_script)
104         script_path = self._create_script(
105             tmpdir,
106             linux_script=python_script,
107             windows_script=python_script,
108             windows_suffix='',
109             linux_suffix='')
110         props = self._run(
111             executor, workflow_context,
112             script_path=script_path,
113             process={
114                 'env': {'TEST_KEY': 'value'},
115                 'command_prefix': 'python'
116             })
117         p_map = props['map'].value
118         assert p_map['key'] == 'value'
119
120     def test_process_args(self, executor, workflow_context, tmpdir):
121         script_path = self._create_script(
122             tmpdir,
123             linux_script='''#! /bin/bash -e
124             ctx node attributes map arg1 = "$1"
125             ctx node attributes map arg2 = "$2"
126             ''',
127             windows_script='''
128             ctx node attributes map arg1 = %1
129             ctx node attributes map arg2 = %2
130             ''')
131         props = self._run(
132             executor, workflow_context,
133             script_path=script_path,
134             process={
135                 'args': ['"arg with spaces"', 'arg2']
136             })
137         assert props['map'].value['arg1'] == 'arg with spaces'
138         assert props['map'].value['arg2'] == 'arg2'
139
140     def test_no_script_path(self, executor, workflow_context):
141         exception = self._run_and_get_task_exception(
142             executor, workflow_context,
143             script_path=None)
144         assert isinstance(exception, TaskAbortException)
145         assert 'script_path' in exception.message
146
147     def test_script_error(self, executor, workflow_context, tmpdir):
148         script_path = self._create_script(
149             tmpdir,
150             linux_script='''#! /bin/bash -e
151             echo 123123
152             command_that_does_not_exist [ ]
153             ''',
154             windows_script='''
155             @echo off
156             echo 123123
157             command_that_does_not_exist [ ]
158             ''')
159         exception = self._run_and_get_task_exception(
160             executor, workflow_context,
161             script_path=script_path)
162         assert isinstance(exception, ProcessException)
163         assert os.path.basename(script_path) in exception.command
164         assert exception.exit_code == 1 if IS_WINDOWS else 127
165         assert exception.stdout.strip() == '123123'
166         assert 'command_that_does_not_exist' in exception.stderr
167
168     def test_script_error_from_bad_ctx_request(self, executor, workflow_context, tmpdir):
169         script_path = self._create_script(
170             tmpdir,
171             linux_script='''#! /bin/bash -e
172             ctx property_that_does_not_exist
173             ''',
174             windows_script='''
175             ctx property_that_does_not_exist
176             ''')
177         exception = self._run_and_get_task_exception(
178             executor, workflow_context,
179             script_path=script_path)
180         assert isinstance(exception, ProcessException)
181         assert os.path.basename(script_path) in exception.command
182         assert exception.exit_code == 1
183         assert 'RequestError' in exception.stderr
184         assert 'property_that_does_not_exist' in exception.stderr
185
186     def test_python_script(self, executor, workflow_context, tmpdir):
187         script = '''
188 from aria.orchestrator.execution_plugin import ctx, inputs
189 if __name__ == '__main__':
190     ctx.node.attributes['key'] = inputs['key']
191 '''
192         suffix = '.py'
193         script_path = self._create_script(
194             tmpdir,
195             linux_script=script,
196             windows_script=script,
197             linux_suffix=suffix,
198             windows_suffix=suffix)
199         props = self._run(
200             executor, workflow_context,
201             script_path=script_path,
202             arguments={'key': 'value'})
203         assert props['key'].value == 'value'
204
205     @pytest.mark.parametrize(
206         'value', ['string-value', [1, 2, 3], 999, 3.14, False,
207                   {'complex1': {'complex2': {'key': 'value'}, 'list': [1, 2, 3]}}])
208     def test_inputs_as_environment_variables(self, executor, workflow_context, tmpdir, value):
209         script_path = self._create_script(
210             tmpdir,
211             linux_script='''#! /bin/bash -e
212             ctx node attributes key = "${input_as_env_var}"
213             ''',
214             windows_script='''
215             ctx node attributes key = "%input_as_env_var%"
216         ''')
217         props = self._run(
218             executor, workflow_context,
219             script_path=script_path,
220             env_var=value)
221         value = props['key'].value
222         expected = value if isinstance(value, basestring) else json.loads(value)
223         assert expected == value
224
225     @pytest.mark.parametrize('value', ['override', {'key': 'value'}])
226     def test_explicit_env_variables_inputs_override(
227             self, executor, workflow_context, tmpdir, value):
228         script_path = self._create_script(
229             tmpdir,
230             linux_script='''#! /bin/bash -e
231             ctx node attributes key = "${input_as_env_var}"
232             ''',
233             windows_script='''
234             ctx node attributes key = "%input_as_env_var%"
235         ''')
236
237         props = self._run(
238             executor, workflow_context,
239             script_path=script_path,
240             env_var='test-value',
241             process={
242                 'env': {
243                     'input_as_env_var': value
244                 }
245             })
246         value = props['key'].value
247         expected = value if isinstance(value, basestring) else json.loads(value)
248         assert expected == value
249
250     def test_get_nonexistent_runtime_property(self, executor, workflow_context, tmpdir):
251         script_path = self._create_script(
252             tmpdir,
253             linux_script='''#! /bin/bash -e
254             ctx node attributes nonexistent
255             ''',
256             windows_script='''
257             ctx node attributes nonexistent
258         ''')
259         exception = self._run_and_get_task_exception(
260             executor, workflow_context,
261             script_path=script_path)
262         assert isinstance(exception, ProcessException)
263         assert os.path.basename(script_path) in exception.command
264         assert 'RequestError' in exception.stderr
265         assert 'nonexistent' in exception.stderr
266
267     def test_get_nonexistent_runtime_property_json(self, executor, workflow_context, tmpdir):
268         script_path = self._create_script(
269             tmpdir,
270             linux_script='''#! /bin/bash -e
271             ctx -j node attributes nonexistent
272             ''',
273             windows_script='''
274             ctx -j node attributes nonexistent
275             ''')
276         exception = self._run_and_get_task_exception(
277             executor, workflow_context,
278             script_path=script_path)
279         assert isinstance(exception, ProcessException)
280         assert os.path.basename(script_path) in exception.command
281         assert 'RequestError' in exception.stderr
282         assert 'nonexistent' in exception.stderr
283
284     def test_abort(self, executor, workflow_context, tmpdir):
285         script_path = self._create_script(
286             tmpdir,
287             linux_script='''#! /bin/bash -e
288             ctx task abort [ abort-message ]
289             ''',
290             windows_script='''
291             ctx task abort [ abort-message ]
292             ''')
293         exception = self._run_and_get_task_exception(
294             executor, workflow_context,
295             script_path=script_path)
296         assert isinstance(exception, TaskAbortException)
297         assert exception.message == 'abort-message'
298
299     def test_retry(self, executor, workflow_context, tmpdir):
300         script_path = self._create_script(
301             tmpdir,
302             linux_script='''#! /bin/bash -e
303             ctx task retry [ retry-message ]
304             ''',
305             windows_script='''
306             ctx task retry [ retry-message ]
307             ''')
308         exception = self._run_and_get_task_exception(
309             executor, workflow_context,
310             script_path=script_path)
311         assert isinstance(exception, TaskRetryException)
312         assert exception.message == 'retry-message'
313
314     def test_retry_with_interval(self, executor, workflow_context, tmpdir):
315         script_path = self._create_script(
316             tmpdir,
317             linux_script='''#! /bin/bash -e
318             ctx task retry [ retry-message @100 ]
319             ''',
320             windows_script='''
321             ctx task retry [ retry-message @100 ]
322             ''')
323         exception = self._run_and_get_task_exception(
324             executor, workflow_context,
325             script_path=script_path)
326         assert isinstance(exception, TaskRetryException)
327         assert exception.message == 'retry-message'
328         assert exception.retry_interval == 100
329
330     def test_crash_abort_after_retry(self, executor, workflow_context, tmpdir):
331         script_path = self._create_script(
332             tmpdir,
333             linux_script='''#! /bin/bash
334             ctx task retry [ retry-message ]
335             ctx task abort [ should-raise-a-runtime-error ]
336             ''',
337             windows_script='''
338             ctx task retry [ retry-message ]
339             ctx task abort [ should-raise-a-runtime-error ]
340         ''')
341         exception = self._run_and_get_task_exception(
342             executor, workflow_context,
343             script_path=script_path)
344         assert isinstance(exception, TaskAbortException)
345         assert exception.message == constants.ILLEGAL_CTX_OPERATION_MESSAGE
346
347     def test_crash_retry_after_abort(self, executor, workflow_context, tmpdir):
348         script_path = self._create_script(
349             tmpdir,
350             linux_script='''#! /bin/bash
351             ctx task abort [ abort-message ]
352             ctx task retry [ should-raise-a-runtime-error ]
353             ''',
354             windows_script='''
355             ctx task abort [ abort-message ]
356             ctx task retry [ should-raise-a-runtime-error ]
357             ''')
358         exception = self._run_and_get_task_exception(
359             executor, workflow_context,
360             script_path=script_path)
361         assert isinstance(exception, TaskAbortException)
362         assert exception.message == constants.ILLEGAL_CTX_OPERATION_MESSAGE
363
364     def test_crash_abort_after_abort(self, executor, workflow_context, tmpdir):
365         script_path = self._create_script(
366             tmpdir,
367             linux_script='''#! /bin/bash
368             ctx task abort [ abort-message ]
369             ctx task abort [ should-raise-a-runtime-error ]
370             ''',
371             windows_script='''
372             ctx task abort [ abort-message ]
373             ctx task abort [ should-raise-a-runtime-error ]
374             ''')
375         exception = self._run_and_get_task_exception(
376             executor, workflow_context,
377             script_path=script_path)
378         assert isinstance(exception, TaskAbortException)
379         assert exception.message == constants.ILLEGAL_CTX_OPERATION_MESSAGE
380
381     def test_crash_retry_after_retry(self, executor, workflow_context, tmpdir):
382         script_path = self._create_script(
383             tmpdir,
384             linux_script='''#! /bin/bash
385             ctx task retry [ retry-message ]
386             ctx task retry [ should-raise-a-runtime-error ]
387             ''',
388             windows_script='''
389             ctx task retry [ retry-message ]
390             ctx task retry [ should-raise-a-runtime-error ]
391             ''')
392         exception = self._run_and_get_task_exception(
393             executor, workflow_context,
394             script_path=script_path)
395         assert isinstance(exception, TaskAbortException)
396         assert exception.message == constants.ILLEGAL_CTX_OPERATION_MESSAGE
397
398     def test_retry_returns_a_nonzero_exit_code(self, executor, workflow_context, tmpdir):
399         log_path = tmpdir.join('temp.log')
400         message = 'message'
401         script_path = self._create_script(
402             tmpdir,
403             linux_script='''#! /bin/bash -e
404             ctx task retry [ "{0}" ] 2> {1}
405             echo should-not-run > {1}
406             '''.format(message, log_path),
407             windows_script='''
408             ctx task retry [ "{0}" ] 2> {1}
409             if %errorlevel% neq 0 exit /b %errorlevel%
410             echo should-not-run > {1}
411             '''.format(message, log_path))
412         with pytest.raises(ExecutorException):
413             self._run(
414                 executor, workflow_context,
415                 script_path=script_path)
416         assert log_path.read().strip() == message
417
418     def test_abort_returns_a_nonzero_exit_code(self, executor, workflow_context, tmpdir):
419         log_path = tmpdir.join('temp.log')
420         message = 'message'
421         script_path = self._create_script(
422             tmpdir,
423             linux_script='''#! /bin/bash -e
424             ctx task abort [ "{0}" ] 2> {1}
425             echo should-not-run > {1}
426             '''.format(message, log_path),
427             windows_script='''
428             ctx task abort [ "{0}" ] 2> {1}
429             if %errorlevel% neq 0 exit /b %errorlevel%
430             echo should-not-run > {1}
431             '''.format(message, log_path))
432         with pytest.raises(ExecutorException):
433             self._run(
434                 executor, workflow_context,
435                 script_path=script_path)
436         assert log_path.read().strip() == message
437
438     def _create_script(self,
439                        tmpdir,
440                        linux_script,
441                        windows_script,
442                        windows_suffix='.bat',
443                        linux_suffix=''):
444         suffix = windows_suffix if IS_WINDOWS else linux_suffix
445         script = windows_script if IS_WINDOWS else linux_script
446         script_path = tmpdir.join('script{0}'.format(suffix))
447         script_path.write(script)
448         return str(script_path)
449
450     def _run_and_get_task_exception(self, *args, **kwargs):
451         signal = events.on_failure_task_signal
452         with events_collector(signal) as collected:
453             with pytest.raises(ExecutorException):
454                 self._run(*args, **kwargs)
455         return collected[signal][0]['kwargs']['exception']
456
457     def _run(self,
458              executor,
459              workflow_context,
460              script_path,
461              process=None,
462              env_var='value',
463              arguments=None):
464         local_script_path = script_path
465         script_path = os.path.basename(local_script_path) if local_script_path else ''
466         arguments = arguments or {}
467         process = process or {}
468         if script_path:
469             workflow_context.resource.service.upload(
470                 entry_id=str(workflow_context.service.id),
471                 source=local_script_path,
472                 path=script_path)
473
474         arguments.update({
475             'script_path': script_path,
476             'process': process,
477             'input_as_env_var': env_var
478         })
479
480         node = workflow_context.model.node.get_by_name(mock.models.DEPENDENCY_NODE_NAME)
481         interface = mock.models.create_interface(
482             node.service,
483             'test',
484             'op',
485             operation_kwargs=dict(
486                 function='{0}.{1}'.format(
487                     operations.__name__,
488                     operations.run_script_locally.__name__),
489                 arguments=arguments)
490         )
491         node.interfaces[interface.name] = interface
492         workflow_context.model.node.update(node)
493
494         @workflow
495         def mock_workflow(ctx, graph):
496             graph.add_tasks(api.task.OperationTask(
497                 node,
498                 interface_name='test',
499                 operation_name='op',
500                 arguments=arguments))
501             return graph
502         tasks_graph = mock_workflow(ctx=workflow_context)  # pylint: disable=no-value-for-parameter
503         graph_compiler.GraphCompiler(workflow_context, executor.__class__).compile(tasks_graph)
504         eng = engine.Engine({executor.__class__: executor})
505         eng.execute(workflow_context)
506         return workflow_context.model.node.get_by_name(
507             mock.models.DEPENDENCY_NODE_NAME).attributes
508
509     @pytest.fixture
510     def executor(self):
511         result = process.ProcessExecutor()
512         try:
513             yield result
514         finally:
515             result.close()
516
517     @pytest.fixture
518     def workflow_context(self, tmpdir):
519         workflow_context = mock.context.simple(str(tmpdir), inmemory=False)
520         workflow_context.states = []
521         workflow_context.exception = None
522         yield workflow_context
523         storage.release_sqlite_storage(workflow_context.model)
524
525
526 class BaseTestConfiguration(object):
527
528     @pytest.fixture(autouse=True)
529     def mock_execute(self, mocker):
530         def eval_func(**_):
531             self.called = 'eval'
532
533         def execute_func(process, **_):
534             self.process = process
535             self.called = 'execute'
536         self.process = {}
537         self.called = None
538         mocker.patch.object(local, '_execute_func', execute_func)
539         mocker.patch.object(local, '_eval_script_func', eval_func)
540
541     class Ctx(object):
542         @staticmethod
543         def download_resource(destination, *args, **kwargs):
544             return destination
545
546     def _run(self, script_path, process=None):
547         local.run_script(
548             script_path=script_path,
549             process=process,
550             ctx=self.Ctx)
551
552
553 class TestPowerShellConfiguration(BaseTestConfiguration):
554
555     def test_implicit_powershell_call_with_ps1_extension(self):
556         self._run(script_path='script_path.ps1')
557         assert self.process['command_prefix'] == 'powershell'
558
559     def test_command_prefix_is_overridden_for_ps1_extension(self):
560         self._run(script_path='script_path.ps1',
561                   process={'command_prefix': 'bash'})
562         assert self.process['command_prefix'] == 'bash'
563
564     def test_explicit_powershell_call(self):
565         self._run(script_path='script_path.ps1',
566                   process={'command_prefix': 'powershell'})
567         assert self.process['command_prefix'] == 'powershell'
568
569
570 class TestEvalPythonConfiguration(BaseTestConfiguration):
571
572     def test_explicit_eval_without_py_extension(self):
573         self._run(script_path='script_path',
574                   process={'eval_python': True})
575         assert self.called == 'eval'
576
577     def test_explicit_eval_with_py_extension(self):
578         self._run(script_path='script_path.py',
579                   process={'eval_python': True})
580         assert self.called == 'eval'
581
582     def test_implicit_eval(self):
583         self._run(script_path='script_path.py')
584         assert self.called == 'eval'
585
586     def test_explicit_execute_without_py_extension(self):
587         self._run(script_path='script_path',
588                   process={'eval_python': False})
589         assert self.called == 'execute'
590
591     def test_explicit_execute_with_py_extension(self):
592         self._run(script_path='script_path.py',
593                   process={'eval_python': False})
594         assert self.called == 'execute'
595
596     def test_implicit_execute(self):
597         self._run(script_path='script_path')
598         assert self.called == 'execute'