1 # ============LICENSE_START=======================================================
2 # Copyright (c) 2019-2022 AT&T Intellectual Property. All rights reserved.
3 # ================================================================================
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at
8 # http://www.apache.org/licenses/LICENSE-2.0
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
15 # ============LICENSE_END=========================================================
16 """Transform objects from one form to another"""
19 from functools import partial
22 def extract_components_from_flow(flow):
23 """Given a versionedFlowSnapshot object, extract out the processors
24 and create a list of tuples where each tuple is
25 (component name, component version)"""
26 extract = lambda p: (p["bundle"]["artifact"], p["bundle"]["version"])
27 return [extract(p) for p in flow["flowContents"]["processors"]]
30 def get_component(flow, components, processor_id):
33 return components.get((bundle["artifact"], bundle["version"]), None)
35 cs = [get_component(p) for p in flow["flowContents"]["processors"] if p["identifier"] == processor_id]
36 return cs[0] if cs else None
39 def make_fbp_from_flow(flow, components: "dict of (name, version) to components"):
40 """Transform a versionedFlowSnapshot object into a runtime API (FBP) request
42 An example of an edge:
49 "port": "DCAE-HELLO-WORLD-PUB-MR"
53 "port": "DCAE-HELLO-WORLD-SUB-MR"
56 "name": "sample_topic_0",
61 "target_graph_id": "string"
64 _get_component = partial(get_component, flow, components)
66 def parse_connection(conn):
67 rels = conn["selectedRelationships"]
69 if conn["source"]["type"] == "PROCESSOR":
70 comp = _get_component(conn["source"]["id"])
73 # REVIEW: Raise error?
77 # publishes:ves_specification:7.30.1:message router:ves-pnfRegistration-secondary
78 rels_pubs = [r for r in rels if "publishes" in r]
81 _, _, _, transport_type, config_key = rels_pubs[0].split(":")
82 src = {"node": comp["id"], "port": config_key}
84 # REVIEW: This should be an error?
85 src = {"node": comp["id"], "port": None}
89 if conn["destination"]["type"] == "PROCESSOR":
90 comp = _get_component(conn["destination"]["id"])
93 # REVIEW: Raise error?
97 # subscribes:predictin:1.0.0:message_router:predict_subscriber
98 rels_subs = [r for r in rels if "subscribes" in r]
101 _, _, _, transport_type, config_key = rels_subs[0].split(":")
102 tgt = {"node": comp["id"], "port": config_key}
104 # REVIEW: This should be an error?
105 tgt = {"node": comp["id"], "port": None}
110 "command": "addedge",
116 # TODO: Question these hardcoded attributes
124 def parse_processor(p):
125 c = components[(p["bundle"]["artifact"], p["bundle"]["version"])]
128 # TODO: spec is required to be a json string but runtime api
129 # changing this soon hopefully
132 "component_spec": json.dumps(c["spec"]),
133 "component_id": c["id"],
139 ps = [parse_processor(p) for p in flow["flowContents"]["processors"]]
140 cs = [parse_connection(c) for c in flow["flowContents"]["connections"]]