Fix incorrect deployments counter on parallel execution
[policy/pap.git] / main / src / main / java / org / onap / policy / pap / main / notification / DeploymentStatus.java
1 /*-
2  * ============LICENSE_START=======================================================
3  * ONAP
4  * ================================================================================
5  * Copyright (C) 2021 AT&T Intellectual Property. All rights reserved.
6  * Modifications Copyright (C) 2022 Bell Canada. All rights reserved.
7  * ================================================================================
8  * Licensed under the Apache License, Version 2.0 (the "License");
9  * you may not use this file except in compliance with the License.
10  * You may obtain a copy of the License at
11  *
12  *      http://www.apache.org/licenses/LICENSE-2.0
13  *
14  * Unless required by applicable law or agreed to in writing, software
15  * distributed under the License is distributed on an "AS IS" BASIS,
16  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17  * See the License for the specific language governing permissions and
18  * limitations under the License.
19  * ============LICENSE_END=========================================================
20  */
21
22 package org.onap.policy.pap.main.notification;
23
24 import io.micrometer.core.instrument.Counter;
25 import io.micrometer.core.instrument.MeterRegistry;
26 import java.util.ArrayList;
27 import java.util.HashMap;
28 import java.util.HashSet;
29 import java.util.Iterator;
30 import java.util.List;
31 import java.util.Map;
32 import java.util.Map.Entry;
33 import java.util.Set;
34 import java.util.function.BiPredicate;
35 import java.util.stream.Collectors;
36 import lombok.AccessLevel;
37 import lombok.Getter;
38 import org.onap.policy.common.utils.resources.PrometheusUtils;
39 import org.onap.policy.common.utils.services.Registry;
40 import org.onap.policy.models.pap.concepts.PolicyNotification;
41 import org.onap.policy.models.pdp.concepts.PdpPolicyStatus;
42 import org.onap.policy.models.pdp.concepts.PdpPolicyStatus.State;
43 import org.onap.policy.models.tosca.authorative.concepts.ToscaConceptIdentifier;
44 import org.onap.policy.pap.main.PapConstants;
45 import org.onap.policy.pap.main.notification.StatusAction.Action;
46 import org.onap.policy.pap.main.service.PolicyStatusService;
47
48 /**
49  * Collection of Policy Deployment Status records. The sequence of method invocations
50  * should be as follows:
51  * <ol>
52  * <li>{@link #loadByGroup(String)}</li>
53  * <li>various other methods</li>
54  * <li>repeat the previous steps as appropriate</li>
55  * <li>{@link #flush(PolicyNotification)}</li>
56  * </ol>
57  */
58 public class DeploymentStatus {
59
60     /**
61      * Tracks the groups that have been loaded.
62      */
63     private final Set<String> pdpGroupLoaded = new HashSet<>();
64
65     /**
66      * Records, mapped by PDP/Policy pair.
67      */
68     @Getter(AccessLevel.PROTECTED)
69     private final Map<StatusKey, StatusAction> recordMap = new HashMap<>();
70
71     /**
72      * Records the policy status so that notifications can be generated. When
73      * {@link #loadByGroup(String)} is invoked, records are added to this. Other than
74      * that, this is not updated until {@link #addNotifications(PolicyNotification)} is
75      * invoked.
76      */
77     private DeploymentTracker tracker = new DeploymentTracker();
78
79     private PolicyStatusService policyStatusService;
80
81     private Counter deploymentSuccessCounter;
82     private Counter unDeploymentSuccessCounter;
83     private Counter deploymentFailureCounter;
84     private Counter unDeploymentFailureCounter;
85
86     /**
87      * Constructs the object.
88      *
89      * @param policyStatusService the policyStatusService
90      */
91     public DeploymentStatus(PolicyStatusService policyStatusService) {
92         this.policyStatusService = policyStatusService;
93         initializeMetrics();
94     }
95
96     private void initializeMetrics() {
97         String counterName = "pap_" + PrometheusUtils.POLICY_DEPLOYMENTS_METRIC;
98         MeterRegistry meterRegistry = Registry.get(PapConstants.REG_METER_REGISTRY, MeterRegistry.class);
99         deploymentSuccessCounter =
100             Counter.builder(counterName).tags(PrometheusUtils.OPERATION_METRIC_LABEL, PrometheusUtils.DEPLOY_OPERATION,
101                 PrometheusUtils.STATUS_METRIC_LABEL, State.SUCCESS.name()).register(meterRegistry);
102
103         unDeploymentSuccessCounter = Counter.builder(counterName).tags(PrometheusUtils.OPERATION_METRIC_LABEL,
104             PrometheusUtils.UNDEPLOY_OPERATION, PrometheusUtils.STATUS_METRIC_LABEL, State.SUCCESS.name())
105             .register(meterRegistry);
106
107         deploymentFailureCounter =
108             Counter.builder(counterName).tags(PrometheusUtils.OPERATION_METRIC_LABEL, PrometheusUtils.DEPLOY_OPERATION,
109                 PrometheusUtils.STATUS_METRIC_LABEL, State.FAILURE.name()).register(meterRegistry);
110
111         unDeploymentFailureCounter = Counter.builder(counterName).tags(PrometheusUtils.OPERATION_METRIC_LABEL,
112             PrometheusUtils.UNDEPLOY_OPERATION, PrometheusUtils.STATUS_METRIC_LABEL, State.FAILURE.name())
113             .register(meterRegistry);
114     }
115
116     /**
117      * Adds new policy status to a notification.
118      *
119      * @param notif notification to which to add policy status
120      */
121     protected void addNotifications(PolicyNotification notif) {
122         var newTracker = new DeploymentTracker();
123         recordMap.values().forEach(newTracker::add);
124
125         tracker.addNotifications(notif, newTracker);
126
127         tracker = newTracker;
128     }
129
130     /**
131      * Loads policy deployment status associated with the given PDP group.
132      *
133      * @param pdpGroup group whose records are to be loaded
134      */
135     public void loadByGroup(String pdpGroup) {
136         if (pdpGroupLoaded.contains(pdpGroup)) {
137             return;
138         }
139
140         pdpGroupLoaded.add(pdpGroup);
141
142         for (PdpPolicyStatus status : policyStatusService.getGroupPolicyStatus(pdpGroup)) {
143             var status2 = new StatusAction(Action.UNCHANGED, status);
144             recordMap.put(new StatusKey(status), status2);
145             tracker.add(status2);
146         }
147     }
148
149     /**
150      * Flushes changes to the DB, adding policy status to the notification.
151      *
152      * @param notif notification to which to add policy status
153      */
154     public void flush(PolicyNotification notif) {
155         // must add notifications BEFORE deleting undeployments
156         addNotifications(notif);
157         updateMetrics();
158         deleteUndeployments();
159         flush();
160     }
161
162     /**
163      * Flushes changes to the DB.
164      */
165     protected void flush() {
166         // categorize the records
167         List<PdpPolicyStatus> created = new ArrayList<>();
168         List<PdpPolicyStatus> updated = new ArrayList<>();
169         List<PdpPolicyStatus> deleted = new ArrayList<>();
170
171         for (StatusAction status : recordMap.values()) {
172             switch (status.getAction()) {
173                 case CREATED:
174                     created.add(status.getStatus());
175                     break;
176                 case UPDATED:
177                     updated.add(status.getStatus());
178                     break;
179                 case DELETED:
180                     deleted.add(status.getStatus());
181                     break;
182                 default:
183                     break;
184             }
185         }
186
187         policyStatusService.cudPolicyStatus(created, updated, deleted);
188
189         /*
190          * update the records to indicate everything is now unchanged (i.e., matches what
191          * is in the DB)
192          */
193
194         Iterator<StatusAction> iter = recordMap.values().iterator();
195         while (iter.hasNext()) {
196             StatusAction status = iter.next();
197
198             if (status.getAction() == Action.DELETED) {
199                 iter.remove();
200             } else {
201                 status.setAction(Action.UNCHANGED);
202             }
203         }
204     }
205
206     private void updateMetrics() {
207         recordMap.forEach((key, value) -> {
208             if (value.getAction().equals(StatusAction.Action.UPDATED)) {
209                 if (value.getStatus().getState().equals(State.SUCCESS)) {
210                     (value.getStatus().isDeploy() ? deploymentSuccessCounter : unDeploymentSuccessCounter).increment();
211                 } else if (value.getStatus().getState().equals(State.FAILURE)) {
212                     (value.getStatus().isDeploy() ? deploymentFailureCounter : unDeploymentFailureCounter).increment();
213                 }
214             }
215         });
216     }
217
218     /**
219      * Deletes records for any policies that have been completely undeployed.
220      */
221     protected void deleteUndeployments() {
222         // identify the incomplete policies
223
224         // @formatter:off
225         Set<ToscaConceptIdentifier> incomplete = recordMap.values().stream()
226             .filter(status -> status.getAction() != Action.DELETED)
227             .map(StatusAction::getStatus)
228             .filter(status -> status.getState() == State.WAITING)
229             .map(PdpPolicyStatus::getPolicy)
230             .collect(Collectors.toSet());
231         // @formatter:on
232
233         // delete if UNDEPLOYED and not incomplete
234         deleteDeployment((key, status) -> !status.getStatus().isDeploy() && !incomplete.contains(key.getPolicy()));
235     }
236
237     /**
238      * Delete deployment records for a PDP.
239      *
240      * @param pdpId PDP whose records are to be deleted
241      */
242     public void deleteDeployment(String pdpId) {
243         deleteDeployment((key, status) -> key.getPdpId().equals(pdpId));
244     }
245
246     /**
247      * Delete deployment records for a policy.
248      *
249      * @param policy policy whose records are to be deleted
250      * @param deploy {@code true} to delete deployment records, {@code false} to delete
251      *        undeployment records
252      */
253     public void deleteDeployment(ToscaConceptIdentifier policy, boolean deploy) {
254         deleteDeployment((key, status) -> status.getStatus().isDeploy() == deploy && key.getPolicy().equals(policy));
255     }
256
257     /**
258      * Delete deployment records for a policy.
259      *
260      * @param filter filter to identify records to be deleted
261      */
262     private void deleteDeployment(BiPredicate<StatusKey, StatusAction> filter) {
263         Iterator<Entry<StatusKey, StatusAction>> iter = recordMap.entrySet().iterator();
264         while (iter.hasNext()) {
265             Entry<StatusKey, StatusAction> entry = iter.next();
266             StatusKey key = entry.getKey();
267             StatusAction value = entry.getValue();
268
269             if (filter.test(key, value)) {
270                 if (value.getAction() == Action.CREATED) {
271                     // it's a new record - just remove it
272                     iter.remove();
273                 } else {
274                     // it's an existing record - mark it for deletion
275                     value.setAction(Action.DELETED);
276                 }
277             }
278         }
279     }
280
281     /**
282      * Deploys/undeploys a policy to a PDP. Assumes that
283      * {@link #deleteDeployment(ToscaConceptIdentifier, boolean)} has already been invoked
284      * to delete any records having the wrong "deploy" value.
285      *
286      * @param pdpId PDP to which the policy is to be deployed
287      * @param policy policy to be deployed
288      * @param policyType policy's type
289      * @param pdpGroup PdpGroup containing the PDP of interest
290      * @param pdpType PDP type (i.e., PdpSubGroup) containing the PDP of interest
291      * @param deploy {@code true} if the policy is being deployed, {@code false} if
292      *        undeployed
293      */
294     public void deploy(String pdpId, ToscaConceptIdentifier policy, ToscaConceptIdentifier policyType, String pdpGroup,
295                     String pdpType, boolean deploy) {
296
297         recordMap.compute(new StatusKey(pdpId, policy), (key, status) -> {
298
299             if (status == null) {
300                 // no record yet - create one
301
302                 // @formatter:off
303                 return new StatusAction(Action.CREATED, PdpPolicyStatus.builder()
304                                     .pdpGroup(pdpGroup)
305                                     .pdpId(pdpId)
306                                     .pdpType(pdpType)
307                                     .policy(policy)
308                                     .policyType(policyType)
309                                     .deploy(deploy)
310                                     .state(State.WAITING)
311                                     .build());
312                 // @formatter:on
313             }
314
315             PdpPolicyStatus status2 = status.getStatus();
316
317             // record already exists - see if the deployment flag should be changed
318
319             if (status2.isDeploy() != deploy) {
320                 // deployment flag has changed
321                 status.setChanged();
322                 status2.setDeploy(deploy);
323                 status2.setState(State.WAITING);
324
325
326             } else if (status.getAction() == Action.DELETED) {
327                 // deployment flag is unchanged
328                 status.setAction(Action.UPDATED);
329             }
330
331             return status;
332         });
333     }
334
335     /**
336      * Indicates the deployment/undeployment of a set of policies to a PDP has completed.
337      *
338      * @param pdpId PDP of interest
339      * @param expectedPolicies policies that we expected to be deployed to the PDP
340      * @param actualPolicies policies that were actually deployed to the PDP
341      */
342     public void completeDeploy(String pdpId, Set<ToscaConceptIdentifier> expectedPolicies,
343                     Set<ToscaConceptIdentifier> actualPolicies) {
344
345         for (StatusAction status : recordMap.values()) {
346             PdpPolicyStatus status2 = status.getStatus();
347
348             if (!status.getStatus().getPdpId().equals(pdpId)
349                             || expectedPolicies.contains(status2.getPolicy()) != status2.isDeploy()) {
350                 /*
351                  * The policy is "expected" to be deployed, but the record is not marked
352                  * for deployment (or vice versa), which means the expected policy is out
353                  * of date with the DB, thus we'll ignore this policy for now.
354                  */
355                 continue;
356             }
357
358             State state;
359             if (actualPolicies.contains(status2.getPolicy())) {
360                 state = (status.getStatus().isDeploy() ? State.SUCCESS : State.FAILURE);
361             } else {
362                 state = (status.getStatus().isDeploy() ? State.FAILURE : State.SUCCESS);
363             }
364
365             if (status2.getState() != state) {
366                 status.setChanged();
367                 status2.setState(state);
368             }
369         }
370     }
371 }