codecoverage improvement
[dcaegen2/platform.git] / mod / distributorapi / distributor / registry_client.py
1 # ============LICENSE_START=======================================================
2 # Copyright (c) 2019-2022 AT&T Intellectual Property. All rights reserved.
3 # ================================================================================
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at
7 #
8 #      http://www.apache.org/licenses/LICENSE-2.0
9 #
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
15 # ============LICENSE_END=========================================================
16 """Sophisticated Nifi registry client"""
17
18 from distributor.utils import urljoin as _urljoin
19 from distributor.utils import get_json as _get_json
20
21
22 def _add_url_from_link(registry_url, obj):
23     result = {}
24
25     for k, v in obj.items():
26         if k == "link":
27             result["selfUrl"] = _urljoin(registry_url, v["href"])
28             result[k] = v
29         elif type(v) == dict:
30             result[k] = _add_url_from_link(registry_url, v)
31         else:
32             result[k] = v
33
34     return result
35
36
37 def get_buckets(registry_url):
38     buckets = _get_json(_urljoin(registry_url, "buckets"))
39     return [_add_url_from_link(registry_url, b) for b in buckets]
40
41
42 def get_flows(registry_url, bucket_url):
43     flows = _get_json(_urljoin(bucket_url, "flows"))
44     return [_add_url_from_link(registry_url, f) for f in flows]
45
46
47 def find_flow(registry_url, flow_id):
48     buckets = get_buckets(registry_url)
49
50     def is_match(flow):
51         return flow["identifier"] == flow_id
52
53     for bucket in buckets:
54         result = [f for f in get_flows(registry_url, bucket["selfUrl"]) if is_match(f)]
55
56         if result:
57             return result.pop()
58
59     return None
60
61
62 def get_flow_versions(flow_url):
63     """Returns list of versions from greatest to least for a given flow"""
64     versions_url = _urljoin(flow_url, "versions")
65     # List of versions will be greatest to least
66     return list(reversed(sorted([v["version"] for v in _get_json(versions_url)])))
67
68
69 def get_flow_diff(registry_url, flow_url, version_one, version_two):
70     diff_url = _urljoin(flow_url, "diff", str(version_one), str(version_two))
71     return _get_json(diff_url)
72
73
74 def get_flow_diff_latest(registry_url, flow_url):
75     versions = get_flow_versions(flow_url)
76
77     if len(versions) == 0:
78         # Should not happen, should this be an error?
79         return None
80     elif len(versions) == 1:
81         return None
82     else:
83         # Example in gitlab wiki shows that lower version is first
84         return _add_url_from_link(registry_url, get_flow_diff(registry_url, flow_url, versions[1], versions[0]))
85
86
87 def get_flow_version(registry_url, flow_url, version):
88     version_url = _urljoin(flow_url, "versions", str(version))
89     return _add_url_from_link(registry_url, _get_json(version_url))
90
91
92 def get_flow_version_latest(registry_url, flow_url):
93     return get_flow_version(registry_url, flow_url, "latest")