codecoverage improvement
[dcaegen2/platform.git] / mod / distributorapi / distributor / transform.py
1 # ============LICENSE_START=======================================================
2 # Copyright (c) 2019-2022 AT&T Intellectual Property. All rights reserved.
3 # ================================================================================
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at
7 #
8 #      http://www.apache.org/licenses/LICENSE-2.0
9 #
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
15 # ============LICENSE_END=========================================================
16 """Transform objects from one form to another"""
17
18 import json
19 from functools import partial
20
21
def extract_components_from_flow(flow):
    """Given a versionedFlowSnapshot object, list the component behind each processor.

    Args:
        flow: versionedFlowSnapshot object (dict). The processors are read
            from ``flow["flowContents"]["processors"]`` and each processor
            must carry a ``bundle`` with ``artifact`` and ``version`` entries.

    Returns:
        A list of ``(component name, component version)`` tuples, one per
        processor and in processor order. The name is the bundle's
        ``artifact`` and the version is the bundle's ``version``. Duplicates
        are kept.

    Raises:
        KeyError: If any of the keys named above is missing.
    """
    bundles = (processor["bundle"] for processor in flow["flowContents"]["processors"])
    return [(bundle["artifact"], bundle["version"]) for bundle in bundles]
28
29
def get_component(flow, components, processor_id):
    """Find the component that backs the processor with the given identifier.

    Args:
        flow: versionedFlowSnapshot object (dict); the processors under
            ``flow["flowContents"]["processors"]`` are searched in order.
        components: dict keyed by ``(artifact, version)`` tuples.
        processor_id: Value compared against each processor's ``identifier``.

    Returns:
        The entry of ``components`` for the bundle of the first processor
        whose identifier matches. None is returned when no processor matches
        or when the matching processor's bundle has no entry in
        ``components``.
    """
    for processor in flow["flowContents"]["processors"]:
        if processor["identifier"] == processor_id:
            bundle = processor["bundle"]
            # Only the first matching processor is considered, even when its
            # bundle is unknown and the lookup yields None.
            return components.get((bundle["artifact"], bundle["version"]))
    return None
37
38
def make_fbp_from_flow(flow, components: dict):
    """Transform a versionedFlowSnapshot object into a runtime API (FBP) request.

    Every processor under ``flow["flowContents"]["processors"]`` becomes an
    "addnode" command and every connection under
    ``flow["flowContents"]["connections"]`` becomes an "addedge" command.

    Args:
        flow: versionedFlowSnapshot object (dict).
        components: dict of (name, version) to components. Each component is
            read for its "id", "name" and "spec" entries.

    Returns:
        A list holding the "addnode" commands (in processor order) followed
        by one entry per connection (in connection order). The entry for a
        connection is None when one of its PROCESSOR endpoints has no known
        component; such entries are kept in the list, so callers that cannot
        handle None must filter them out.

    Raises:
        KeyError: If a processor's (artifact, version) is not a key of
            ``components`` or an expected key is missing from the input.
        ValueError: If the selected publishes/subscribes relationship does
            not consist of exactly five ":"-separated fields.

    An example of an edge ("target_graph_id" is shown for context only; this
    function does not set it):

    {
        "command": "addedge",
        "payload": {
            "src": {
                "node": "comp1234",
                "port": "DCAE-HELLO-WORLD-PUB-MR"
            },
            "tgt": {
                "node": "comp5678",
                "port": "DCAE-HELLO-WORLD-SUB-MR"
            },
            "metadata": {
                "name": "sample_topic_0",
                "data_type": "json",
                "dmaap_type": "MR"
            }
        },
        "target_graph_id": "string"
    }
    """
    _get_component = partial(get_component, flow, components)

    def parse_endpoint(endpoint, rels, direction):
        """Build the "src"/"tgt" dict for one end of a connection.

        ``direction`` is "publishes" for the source end and "subscribes" for
        the destination end. Returns {} for a non-PROCESSOR endpoint and None
        when the processor has no known component.
        """
        if endpoint["type"] != "PROCESSOR":
            return {}

        comp = _get_component(endpoint["id"])

        if not comp:
            # REVIEW: Raise error?
            return None

        # Examples:
        # publishes:ves_specification:7.30.1:message router:ves-pnfRegistration-secondary
        # subscribes:predictin:1.0.0:message_router:predict_subscriber
        matching = [r for r in rels if direction in r]

        if not matching:
            # REVIEW: This should be an error?
            return {"node": comp["id"], "port": None}

        # Only the last field (the config key) is needed; the strict
        # five-way unpack still rejects relationships of any other arity.
        _, _, _, _, config_key = matching[0].split(":")
        return {"node": comp["id"], "port": config_key}

    def parse_connection(conn):
        """Turn one connection into an "addedge" command, or None."""
        rels = conn["selectedRelationships"]

        src = parse_endpoint(conn["source"], rels, "publishes")
        if src is None:
            return None

        tgt = parse_endpoint(conn["destination"], rels, "subscribes")
        if tgt is None:
            return None

        return {
            "command": "addedge",
            "payload": {
                "src": src,
                "tgt": tgt,
                "metadata": {
                    "name": conn["name"],
                    # TODO: Question these hardcoded attributes
                    "data_type": "json",
                    "dmaap_type": "MR",
                },
            },
        }

    def parse_processor(p):
        """Turn one processor into an "addnode" command."""
        c = components[(p["bundle"]["artifact"], p["bundle"]["version"])]
        return {
            "command": "addnode",
            # TODO: spec is required to be a json string but runtime api
            # changing this soon hopefully
            "payload": {
                "component_spec": json.dumps(c["spec"]),
                "component_id": c["id"],
                "name": c["name"],
                "processor": p,
            },
        }

    ps = [parse_processor(p) for p in flow["flowContents"]["processors"]]
    cs = [parse_connection(c) for c in flow["flowContents"]["connections"]]
    return ps + cs