af0aab2c6b67c294edd9452c6eaab197df2fb0a7
[vfc/nfvo/lcm.git] / lcm / workflows / graphflow / tests / graph_flow_tests.py
1 # Copyright 2018 ZTE Corporation.
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 #         http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14
15 import unittest
16 import mock
17 import json
18 from lcm.pub.utils import restcall
19 from lcm.workflows.graphflow.flow.flow import GraphFlow
20
21
22 config = {
23     "CreateSynVNF": {"module": "lcm.workflows.graphflow.tests.task_tests", "class": "CreateSynVNF"},
24     "CreateAsynVNF": {"module": "lcm.workflows.graphflow.tests.task_tests", "class": "CreateAsynVNF"},
25     "CreateASynRestVNF": {"module": "lcm.workflows.graphflow.tests.task_tests", "class": "CreateASynRestVNF"}
26 }
27
28
29 class test(object):
30     def execute(self, args):
31         print "test args %s" % args
32
33
34 class GraphFlowTest(unittest.TestCase):
35     def setUp(self):
36         pass
37
38     def tearDown(self):
39         pass
40
41     def test_sync_task(self):
42         deploy_graph = {
43             "ran-cu-00": ["ran-du-00"],
44             "ran-du-00": [],
45         }
46         TaskSet = {
47             'ran-cu-00': {
48                 "type": "CreateSynVNF",
49                 "input": {
50                     "nsInstanceId": 1,
51                     "vnfId": 1
52                 },
53                 "timeOut": 10
54             },
55             'ran-du-00': {
56                 "type": "CreateSynVNF",
57                 "input": {
58                     "nsInstanceId": 1,
59                     "vnfId": 1
60                 },
61                 "timeOut": 10
62             }
63         }
64         gf = GraphFlow(deploy_graph, TaskSet, config)
65         gf.start()
66         gf.join()
67         gf.task_manager.wait_tasks_done(gf.sort_nodes)
68         task_set = gf.task_manager.get_all_task()
69         for task in task_set.itervalues():
70             self.assertEqual(task.FINISHED, task.status)
71
72     def test_async_task(self):
73         deploy_graph = {
74             "ran-cu-01": ["ran-du-01"],
75             "ran-du-01": [],
76         }
77         TaskSet = {
78             'ran-cu-01': {
79                 "type": "CreateAsynVNF",
80                 "input": {
81                     "nsInstanceId": 1,
82                     "vnfId": 1
83                 },
84                 "timeOut": 10
85             },
86             'ran-du-01': {
87                 "type": "CreateAsynVNF",
88                 "input": {
89                     "nsInstanceId": 1,
90                     "vnfId": 1
91                 },
92                 "timeOut": 10
93             }
94         }
95         gf = GraphFlow(deploy_graph, TaskSet, config)
96         gf.start()
97         gf.join()
98         gf.task_manager.wait_tasks_done(gf.sort_nodes)
99         task_set = gf.task_manager.get_all_task()
100         for task in task_set.itervalues():
101             self.assertEqual(task.FINISHED, task.status)
102
103     @mock.patch.object(restcall, 'call_req')
104     def test_async_rest_task(self, mock_call_req):
105         mock_call_req.return_value = [0, json.JSONEncoder().encode({
106             'jobId': "1",
107             "responseDescriptor": {"progress": 100}
108         }), '200']
109
110         deploy_graph = {
111             "ran-cu-02": ["ran-du-02"],
112             "ran-du-02": [],
113         }
114         TaskSet = {
115             'ran-cu-02': {
116                 "type": "CreateASynRestVNF",
117                 "input": {
118                     "url": "/test/",
119                     "method": "POST",
120                     "content": {}
121                 },
122                 "timeOut": 10
123             },
124             'ran-du-02': {
125                 "type": "CreateASynRestVNF",
126                 "input": {
127                     "url": "/test/",
128                     "method": "POST",
129                     "content": {}
130                 },
131                 "timeOut": 10
132             }
133         }
134         gf = GraphFlow(deploy_graph, TaskSet, config)
135         gf.start()
136         gf.join()
137         gf.task_manager.wait_tasks_done(gf.sort_nodes)
138         task_set = gf.task_manager.get_all_task()
139         for task in task_set.itervalues():
140             self.assertEqual(task.FINISHED, task.status)