codecoverage improvement
[dcaegen2/platform.git] / mod / distributorapi / distributor / data_access.py
1 # ============LICENSE_START=======================================================
2 # Copyright (c) 2019-2022 AT&T Intellectual Property. All rights reserved.
3 # ================================================================================
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at
7 #
8 #      http://www.apache.org/licenses/LICENSE-2.0
9 #
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
15 # ============LICENSE_END=========================================================
16 """Data layer"""
17
18 from datetime import datetime
19 import uuid
20
21 # TODO: Use real storage
22 _cache = []
23
24
25 def get_distribution_targets():
26     global _cache
27     return _cache
28
29
30 def get_distribution_target(ds_id):
31     global _cache
32     result = [i for i in _cache if i["dt_id"] == ds_id]
33     return result[0] if result else {}
34
35
36 def transform_request(req):
37     """Transform request to object to store
38
39     NOTE: This method is not safe
40     """
41     ts = datetime.utcnow().isoformat()
42     req["created"] = ts
43     req["modified"] = ts
44     req["dt_id"] = str(uuid.uuid4())
45     req["processGroups"] = []
46     return req
47
48
49 def add_distribution_target(dt):
50     global _cache
51     _cache.append(dt)
52     return dt
53
54
55 def merge_request(dt, req):
56     dt["name"] = req["name"]
57     dt["runtimeApiUrl"] = req["runtimeApiUrl"]
58     dt["description"] = req.get("description", None)
59     dt["nextDistributionTargetId"] = req.get("nextDistributionTargetId", None)
60     dt["modified"] = datetime.utcnow().isoformat()
61     return dt
62
63
64 def update_distribution_target(updated_dt):
65     dt_id = updated_dt["dt_id"]
66     global _cache
67     # Did not use list comprehension blah blah because could not do the "return
68     # True" easily
69     for i, dt in enumerate(_cache):
70         if dt["dt_id"] == dt_id:
71             _cache[i] = updated_dt
72             return True
73     return False
74
75
76 def delete_distribution_target(dt_id):
77     global _cache
78     num_prev = len(_cache)
79     _cache = list(filter(lambda e: e["dt_id"] != dt_id, _cache))
80     return len(_cache) < num_prev
81
82
83 def add_process_group(ds_id, process_group):
84     global _cache
85     for dt in _cache:
86         if dt["dt_id"] == ds_id:
87             process_group["processed"] = datetime.utcnow().isoformat()
88             dt["processGroups"].append(process_group)
89             return process_group
90     return None