codecoverage improvement
[dcaegen2/platform.git] / mod / distributorapi / tests / test_transform.py
1 # ============LICENSE_START=======================================================
2 # Copyright (c) 2019-2022 AT&T Intellectual Property. All rights reserved.
3 # ================================================================================
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at
7 #
8 #      http://www.apache.org/licenses/LICENSE-2.0
9 #
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
15 # ============LICENSE_END=========================================================
16 import os, json
17 from distributor import transform as tr
18
19 TEST_DIR = os.path.dirname(__file__)
20
21
22 def _load_data(filename):
23     path = os.path.join(TEST_DIR, filename)
24     with open(path) as f:
25         return json.load(f)
26
27
28 def _setup():
29     flow = _load_data("flow.json")
30     components = _load_data("components.json")
31     components = dict([((c["name"], c["version"]), c) for c in components])
32     return (flow, components)
33
34
35 def test_get_component():
36     flow, components = _setup()
37     c = tr.get_component(flow, components, "a8134467-b4b4-348f-8a1c-8d732fe4fcad")
38     assert "3fadb641-2079-4ca9-bb07-0df5952967fc" == c["id"]
39
40
41 def test_make_fbp_from_flow():
42     flow, components = _setup()
43
44     fbp = tr.make_fbp_from_flow(flow, components)
45     assert len(fbp) == 4
46
47     def check_node(n):
48         n["payload"]["component_id"]
49
50     expected = ["75c9a179-b36b-4985-9445-d44c8768d6eb", "3fadb641-2079-4ca9-bb07-0df5952967fc"]
51     actual = [n["payload"]["component_id"] for n in fbp if n["command"] == "addnode"]
52     assert list(sorted(expected)) == list(sorted(actual))
53
54     # Test processor to processor scenario
55     expected = {
56         "metadata": {"data_type": "json", "dmaap_type": "MR", "name": "foo-conn"},
57         "src": {"node": "75c9a179-b36b-4985-9445-d44c8768d6eb", "port": "ves-pnfRegistration-secondary"},
58         "tgt": {"node": "3fadb641-2079-4ca9-bb07-0df5952967fc", "port": "predict_subscriber"},
59     }
60     actual = [e["payload"] for e in fbp if e["command"] == "addedge"]
61     assert actual[0] == expected or actual[1] == expected
62
63     # Test input port to processor scenario
64     expected = {
65         "metadata": {"data_type": "json", "dmaap_type": "MR", "name": "ves-data-conn"},
66         "src": {},
67         "tgt": {"node": "75c9a179-b36b-4985-9445-d44c8768d6eb", "port": "ves-notification"},
68     }
69     assert actual[0] == expected or actual[1] == expected