1 # ============LICENSE_START==========================================
2 # ===================================================================
3 # Copyright (c) 2018-2020 AT&T
5 # Licensed under the Apache License, Version 2.0 (the "License");
6 # you may not use this file except in compliance with the License.
7 # You may obtain a copy of the License at
9 # http://www.apache.org/licenses/LICENSE-2.0
11 # Unless required by applicable law or agreed to in writing, software
12 # distributed under the License is distributed on an "AS IS" BASIS,
13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
16 # ============LICENSE_END============================================
24 from cloudify.test_utils import workflow_test
25 from cloudify.mocks import MockNodeInstanceContext
26 from cloudify.mocks import MockCloudifyContext
27 from cloudify.state import current_ctx
28 from cloudify import ctx
31 class TestPlugin(unittest.TestCase):
33 @workflow_test(path.join('blueprint', 'blueprint.yaml'),
34 resources_to_copy=[(path.join('blueprint', 'plugin',
37 @mock.patch('plugin.tasks.os.remove')
38 @mock.patch('plugin.tasks.execute_command')
39 def test_stop(self, cfy_local, mock_execute_command, mock_os_remove):
# Purpose: run the 'uninstall' workflow with plugin.tasks.os.remove,
# plugin.tasks.execute_command and plugin.tasks.shutil.rmtree patched out,
# then check the command string handed to execute_command.
40 # execute uninstall workflow
45 with mock.patch('plugin.tasks.shutil.rmtree'):
46 cfy_local.execute('uninstall', task_retries=0)
48 # extract single node instance
# NOTE(review): `instance` is not referenced again in the lines visible here.
49 instance = cfy_local.storage.get_node_instances()[0]
# assert_called_with only compares against the most recent call to the mock.
51 mock_execute_command.assert_called_with('helm delete --purge onap-test_node --host 1.1.1.1:8888 ')
53 @workflow_test(path.join('blueprint', 'blueprint.yaml'),
54 resources_to_copy=[(path.join('blueprint', 'plugin',
57 @mock.patch('plugin.tasks.execute_command')
58 def test_start(self, cfy_local, mock_execute_command):
# Purpose: run the 'install' workflow with plugin.tasks.config,
# get_current_helm_value and get_helm_history patched out, then check the
# command string handed to the patched execute_command.
59 # execute install workflow
64 with mock.patch('plugin.tasks.config'):
65 with mock.patch('plugin.tasks.get_current_helm_value'):
66 with mock.patch('plugin.tasks.get_helm_history'):
67 cfy_local.execute('install', task_retries=0)
69 # extract single node instance
# NOTE(review): `instance` is not referenced again in the lines visible here.
70 instance = cfy_local.storage.get_node_instances()[0]
# assert_called_with only compares against the most recent call to the mock.
72 mock_execute_command.assert_called_with('helm install local/test_node-2.0.0.tgz --name onap-test_node --namespace onap --host 1.1.1.1:8888 ')
74 @workflow_test(path.join('blueprint', 'blueprint.yaml'),
75 resources_to_copy=[(path.join('blueprint', 'plugin',
78 @mock.patch('plugin.tasks.execute_command')
79 def test_config(self, cfy_local, mock_execute_command):
# Purpose: run the 'install' workflow with plugin.tasks.start patched out,
# then check the command string handed to the patched execute_command.
80 # execute install workflow
85 with mock.patch('plugin.tasks.start'):
86 cfy_local.execute('install', task_retries=0)
88 # extract single node instance
# NOTE(review): `instance` is not referenced again in the lines visible here.
89 instance = cfy_local.storage.get_node_instances()[0]
# assert_called_with only compares against the most recent call to the mock.
91 mock_execute_command.assert_called_with('helm init --client-only --stable-repo-url http://0.0.0.0/stable')
93 @workflow_test(path.join('blueprint', 'blueprint.yaml'),
94 resources_to_copy=[(path.join('blueprint', 'plugin',
97 def test_rollback(self, cfy_local):
# Purpose: execute the 'rollback' workflow with a node_instance_id and a
# revision parameter; the fail() message below says an exception is expected.
# NOTE(review): `revision` is assigned on a line not visible in this excerpt.
98 # execute rollback workflow
103 node_instance_id = 'node_instance_id'
106 cfy_local.execute('rollback', task_retries=0,
107 parameters={'node_instance_id': node_instance_id, 'revision': revision})
# NOTE(review): self.fail() raises AssertionError, which is an Exception
# subclass, so the `except Exception` clause below swallows it.
108 self.fail('Expected exception due to operation not exist')
109 except Exception as e:
# NOTE(review): assertTrue on a non-empty string literal always passes and
# `e` is never inspected, so this test cannot fail as written. Consider
# assertRaises, or asserting on str(e) once the expected message is confirmed.
110 self.assertTrue('operation not available')
112 @workflow_test(path.join('blueprint', 'blueprint.yaml'),
113 resources_to_copy=[(path.join('blueprint', 'plugin',
116 def test_upgrade(self, cfy_local):
# Purpose: execute the 'upgrade' workflow with chart/config/repo parameters;
# the fail() message below says an exception is expected.
# NOTE(review): `config_json`, `chartRepo` and `repo_user` are assigned on
# lines not visible in this excerpt.
117 # execute upgrade workflow
122 node_instance_id = 'node_instance_id'
124 config_url = 'http://test:test@11.22.33.44:80/stable'
125 config_format = 'json'
126 chartVersion = '2.0.0'
129 repo_user_passwd = ''
131 cfy_local.execute('upgrade', task_retries=0,
132 parameters={'node_instance_id': node_instance_id, 'config': config_json,
133 'config_url': config_url, 'config_format': config_format,
134 'chart_version': chartVersion, 'chart_repo_url': chartRepo,
135 'repo_user': repo_user, 'repo_user_password': repo_user_passwd})
# NOTE(review): self.fail() raises AssertionError, which is an Exception
# subclass, so the `except Exception` clause below swallows it.
136 self.fail('Expected exception due to operation not exist')
137 except Exception as e:
# NOTE(review): assertTrue on a non-empty string literal always passes and
# `e` is never inspected, so this test cannot fail as written. Consider
# assertRaises, or asserting on str(e) once the expected message is confirmed.
138 self.assertTrue('operation not available')
140 @mock.patch('plugin.tasks.execute_command')
141 def test_op_rollback(self, mock_execute_command):
# Purpose: call plugin.tasks.rollback directly (not through a workflow) with
# a MockCloudifyContext installed as the current context and with
# execute_command, get_current_helm_value and get_helm_history patched out.
# NOTE(review): no assertion follows the call in the lines visible here, so
# this appears to check only that rollback() does not raise - confirm.
142 # test operation rollback
145 :rollback operation test:
148 'component_name': 'test_node',
150 'tiller_port': '8888',
151 'tiller_ip': '1.1.1.1',
152 'tls_enable': 'false'
154 args = {'revision': '1'}
155 mock_ctx = MockCloudifyContext(node_id='test_node_id', node_name='test_node_name',
# NOTE(review): no matching current_ctx.clear() is visible in this excerpt;
# verify the context is reset so it does not leak into other tests.
158 current_ctx.set(mock_ctx)
159 with mock.patch('plugin.tasks.get_current_helm_value'):
160 with mock.patch('plugin.tasks.get_helm_history'):
161 plugin.tasks.rollback(**args)
165 @mock.patch('plugin.tasks.execute_command')
166 def test_op_upgrade(self, mock_execute_command):
# Purpose: call plugin.tasks.upgrade directly (not through a workflow) with
# a MockCloudifyContext installed as the current context and with
# execute_command, get_current_helm_value, get_helm_history and
# gen_config_str patched out.
# NOTE(review): no assertion follows the call in the lines visible here, so
# this appears to check only that upgrade() does not raise - confirm.
167 # test operation upgrade
170 :upgrade operation test:
173 'component_name': 'test_node',
175 'tiller_port': '8888',
176 'tiller_ip': '1.1.1.1',
177 'tls_enable': 'false',
180 args = {'revision': '1', 'config': '', 'chart_repo': 'repo', 'chart_version': '2',
181 'config_set': 'config_set', 'config_json': '', 'config_url': '',
182 'config_format': 'format', 'repo_user': '', 'repo_user_passwd': ''}
183 mock_ctx = MockCloudifyContext(node_id='test_node_id', node_name='test_node_name',
# NOTE(review): no matching current_ctx.clear() is visible in this excerpt;
# verify the context is reset so it does not leak into other tests.
186 current_ctx.set(mock_ctx)
187 with mock.patch('plugin.tasks.get_current_helm_value'):
188 with mock.patch('plugin.tasks.get_helm_history'):
189 with mock.patch('plugin.tasks.gen_config_str'):
190 plugin.tasks.upgrade(**args)