1 # Copyright 2018 ZTE Corporation.
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
7 # http://www.apache.org/licenses/LICENSE-2.0
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
18 from lcm.pub.utils import restcall
19 from lcm.workflows.graphflow.flow.flow import GraphFlow
23 "CreateSynVNF": {"module": "lcm.workflows.graphflow.tests.task_tests", "class": "CreateSynVNF"},
24 "CreateAsynVNF": {"module": "lcm.workflows.graphflow.tests.task_tests", "class": "CreateAsynVNF"},
25 "CreateASynRestVNF": {"module": "lcm.workflows.graphflow.tests.task_tests", "class": "CreateASynRestVNF"}
30 def execute(self, args):
31 print "test args %s" % args
34 class GraphFlowTest(unittest.TestCase):
41 def test_sync_task(self):
43 "ran-cu-00": ["ran-du-00"],
48 "type": "CreateSynVNF",
56 "type": "CreateSynVNF",
64 gf = GraphFlow(deploy_graph, TaskSet, config)
67 gf.task_manager.wait_tasks_done(gf.sort_nodes)
68 task_set = gf.task_manager.get_all_task()
69 for task in task_set.itervalues():
70 self.assertEqual(task.FINISHED, task.status)
72 def test_async_task(self):
74 "ran-cu-01": ["ran-du-01"],
79 "type": "CreateAsynVNF",
87 "type": "CreateAsynVNF",
95 gf = GraphFlow(deploy_graph, TaskSet, config)
98 gf.task_manager.wait_tasks_done(gf.sort_nodes)
99 task_set = gf.task_manager.get_all_task()
100 for task in task_set.itervalues():
101 self.assertEqual(task.FINISHED, task.status)
103 @mock.patch.object(restcall, 'call_req')
104 def test_async_rest_task(self, mock_call_req):
105 mock_call_req.return_value = [0, json.JSONEncoder().encode({
107 "responseDescriptor": {"progress": 100}
111 "ran-cu-02": ["ran-du-02"],
116 "type": "CreateASynRestVNF",
125 "type": "CreateASynRestVNF",
134 gf = GraphFlow(deploy_graph, TaskSet, config)
137 gf.task_manager.wait_tasks_done(gf.sort_nodes)
138 task_set = gf.task_manager.get_all_task()
139 for task in task_set.itervalues():
140 self.assertEqual(task.FINISHED, task.status)