# Copyright 2018 ZTE Corporation. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. import unittest import mock import json from lcm.pub.utils import restcall from lcm.workflows.graphflow.flow.flow import GraphFlow config = { "CreateSynVNF": {"module": "lcm.workflows.graphflow.tests.task_tests", "class": "CreateSynVNF"}, "CreateAsynVNF": {"module": "lcm.workflows.graphflow.tests.task_tests", "class": "CreateAsynVNF"}, "CreateASynRestVNF": {"module": "lcm.workflows.graphflow.tests.task_tests", "class": "CreateASynRestVNF"} } class test(object): def execute(self, args): print "test args %s" % args class GraphFlowTest(unittest.TestCase): def setUp(self): pass def tearDown(self): pass def test_sync_task(self): deploy_graph = { "ran-cu-00": ["ran-du-00"], "ran-du-00": [], } TaskSet = { 'ran-cu-00': { "type": "CreateSynVNF", "input": { "nsInstanceId": 1, "vnfId": 1 }, "timeOut": 10 }, 'ran-du-00': { "type": "CreateSynVNF", "input": { "nsInstanceId": 1, "vnfId": 1 }, "timeOut": 10 } } gf = GraphFlow(deploy_graph, TaskSet, config) gf.start() gf.join() gf.task_manager.wait_tasks_done(gf.sort_nodes) task_set = gf.task_manager.get_all_task() for task in task_set.itervalues(): self.assertEqual(task.FINISHED, task.status) def test_async_task(self): deploy_graph = { "ran-cu-01": ["ran-du-01"], "ran-du-01": [], } TaskSet = { 'ran-cu-01': { "type": "CreateAsynVNF", "input": { "nsInstanceId": 1, "vnfId": 1 }, "timeOut": 10 }, 'ran-du-01': { "type": "CreateAsynVNF", "input": { "nsInstanceId": 1, "vnfId": 1 }, "timeOut": 10 } } gf = GraphFlow(deploy_graph, TaskSet, config) gf.start() gf.join() gf.task_manager.wait_tasks_done(gf.sort_nodes) task_set = gf.task_manager.get_all_task() for task in task_set.itervalues(): self.assertEqual(task.FINISHED, task.status) @mock.patch.object(restcall, 'call_req') def test_async_rest_task(self, mock_call_req): mock_call_req.return_value = [0, json.JSONEncoder().encode({ 'jobId': "1", "responseDescriptor": {"progress": 100} }), '200'] deploy_graph = { "ran-cu-02": ["ran-du-02"], "ran-du-02": [], } TaskSet = { 'ran-cu-02': { "type": "CreateASynRestVNF", "input": { "url": "/test/", "method": "POST", "content": {} }, "timeOut": 10 }, 'ran-du-02': { "type": "CreateASynRestVNF", "input": { "url": "/test/", "method": "POST", "content": {} }, "timeOut": 10 } } gf = GraphFlow(deploy_graph, TaskSet, config) gf.start() gf.join() gf.task_manager.wait_tasks_done(gf.sort_nodes) task_set = gf.task_manager.get_all_task() for task in task_set.itervalues(): self.assertEqual(task.FINISHED, task.status)