1 # ============LICENSE_START=======================================================
2 # Copyright (c) 2019-2022 AT&T Intellectual Property. All rights reserved.
3 # ================================================================================
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at
8 # http://www.apache.org/licenses/LICENSE-2.0
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
15 # ============LICENSE_END=========================================================
18 from datetime import datetime
21 # TODO: Use real storage
25 def get_distribution_targets():
30 def get_distribution_target(ds_id):
32 result = [i for i in _cache if i["dt_id"] == ds_id]
33 return result[0] if result else {}
36 def transform_request(req):
37 """Transform request to object to store
39 NOTE: This method is not safe
41 ts = datetime.utcnow().isoformat()
44 req["dt_id"] = str(uuid.uuid4())
45 req["processGroups"] = []
49 def add_distribution_target(dt):
55 def merge_request(dt, req):
56 dt["name"] = req["name"]
57 dt["runtimeApiUrl"] = req["runtimeApiUrl"]
58 dt["description"] = req.get("description", None)
59 dt["nextDistributionTargetId"] = req.get("nextDistributionTargetId", None)
60 dt["modified"] = datetime.utcnow().isoformat()
64 def update_distribution_target(updated_dt):
65 dt_id = updated_dt["dt_id"]
67 # Did not use list comprehension blah blah because could not do the "return
69 for i, dt in enumerate(_cache):
70 if dt["dt_id"] == dt_id:
71 _cache[i] = updated_dt
76 def delete_distribution_target(dt_id):
78 num_prev = len(_cache)
79 _cache = list(filter(lambda e: e["dt_id"] != dt_id, _cache))
80 return len(_cache) < num_prev
83 def add_process_group(ds_id, process_group):
86 if dt["dt_id"] == ds_id:
87 process_group["processed"] = datetime.utcnow().isoformat()
88 dt["processGroups"].append(process_group)