1 # ============LICENSE_START=======================================================
2 # Copyright (c) 2019-2022 AT&T Intellectual Property. All rights reserved.
3 # ================================================================================
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at
8 # http://www.apache.org/licenses/LICENSE-2.0
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
15 # ============LICENSE_END=========================================================
17 from distributor import transform as tr
# Directory containing this test module; _load_data() joins fixture file names onto it.
19 TEST_DIR = os.path.dirname(__file__)
# Fixture helper: resolves `filename` relative to TEST_DIR.
# NOTE(review): callers pass "flow.json" / "components.json" and treat the result
# as decoded data (e.g. iterating it and indexing c["name"]) -- presumably the
# file at `path` is read and JSON-decoded; confirm against the full function body.
22 def _load_data(filename):
23 path = os.path.join(TEST_DIR, filename)
# Load the two fixtures shared by the tests in this module.
29 flow = _load_data("flow.json")
30 components = _load_data("components.json")
# Re-key the loaded components by their (name, version) pair; each value is the
# full component record.
31 components = dict([((c["name"], c["version"]), c) for c in components])
# Hand both fixtures back as a (flow, components) tuple.
32 return (flow, components)
def test_get_component():
    """Verify that ``tr.get_component`` resolves a known identifier.

    The fixture flow and component catalogue come from ``_setup()``. The
    lookup identifier below must yield a component whose ``"id"`` field is
    the expected component id.
    """
    lookup_id = "a8134467-b4b4-348f-8a1c-8d732fe4fcad"
    expected_component_id = "3fadb641-2079-4ca9-bb07-0df5952967fc"

    flow, components = _setup()
    component = tr.get_component(flow, components, lookup_id)

    assert expected_component_id == component["id"]
# Exercise tr.make_fbp_from_flow() on the fixture flow and check the "addnode"
# and "addedge" commands found in its result.
41 def test_make_fbp_from_flow():
42 flow, components = _setup()
44 fbp = tr.make_fbp_from_flow(flow, components)
48 n["payload"]["component_id"]
# The component ids carried by the "addnode" commands must be exactly these two;
# both sides are sorted, so the order of the commands does not matter.
50 expected = ["75c9a179-b36b-4985-9445-d44c8768d6eb", "3fadb641-2079-4ca9-bb07-0df5952967fc"]
51 actual = [n["payload"]["component_id"] for n in fbp if n["command"] == "addnode"]
52 assert list(sorted(expected)) == list(sorted(actual))
54 # Test processor to processor scenario
56 "metadata": {"data_type": "json", "dmaap_type": "MR", "name": "foo-conn"},
57 "src": {"node": "75c9a179-b36b-4985-9445-d44c8768d6eb", "port": "ves-pnfRegistration-secondary"},
58 "tgt": {"node": "3fadb641-2079-4ca9-bb07-0df5952967fc", "port": "predict_subscriber"},
# Gather the payloads of all "addedge" commands; the expected edge is accepted
# at either of the first two positions.
60 actual = [e["payload"] for e in fbp if e["command"] == "addedge"]
61 assert actual[0] == expected or actual[1] == expected
63 # Test input port to processor scenario
65 "metadata": {"data_type": "json", "dmaap_type": "MR", "name": "ves-data-conn"},
67 "tgt": {"node": "75c9a179-b36b-4985-9445-d44c8768d6eb", "port": "ves-notification"},
# Same position-tolerant check, reusing the "addedge" payloads collected above.
69 assert actual[0] == expected or actual[1] == expected