X-Git-Url: https://gerrit.onap.org/r/gitweb?a=blobdiff_plain;f=mod%2Fdistributorapi%2Fdistributor%2Ftransform.py;h=5ca12777f4b40df43b9783254bcde2faaf08088c;hb=c83d2369eb2f0ac40d5acd2db2d7350fe86101e5;hp=9654249776efbe62962f1b5cec0f587857f5c468;hpb=2b31b78fb0c6621259898c3553187be4ab0acf8a;p=dcaegen2%2Fplatform.git diff --git a/mod/distributorapi/distributor/transform.py b/mod/distributorapi/distributor/transform.py index 9654249..5ca1277 100644 --- a/mod/distributorapi/distributor/transform.py +++ b/mod/distributorapi/distributor/transform.py @@ -1,5 +1,5 @@ # ============LICENSE_START======================================================= -# Copyright (c) 2019 AT&T Intellectual Property. All rights reserved. +# Copyright (c) 2019-2022 AT&T Intellectual Property. All rights reserved. # ================================================================================ # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -24,7 +24,7 @@ def extract_components_from_flow(flow): and create a list of tuples where each tuple is (component name, component version)""" extract = lambda p: (p["bundle"]["artifact"], p["bundle"]["version"]) - return [ extract(p) for p in flow["flowContents"]["processors"] ] + return [extract(p) for p in flow["flowContents"]["processors"]] def get_component(flow, components, processor_id): @@ -32,8 +32,7 @@ def get_component(flow, components, processor_id): bundle = p["bundle"] return components.get((bundle["artifact"], bundle["version"]), None) - cs = [get_component(p) for p in flow["flowContents"]["processors"] \ - if p["identifier"] == processor_id] + cs = [get_component(p) for p in flow["flowContents"]["processors"] if p["identifier"] == processor_id] return cs[0] if cs else None @@ -80,10 +79,10 @@ def make_fbp_from_flow(flow, components: "dict of (name, version) to components" if rels_pubs: _, _, _, transport_type, config_key = rels_pubs[0].split(":") - src = { "node": comp["id"], "port": config_key } + src = {"node": comp["id"], "port": config_key} else: # REVIEW: This should be an error? - src = { "node": comp["id"], "port": None } + src = {"node": comp["id"], "port": None} else: src = {} @@ -100,38 +99,43 @@ def make_fbp_from_flow(flow, components: "dict of (name, version) to components" if rels_subs: _, _, _, transport_type, config_key = rels_subs[0].split(":") - tgt = { "node": comp["id"], "port": config_key } + tgt = {"node": comp["id"], "port": config_key} else: # REVIEW: This should be an error? - tgt = { "node": comp["id"], "port": None } + tgt = {"node": comp["id"], "port": None} else: tgt = {} - return { "command": "addedge" - , "payload": { - "src": src - , "tgt": tgt - , "metadata": { - "name": conn["name"] - # TODO: Question these hardcoded attributes - , "data_type": "json" - , "dmaap_type": "MR" - } - } - } + return { + "command": "addedge", + "payload": { + "src": src, + "tgt": tgt, + "metadata": { + "name": conn["name"] + # TODO: Question these hardcoded attributes + , + "data_type": "json", + "dmaap_type": "MR", + }, + }, + } def parse_processor(p): c = components[(p["bundle"]["artifact"], p["bundle"]["version"])] - return { "command": "addnode" - # TODO: spec is required to be a json string but runtime api - # changing this soon hopefully - , "payload": { "component_spec": json.dumps(c["spec"]) - , "component_id": c["id"] - , "name": c["name"] - , "processor": p } - } - - ps = [ parse_processor(p) for p in flow["flowContents"]["processors"] ] - cs = [ parse_connection(c) for c in flow["flowContents"]["connections"] ] - return ps+cs + return { + "command": "addnode" + # TODO: spec is required to be a json string but runtime api + # changing this soon hopefully + , + "payload": { + "component_spec": json.dumps(c["spec"]), + "component_id": c["id"], + "name": c["name"], + "processor": p, + }, + } + ps = [parse_processor(p) for p in flow["flowContents"]["processors"]] + cs = [parse_connection(c) for c in flow["flowContents"]["connections"]] + return ps + cs