add graph flow test 69/70469/2
authormaopengzhang <zhang.maopeng1@zte.com.cn>
Mon, 15 Oct 2018 11:53:17 +0000 (19:53 +0800)
committermaopengzhang <zhang.maopeng1@zte.com.cn>
Tue, 16 Oct 2018 00:53:23 +0000 (08:53 +0800)
add graph flow test to support NS

Change-Id: Iddf31485000e40b87b3f444ae3d4c84b8d238c9c
Issue-ID: VFC-1041
Signed-off-by: maopengzhang <zhang.maopeng1@zte.com.cn>
lcm/workflows/graphflow/__init__.py
lcm/workflows/graphflow/tests/__init__.py [new file with mode: 0644]
lcm/workflows/graphflow/tests/graph_flow_tests.py [new file with mode: 0644]
lcm/workflows/graphflow/tests/graph_tests.py [new file with mode: 0644]
lcm/workflows/graphflow/tests/task_tests.py [new file with mode: 0644]

index 8e6d0ad..694d82b 100644 (file)
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+
 TASK_STAUS = (STARTED, PROCESSING, FINISHED, ERROR) = ("started", "processing", "finished", "error")
 TIMEOUT_DEFAULT = 10
-
-# from lcm.workflows.graphflow.flow.flow import GraphFlow
-# from lcm.workflows.graphflow.task.task import Task
-# from lcm.workflows.graphflow.task.sync_task import SyncTask
-# from lcm.workflows.graphflow.task.sync_rest_task import SyncRestTask
-# from lcm.workflows.graphflow.task.async_task import AsyncTask
-# from lcm.workflows.graphflow.task.async_rest_task import ASyncRestTask
-# from lcm.workflows.graphflow.task.lcm_async_rest_task import LcmASyncRestTask
-# from lcm.workflows.graphflow.task.lcm_sync_rest_task import LcmSyncRestTask
diff --git a/lcm/workflows/graphflow/tests/__init__.py b/lcm/workflows/graphflow/tests/__init__.py
new file mode 100644 (file)
index 0000000..342c2a8
--- /dev/null
@@ -0,0 +1,13 @@
+# Copyright 2018 ZTE Corporation.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#         http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
diff --git a/lcm/workflows/graphflow/tests/graph_flow_tests.py b/lcm/workflows/graphflow/tests/graph_flow_tests.py
new file mode 100644 (file)
index 0000000..af0aab2
--- /dev/null
@@ -0,0 +1,140 @@
+# Copyright 2018 ZTE Corporation.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#         http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import unittest
+import mock
+import json
+from lcm.pub.utils import restcall
+from lcm.workflows.graphflow.flow.flow import GraphFlow
+
+
+config = {
+    "CreateSynVNF": {"module": "lcm.workflows.graphflow.tests.task_tests", "class": "CreateSynVNF"},
+    "CreateAsynVNF": {"module": "lcm.workflows.graphflow.tests.task_tests", "class": "CreateAsynVNF"},
+    "CreateASynRestVNF": {"module": "lcm.workflows.graphflow.tests.task_tests", "class": "CreateASynRestVNF"}
+}
+
+
+class test(object):
+    def execute(self, args):
+        print "test args %s" % args
+
+
+class GraphFlowTest(unittest.TestCase):
+    def setUp(self):
+        pass
+
+    def tearDown(self):
+        pass
+
+    def test_sync_task(self):
+        deploy_graph = {
+            "ran-cu-00": ["ran-du-00"],
+            "ran-du-00": [],
+        }
+        TaskSet = {
+            'ran-cu-00': {
+                "type": "CreateSynVNF",
+                "input": {
+                    "nsInstanceId": 1,
+                    "vnfId": 1
+                },
+                "timeOut": 10
+            },
+            'ran-du-00': {
+                "type": "CreateSynVNF",
+                "input": {
+                    "nsInstanceId": 1,
+                    "vnfId": 1
+                },
+                "timeOut": 10
+            }
+        }
+        gf = GraphFlow(deploy_graph, TaskSet, config)
+        gf.start()
+        gf.join()
+        gf.task_manager.wait_tasks_done(gf.sort_nodes)
+        task_set = gf.task_manager.get_all_task()
+        for task in task_set.itervalues():
+            self.assertEqual(task.FINISHED, task.status)
+
+    def test_async_task(self):
+        deploy_graph = {
+            "ran-cu-01": ["ran-du-01"],
+            "ran-du-01": [],
+        }
+        TaskSet = {
+            'ran-cu-01': {
+                "type": "CreateAsynVNF",
+                "input": {
+                    "nsInstanceId": 1,
+                    "vnfId": 1
+                },
+                "timeOut": 10
+            },
+            'ran-du-01': {
+                "type": "CreateAsynVNF",
+                "input": {
+                    "nsInstanceId": 1,
+                    "vnfId": 1
+                },
+                "timeOut": 10
+            }
+        }
+        gf = GraphFlow(deploy_graph, TaskSet, config)
+        gf.start()
+        gf.join()
+        gf.task_manager.wait_tasks_done(gf.sort_nodes)
+        task_set = gf.task_manager.get_all_task()
+        for task in task_set.itervalues():
+            self.assertEqual(task.FINISHED, task.status)
+
+    @mock.patch.object(restcall, 'call_req')
+    def test_async_rest_task(self, mock_call_req):
+        mock_call_req.return_value = [0, json.JSONEncoder().encode({
+            'jobId': "1",
+            "responseDescriptor": {"progress": 100}
+        }), '200']
+
+        deploy_graph = {
+            "ran-cu-02": ["ran-du-02"],
+            "ran-du-02": [],
+        }
+        TaskSet = {
+            'ran-cu-02': {
+                "type": "CreateASynRestVNF",
+                "input": {
+                    "url": "/test/",
+                    "method": "POST",
+                    "content": {}
+                },
+                "timeOut": 10
+            },
+            'ran-du-02': {
+                "type": "CreateASynRestVNF",
+                "input": {
+                    "url": "/test/",
+                    "method": "POST",
+                    "content": {}
+                },
+                "timeOut": 10
+            }
+        }
+        gf = GraphFlow(deploy_graph, TaskSet, config)
+        gf.start()
+        gf.join()
+        gf.task_manager.wait_tasks_done(gf.sort_nodes)
+        task_set = gf.task_manager.get_all_task()
+        for task in task_set.itervalues():
+            self.assertEqual(task.FINISHED, task.status)
diff --git a/lcm/workflows/graphflow/tests/graph_tests.py b/lcm/workflows/graphflow/tests/graph_tests.py
new file mode 100644 (file)
index 0000000..894c232
--- /dev/null
@@ -0,0 +1,38 @@
+# Copyright 2018 ZTE Corporation.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#         http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+import logging
+
+from django.test import TestCase
+from lcm.workflows.graphflow.flow.graph import Graph
+
+logger = logging.getLogger(__name__)
+
+
+class TestToscaparser(TestCase):
+    def setUp(self):
+        pass
+
+    def tearDown(self):
+        pass
+
+    def test_graph(self):
+        data = {
+            "cucp": [],
+            "du": [],
+            "vl_flat_net": ["cucp", "cuup"],
+            "vl_ext_net": ["cucp", "cuup"],
+            "cuup": []
+        }
+        graph = Graph(data)
+        self.assertEqual(['vl_ext_net', 'vl_flat_net'].sort(), graph.get_pre_nodes("cucp").sort())
diff --git a/lcm/workflows/graphflow/tests/task_tests.py b/lcm/workflows/graphflow/tests/task_tests.py
new file mode 100644 (file)
index 0000000..24e0666
--- /dev/null
@@ -0,0 +1,49 @@
+# Copyright 2018 ZTE Corporation.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#         http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from lcm.workflows.graphflow.task.async_task import AsyncTask
+from lcm.workflows.graphflow.task.sync_task import SyncTask
+from lcm.workflows.graphflow.task.lcm_async_rest_task import LcmASyncRestTask
+import logging
+
+logger = logging.getLogger(__name__)
+
+
+class CreateSynVNF(SyncTask):
+    def __init__(self, *args):
+        super(CreateSynVNF, self).__init__(*args)
+
+    def run(self):
+        logger.debug("test CreateSynVNF %s" % self.key)
+        return self.FINISHED, {}
+
+
+class CreateAsynVNF(AsyncTask):
+    def __init__(self, *args):
+        super(CreateAsynVNF, self).__init__(*args)
+
+    def run(self):
+        logger.debug("test CreateAsynVNF %s" % self.key)
+        return self.PROCESSING, None
+
+    def get_ext_status(self):
+        return self.FINISHED
+
+
+class CreateASynRestVNF(LcmASyncRestTask):
+
+    def __init__(self, *args):
+        super(CreateASynRestVNF, self).__init__(*args)
+        self.url = "/api/nslcm/v1/vnfs"
+        self.method = self.POST