1 # ============LICENSE_START=======================================================
2 # Copyright (c) 2019 AT&T Intellectual Property. All rights reserved.
3 # ================================================================================
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at
8 # http://www.apache.org/licenses/LICENSE-2.0
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
15 # ============LICENSE_END=========================================================
16 """Transform objects from one form to another"""
19 from functools import partial
22 def extract_components_from_flow(flow):
23 """Given a versionedFlowSnapshot object, extract out the processors
24 and create a list of tuples where each tuple is
25 (component name, component version)"""
26 extract = lambda p: (p["bundle"]["artifact"], p["bundle"]["version"])
27 return [ extract(p) for p in flow["flowContents"]["processors"] ]
30 def get_component(flow, components, processor_id):
33 return components.get((bundle["artifact"], bundle["version"]), None)
35 cs = [get_component(p) for p in flow["flowContents"]["processors"] \
36 if p["identifier"] == processor_id]
37 return cs[0] if cs else None
40 def make_fbp_from_flow(flow, components: "dict of (name, version) to components"):
41 """Transform a versionedFlowSnapshot object into a runtime API (FBP) request
43 An example of an edge:
50 "port": "DCAE-HELLO-WORLD-PUB-MR"
54 "port": "DCAE-HELLO-WORLD-SUB-MR"
57 "name": "sample_topic_0",
62 "target_graph_id": "string"
65 _get_component = partial(get_component, flow, components)
67 def parse_connection(conn):
68 rels = conn["selectedRelationships"]
70 if conn["source"]["type"] == "PROCESSOR":
71 comp = _get_component(conn["source"]["id"])
74 # REVIEW: Raise error?
78 # publishes:ves_specification:7.30.1:message router:ves-pnfRegistration-secondary
79 rels_pubs = [r for r in rels if "publishes" in r]
82 _, _, _, transport_type, config_key = rels_pubs[0].split(":")
83 src = { "node": comp["id"], "port": config_key }
85 # REVIEW: This should be an error?
86 src = { "node": comp["id"], "port": None }
90 if conn["destination"]["type"] == "PROCESSOR":
91 comp = _get_component(conn["destination"]["id"])
94 # REVIEW: Raise error?
98 # subscribes:predictin:1.0.0:message_router:predict_subscriber
99 rels_subs = [r for r in rels if "subscribes" in r]
102 _, _, _, transport_type, config_key = rels_subs[0].split(":")
103 tgt = { "node": comp["id"], "port": config_key }
105 # REVIEW: This should be an error?
106 tgt = { "node": comp["id"], "port": None }
110 return { "command": "addedge"
116 # TODO: Question these hardcoded attributes
117 , "data_type": "json"
123 def parse_processor(p):
124 c = components[(p["bundle"]["artifact"], p["bundle"]["version"])]
125 return { "command": "addnode"
126 # TODO: spec is required to be a json string but runtime api
127 # changing this soon hopefully
128 , "payload": { "component_spec": json.dumps(c["spec"])
129 , "component_id": c["id"]
134 ps = [ parse_processor(p) for p in flow["flowContents"]["processors"] ]
135 cs = [ parse_connection(c) for c in flow["flowContents"]["connections"] ]