9654249776efbe62962f1b5cec0f587857f5c468
[dcaegen2/platform.git] / mod / distributorapi / distributor / transform.py
1 # ============LICENSE_START=======================================================
2 # Copyright (c) 2019 AT&T Intellectual Property. All rights reserved.
3 # ================================================================================
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at
7 #
8 #      http://www.apache.org/licenses/LICENSE-2.0
9 #
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
15 # ============LICENSE_END=========================================================
16 """Transform objects from one form to another"""
17
18 import json
19 from functools import partial
20
21
22 def extract_components_from_flow(flow):
23     """Given a versionedFlowSnapshot object, extract out the processors
24     and create a list of tuples where each tuple is
25     (component name, component version)"""
26     extract = lambda p: (p["bundle"]["artifact"], p["bundle"]["version"])
27     return [ extract(p) for p in flow["flowContents"]["processors"] ]
28
29
30 def get_component(flow, components, processor_id):
31     def get_component(p):
32         bundle = p["bundle"]
33         return components.get((bundle["artifact"], bundle["version"]), None)
34
35     cs = [get_component(p) for p in flow["flowContents"]["processors"] \
36             if p["identifier"] == processor_id]
37     return cs[0] if cs else None
38
39
40 def make_fbp_from_flow(flow, components: "dict of (name, version) to components"):
41     """Transform a versionedFlowSnapshot object into a runtime API (FBP) request
42
43     An example of an edge:
44
45     {
46         "command": "addedge",
47         "payload": {
48             "src" : {
49                 "node": "comp1234",
50                 "port": "DCAE-HELLO-WORLD-PUB-MR"
51             },
52             "tgt" : {
53                 "node": "comp5678",
54                 "port": "DCAE-HELLO-WORLD-SUB-MR"
55             },
56         "metadata":{
57             "name": "sample_topic_0",
58             "data_type": "json",
59             "dmaap_type": "MR"
60             }
61         },
62         "target_graph_id": "string"
63     }
64     """
65     _get_component = partial(get_component, flow, components)
66
67     def parse_connection(conn):
68         rels = conn["selectedRelationships"]
69
70         if conn["source"]["type"] == "PROCESSOR":
71             comp = _get_component(conn["source"]["id"])
72
73             if not comp:
74                 # REVIEW: Raise error?
75                 return None
76
77             # Example:
78             # publishes:ves_specification:7.30.1:message router:ves-pnfRegistration-secondary
79             rels_pubs = [r for r in rels if "publishes" in r]
80
81             if rels_pubs:
82                 _, _, _, transport_type, config_key = rels_pubs[0].split(":")
83                 src = { "node": comp["id"], "port": config_key }
84             else:
85                 # REVIEW: This should be an error?
86                 src = { "node": comp["id"], "port": None }
87         else:
88             src = {}
89
90         if conn["destination"]["type"] == "PROCESSOR":
91             comp = _get_component(conn["destination"]["id"])
92
93             if not comp:
94                 # REVIEW: Raise error?
95                 return None
96
97             # Example:
98             # subscribes:predictin:1.0.0:message_router:predict_subscriber
99             rels_subs = [r for r in rels if "subscribes" in r]
100
101             if rels_subs:
102                 _, _, _, transport_type, config_key = rels_subs[0].split(":")
103                 tgt = { "node": comp["id"], "port": config_key }
104             else:
105                 # REVIEW: This should be an error?
106                 tgt = { "node": comp["id"], "port": None }
107         else:
108             tgt = {}
109
110         return { "command": "addedge"
111                 , "payload": {
112                     "src": src
113                     , "tgt": tgt
114                     , "metadata": {
115                         "name": conn["name"]
116                         # TODO: Question these hardcoded attributes
117                         , "data_type": "json"
118                         , "dmaap_type": "MR"
119                         }
120                     }
121                 }
122
123     def parse_processor(p):
124         c = components[(p["bundle"]["artifact"], p["bundle"]["version"])]
125         return { "command": "addnode"
126                 # TODO: spec is required to be a json string but runtime api
127                 # changing this soon hopefully
128                 , "payload": { "component_spec": json.dumps(c["spec"])
129                     , "component_id": c["id"]
130                     , "name": c["name"]
131                     , "processor": p }
132                 }
133
134     ps = [ parse_processor(p) for p in flow["flowContents"]["processors"] ]
135     cs = [ parse_connection(c) for c in flow["flowContents"]["connections"] ]
136     return ps+cs
137