1 # ============LICENSE_START=======================================================
2 # Copyright (c) 2019 AT&T Intellectual Property. All rights reserved.
3 # ================================================================================
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at
8 # http://www.apache.org/licenses/LICENSE-2.0
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
15 # ============LICENSE_END=========================================================
import json
import os

from distributor import transform as tr
# Absolute directory containing this test module; fixture files are resolved
# relative to it. abspath() keeps the lookup stable even when the module was
# loaded through a relative path or the working directory changes later.
TEST_DIR = os.path.dirname(os.path.abspath(__file__))
def _load_data(filename):
    """Load a fixture file that lives next to this test module.

    Args:
        filename: Name of the fixture file, relative to ``TEST_DIR``.

    Returns:
        The decoded JSON document.
    """
    path = os.path.join(TEST_DIR, filename)
    # NOTE(review): the body previously stopped after computing ``path`` and
    # so returned None. Callers pass "*.json" names and iterate/index the
    # result, so the file is parsed as JSON here - confirm.
    with open(path, encoding="utf-8") as fixture:
        return json.load(fixture)
def _setup():
    """Load the flow and component fixtures shared by the tests.

    Returns:
        A ``(flow, components)`` tuple: ``flow`` is whatever ``_load_data``
        yields for ``flow.json``; ``components`` maps each
        ``(name, version)`` pair to the matching entry of ``components.json``.
    """
    flow = _load_data("flow.json")
    component_list = _load_data("components.json")
    # Index by (name, version); a dict comprehension avoids building a
    # throwaway list of key/value pairs first.
    components = {(c["name"], c["version"]): c for c in component_list}
    return (flow, components)
def test_get_component():
    """Check that ``tr.get_component`` resolves the given identifier to the expected component id."""
    lookup_id = "a8134467-b4b4-348f-8a1c-8d732fe4fcad"
    expected_component_id = "3fadb641-2079-4ca9-bb07-0df5952967fc"

    flow, components = _setup()
    component = tr.get_component(flow, components, lookup_id)

    assert expected_component_id == component["id"]
def test_make_fbp_from_flow():
    """Check the node and edge commands ``tr.make_fbp_from_flow`` emits for the fixture flow."""
    flow, components = _setup()

    fbp = tr.make_fbp_from_flow(flow, components)

    # The "addnode" commands must reference exactly these two component ids
    # (order is irrelevant, hence the sort on both sides).
    expected_ids = [
        "75c9a179-b36b-4985-9445-d44c8768d6eb",
        "3fadb641-2079-4ca9-bb07-0df5952967fc",
    ]
    actual_ids = [n["payload"]["component_id"] for n in fbp if n["command"] == "addnode"]
    assert sorted(expected_ids) == sorted(actual_ids)

    # Payloads of every "addedge" command; membership checks below avoid the
    # IndexError that fixed [0]/[1] indexing raises when fewer edges exist.
    edges = [e["payload"] for e in fbp if e["command"] == "addedge"]

    # Test processor to processor scenario
    # NOTE(review): the original ``expected`` literal for this case was
    # unbalanced (its 'metadata' dict was never closed), so only the fields
    # that were legible are asserted - restore the full metadata if known.
    expected_src = {'node': '75c9a179-b36b-4985-9445-d44c8768d6eb',
                    'port': 'ves-pnfRegistration-secondary'}
    expected_tgt = {'node': '3fadb641-2079-4ca9-bb07-0df5952967fc',
                    'port': 'predict_subscriber'}
    assert any(
        edge['metadata']['data_type'] == 'json'
        and edge['src'] == expected_src
        and edge['tgt'] == expected_tgt
        for edge in edges
    )

    # Test input port to processor scenario
    expected = {'metadata': {'data_type': 'json', 'dmaap_type': 'MR',
                             'name': 'ves-data-conn'}, 'src': {},
                'tgt': {'node': '75c9a179-b36b-4985-9445-d44c8768d6eb',
                        'port': 'ves-notification'}}
    assert expected in edges