f275e63ad0e601e4a2cde2c814205a5632d436cd
[dcaegen2/platform.git] / mod / distributorapi / tests / test_transform.py
1 # ============LICENSE_START=======================================================
2 # Copyright (c) 2019 AT&T Intellectual Property. All rights reserved.
3 # ================================================================================
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at
7 #
8 #      http://www.apache.org/licenses/LICENSE-2.0
9 #
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
15 # ============LICENSE_END=========================================================
16 import os, json
17 from distributor import transform as tr
18
19 TEST_DIR = os.path.dirname(__file__)
20
21 def _load_data(filename):
22     path = os.path.join(TEST_DIR, filename)
23     with open(path) as f:
24         return json.load(f)
25
26 def _setup():
27     flow = _load_data("flow.json")
28     components = _load_data("components.json")
29     components = dict([((c["name"], c["version"]), c) for c in components])
30     return (flow, components)
31
32
33 def test_get_component():
34     flow, components = _setup()
35     c = tr.get_component(flow, components, "a8134467-b4b4-348f-8a1c-8d732fe4fcad")
36     assert "3fadb641-2079-4ca9-bb07-0df5952967fc" == c["id"]
37
38
39 def test_make_fbp_from_flow():
40     flow, components = _setup()
41
42     fbp = tr.make_fbp_from_flow(flow, components)
43     assert len(fbp) == 4
44
45     def check_node(n):
46         n["payload"]["component_id"]
47
48     expected = ["75c9a179-b36b-4985-9445-d44c8768d6eb", "3fadb641-2079-4ca9-bb07-0df5952967fc"]
49     actual = [n["payload"]["component_id"] for n in fbp if n["command"] == "addnode"]
50     assert list(sorted(expected)) == list(sorted(actual))
51
52     # Test processor to processor scenario
53     expected = {'metadata': {'data_type': 'json',
54                            'dmaap_type': 'MR',
55                            'name': 'foo-conn'},
56               'src': {'node': '75c9a179-b36b-4985-9445-d44c8768d6eb',
57                       'port': 'ves-pnfRegistration-secondary'},
58               'tgt': {'node': '3fadb641-2079-4ca9-bb07-0df5952967fc',
59                       'port': 'predict_subscriber'}}
60     actual = [e["payload"] for e in fbp if e["command"] == "addedge"]
61     assert actual[0] == expected or actual[1] == expected
62
63     # Test input port to processor scenario
64     expected = {'metadata': {'data_type': 'json', 'dmaap_type': 'MR',
65         'name': 'ves-data-conn'}, 'src': {},
66         'tgt': {'node': '75c9a179-b36b-4985-9445-d44c8768d6eb',
67             'port': 'ves-notification'}}
68     assert actual[0] == expected or actual[1] == expected