7eddeae110c4db7a9229dd3aee97cdbb67bdc0a4
[ccsdk/oran.git] /
1 /*-
2  * ========================LICENSE_START=================================
3  * ONAP : ccsdk oran
4  * ======================================================================
5  * Copyright (C) 2019-2022 Nordix Foundation. All rights reserved.
6  * ======================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ========================LICENSE_END===================================
19  */
20
21 package org.onap.ccsdk.oran.a1policymanagementservice.repository;
22
23 import com.google.gson.Gson;
24 import com.google.gson.GsonBuilder;
25
26 import java.lang.invoke.MethodHandles;
27 import java.time.Instant;
28 import java.util.ArrayList;
29 import java.util.Collection;
30 import java.util.HashMap;
31 import java.util.List;
32 import java.util.Map;
33 import java.util.Objects;
34 import java.util.Set;
35 import java.util.Vector;
36
37 import lombok.Builder;
38 import lombok.Getter;
39
40 import org.onap.ccsdk.oran.a1policymanagementservice.configuration.ApplicationConfig;
41 import org.onap.ccsdk.oran.a1policymanagementservice.datastore.DataStore;
42 import org.onap.ccsdk.oran.a1policymanagementservice.exceptions.EntityNotFoundException;
43 import org.slf4j.Logger;
44 import org.slf4j.LoggerFactory;
45 import org.springframework.beans.factory.annotation.Autowired;
46
47 import reactor.core.publisher.Flux;
48 import reactor.core.publisher.Mono;
49 import reactor.util.annotation.Nullable;
50
51 @SuppressWarnings("squid:S2629") // Invoke method(s) only conditionally
52 public class Policies {
53
54     @Getter
55     @Builder
56     private static class PersistentPolicyInfo {
57         private String id;
58         private String json;
59         private String ownerServiceId;
60         private String ricId;
61         private String typeId;
62         private String statusNotificationUri;
63         private boolean isTransient;
64         private String lastModified;
65     }
66
67     private final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
68     private Map<String, Policy> policiesId = new HashMap<>();
69     private MultiMap<Policy> policiesRic = new MultiMap<>();
70     private MultiMap<Policy> policiesService = new MultiMap<>();
71     private MultiMap<Policy> policiesType = new MultiMap<>();
72     private final DataStore dataStore;
73
74     private static Gson gson = new GsonBuilder().create();
75
76     public Policies(@Autowired ApplicationConfig appConfig) {
77         this.dataStore = DataStore.create(appConfig, "policies");
78     }
79
80     public Flux<Policy> restoreFromDatabase(Ric ric, PolicyTypes types) {
81         return dataStore.createDataStore() //
82                 .flatMapMany(x -> dataStore.listObjects(getPath(ric))) //
83                 .flatMap(dataStore::readObject) //
84                 .map(String::new) //
85                 .map(json -> gson.fromJson(json, PersistentPolicyInfo.class)) //
86                 .map(policyInfo -> toPolicy(policyInfo, ric, types)) //
87                 .doOnNext(this::put) //
88                 .filter(Objects::nonNull) //
89                 .doOnError(t -> logger.warn("Could not restore policy database for RIC: {}, reason : {}", ric.id(),
90                         t.getMessage())) //
91                 .doFinally(sig -> logger.debug("Restored policy database for RIC: {}, number of policies: {}", ric.id(),
92                         this.policiesRic.get(ric.id()).size())) //
93                 .onErrorResume(t -> Flux.empty()) //
94         ;
95     }
96
97     public synchronized void put(Policy policy) {
98         Policy previousDef = this.get(policy.getId());
99         if (previousDef != null) {
100             removeFromMaps(previousDef);
101         }
102
103         policiesId.put(policy.getId(), policy);
104         policiesRic.put(policy.getRic().id(), policy.getId(), policy);
105         policiesService.put(policy.getOwnerServiceId(), policy.getId(), policy);
106         policiesType.put(policy.getType().getId(), policy.getId(), policy);
107         if (!policy.isTransient()) {
108             store(policy);
109         }
110     }
111
112     public synchronized boolean containsPolicy(String id) {
113         return policiesId.containsKey(id);
114     }
115
116     public synchronized Policy get(String id) {
117         return policiesId.get(id);
118     }
119
120     public synchronized Policy getPolicy(String id) throws EntityNotFoundException {
121         Policy p = policiesId.get(id);
122         if (p == null) {
123             throw new EntityNotFoundException("Could not find policy: " + id);
124         }
125         return p;
126     }
127
128     public synchronized Collection<Policy> getAll() {
129         return new Vector<>(policiesId.values());
130     }
131
132     public synchronized Collection<Policy> getForService(String service) {
133         return policiesService.get(service);
134     }
135
136     public synchronized Collection<Policy> getForRic(String ric) {
137         return policiesRic.get(ric);
138     }
139
140     public synchronized Set<String> getPolicyIdsForRic(String ricId) {
141         return policiesRic.keySet(ricId);
142     }
143
144     public synchronized Collection<Policy> getForType(String type) {
145         return policiesType.get(type);
146     }
147
148     public synchronized Policy removeId(String id) {
149         Policy p = policiesId.get(id);
150         if (p != null) {
151             remove(p);
152         }
153         return p;
154     }
155
156     public synchronized void remove(Policy policy) {
157         if (!policy.isTransient()) {
158             dataStore.deleteObject(getPath(policy)).subscribe();
159         }
160         removeFromMaps(policy);
161     }
162
163     public synchronized void removePoliciesForRic(String ricId) {
164         Collection<Policy> policiesForRic = getForRic(ricId);
165         for (Policy policy : policiesForRic) {
166             remove(policy);
167         }
168     }
169
170     public Collection<Policy> filterPolicies(@Nullable String typeId, @Nullable String ricId,
171             @Nullable String serviceId, @Nullable String typeName) {
172
173         if (typeId != null) {
174             return filter(this.getForType(typeId), null, ricId, serviceId, typeName);
175         } else if (serviceId != null) {
176             return filter(this.getForService(serviceId), typeId, ricId, null, typeName);
177         } else if (ricId != null) {
178             return filter(this.getForRic(ricId), typeId, null, serviceId, typeName);
179         } else {
180             return filter(this.getAll(), typeId, ricId, serviceId, typeName);
181         }
182     }
183
184     public synchronized int size() {
185         return policiesId.size();
186     }
187
188     public synchronized void clear() {
189         while (policiesId.size() > 0) {
190             Set<String> keys = policiesId.keySet();
191             removeId(keys.iterator().next());
192         }
193         dataStore.deleteAllObjects().onErrorResume(t -> Mono.empty()).subscribe();
194     }
195
196     private void store(Policy policy) {
197
198         byte[] bytes = gson.toJson(toStorageObject(policy)).getBytes();
199         this.dataStore.writeObject(this.getPath(policy), bytes) //
200                 .doOnError(t -> logger.error("Could not store policy in S3, reason: {}", t.getMessage())) //
201                 .subscribe();
202     }
203
204     private void removeFromMaps(Policy policy) {
205         policiesId.remove(policy.getId());
206         policiesRic.remove(policy.getRic().id(), policy.getId());
207         policiesService.remove(policy.getOwnerServiceId(), policy.getId());
208         policiesType.remove(policy.getType().getId(), policy.getId());
209     }
210
211     private boolean isMatch(String filterValue, String actualValue) {
212         return filterValue == null || actualValue.equals(filterValue);
213     }
214
215     private boolean isTypeMatch(Policy policy, @Nullable String typeName) {
216         return (typeName == null) || policy.getType().getTypeId().getName().equals(typeName);
217     }
218
219     private Collection<Policy> filter(Collection<Policy> collection, String typeId, String ricId, String serviceId,
220             String typeName) {
221         if (typeId == null && ricId == null && serviceId == null && typeName == null) {
222             return collection;
223         }
224         List<Policy> filtered = new ArrayList<>(collection.size());
225         for (Policy p : collection) {
226             if (isMatch(typeId, p.getType().getId()) && isMatch(ricId, p.getRic().id())
227                     && isMatch(serviceId, p.getOwnerServiceId()) && isTypeMatch(p, typeName)) {
228                 filtered.add(p);
229             }
230         }
231         return filtered;
232     }
233
234     private PersistentPolicyInfo toStorageObject(Policy p) {
235         return PersistentPolicyInfo.builder() //
236                 .id(p.getId()) //
237                 .json(p.getJson()) //
238                 .ownerServiceId(p.getOwnerServiceId()) //
239                 .ricId(p.getRic().id()) //
240                 .statusNotificationUri(p.getStatusNotificationUri()) //
241                 .typeId(p.getType().getId()) //
242                 .isTransient(p.isTransient()) //
243                 .lastModified(p.getLastModified().toString()) //
244                 .build();
245     }
246
247     private Policy toPolicy(PersistentPolicyInfo p, Ric ric, PolicyTypes types) {
248         try {
249             return Policy.builder() //
250                     .id(p.getId()) //
251                     .isTransient(p.isTransient()) //
252                     .json(p.getJson()) //
253                     .lastModified(Instant.parse(p.lastModified)) //
254                     .ownerServiceId(p.getOwnerServiceId()) //
255                     .ric(ric) //
256                     .statusNotificationUri(p.getStatusNotificationUri()) //
257                     .type(types.getType(p.getTypeId())) //
258                     .build();
259         } catch (EntityNotFoundException e) {
260             logger.warn("Not found: {}", e.getMessage());
261             return null;
262         }
263     }
264
265     private String getPath(Policy policy) {
266         return getPath(policy.getRic()) + "/" + policy.getId() + ".json";
267     }
268
269     private String getPath(Ric ric) {
270         return ric.id();
271     }
272
273 }