Add ability to turn on/off pdp statistics
[policy/pap.git] / main / src / main / java / org / onap / policy / pap / main / comm / PdpModifyRequestMap.java
index 6a743a3..beef475 100644 (file)
@@ -2,7 +2,9 @@
  * ============LICENSE_START=======================================================
  * ONAP PAP
  * ================================================================================
- * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved.
+ * Copyright (C) 2019, 2021 AT&T Intellectual Property. All rights reserved.
+ * Modifications Copyright (C) 2021 Nordix Foundation.
+ * Modifications Copyright (C) 2021 Bell Canada. All rights reserved.
  * ================================================================================
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
 
 package org.onap.policy.pap.main.comm;
 
+import java.time.Instant;
+import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
+import lombok.Setter;
 import org.onap.policy.models.base.PfModelException;
+import org.onap.policy.models.pap.concepts.PolicyNotification;
 import org.onap.policy.models.pdp.concepts.Pdp;
 import org.onap.policy.models.pdp.concepts.PdpGroup;
 import org.onap.policy.models.pdp.concepts.PdpGroupFilter;
 import org.onap.policy.models.pdp.concepts.PdpMessage;
 import org.onap.policy.models.pdp.concepts.PdpStateChange;
+import org.onap.policy.models.pdp.concepts.PdpStatus;
 import org.onap.policy.models.pdp.concepts.PdpSubGroup;
 import org.onap.policy.models.pdp.concepts.PdpUpdate;
 import org.onap.policy.models.pdp.enums.PdpState;
 import org.onap.policy.models.provider.PolicyModelsProvider;
+import org.onap.policy.models.tosca.authorative.concepts.ToscaConceptIdentifier;
 import org.onap.policy.pap.main.PolicyModelsProviderFactoryWrapper;
 import org.onap.policy.pap.main.comm.msgdata.Request;
 import org.onap.policy.pap.main.comm.msgdata.RequestListener;
 import org.onap.policy.pap.main.comm.msgdata.StateChangeReq;
 import org.onap.policy.pap.main.comm.msgdata.UpdateReq;
+import org.onap.policy.pap.main.notification.DeploymentStatus;
+import org.onap.policy.pap.main.notification.PolicyNotifier;
 import org.onap.policy.pap.main.parameters.PdpModifyRequestMapParams;
 import org.onap.policy.pap.main.parameters.RequestParams;
 import org.slf4j.Logger;
@@ -73,6 +86,22 @@ public class PdpModifyRequestMap {
      */
     private final PolicyModelsProviderFactoryWrapper daoFactory;
 
+    /**
+     * Used to notify when policy updates completes.
+     */
+    private final PolicyNotifier policyNotifier;
+
+    /**
+     * Used to undeploy policies from the system, when they cannot be deployed to a PDP.
+     *
+     * <p/>
+     * Note: there's a "catch-22" here. The request map needs an undeployer, but the
+     * undeployer needs the request map. Consequently, the request map is created first,
+     * then the undeployer, and finally, this field is set.
+     */
+    @Setter
+    private PolicyUndeployer policyUndeployer;
+
 
     /**
      * Constructs the object.
@@ -87,6 +116,16 @@ public class PdpModifyRequestMap {
         this.params = params;
         this.modifyLock = params.getModifyLock();
         this.daoFactory = params.getDaoFactory();
+        this.policyNotifier = params.getPolicyNotifier();
+    }
+
+    /**
+     * Determines if the map contains any requests.
+     *
+     * @return {@code true} if the map is empty, {@code false} otherwise
+     */
+    public boolean isEmpty() {
+        return pdp2requests.isEmpty();
     }
 
     /**
@@ -113,11 +152,22 @@ public class PdpModifyRequestMap {
         if (update == null) {
             addRequest(stateChange);
 
-        } else {
+        } else if (stateChange == null) {
+            addRequest(update);
+
+        } else if (stateChange.getState() == PdpState.ACTIVE) {
+            // publish update before activating
             synchronized (modifyLock) {
                 addRequest(update);
                 addRequest(stateChange);
             }
+
+        } else {
+            // deactivate before publishing update
+            synchronized (modifyLock) {
+                addRequest(stateChange);
+                addRequest(update);
+            }
         }
     }
 
@@ -125,10 +175,11 @@ public class PdpModifyRequestMap {
      * Adds an UPDATE request to the map.
      *
      * @param update the UPDATE request or {@code null}
+     * @return the new request (this should only be used by junit tests)
      */
-    public void addRequest(PdpUpdate update) {
+    public Request addRequest(PdpUpdate update) {
         if (update == null) {
-            return;
+            return null;
         }
 
         if (isBroadcast(update)) {
@@ -140,24 +191,26 @@ public class PdpModifyRequestMap {
             .setMaxRetryCount(params.getParams().getUpdateParameters().getMaxRetryCount())
             .setTimers(params.getUpdateTimers())
             .setModifyLock(params.getModifyLock())
-            .setPublisher(params.getPublisher())
+            .setPdpPublisher(params.getPdpPublisher())
             .setResponseDispatcher(params.getResponseDispatcher());
         // @formatter:on
 
         String name = update.getName() + " " + PdpUpdate.class.getSimpleName();
-        UpdateReq request = new UpdateReq(reqparams, name, update);
+        var request = new UpdateReq(reqparams, name, update);
 
         addSingleton(request);
+        return request;
     }
 
     /**
      * Adds a STATE-CHANGE request to the map.
      *
      * @param stateChange the STATE-CHANGE request or {@code null}
+     * @return the new request (this should only be used by junit tests)
      */
-    public void addRequest(PdpStateChange stateChange) {
+    public Request addRequest(PdpStateChange stateChange) {
         if (stateChange == null) {
-            return;
+            return null;
         }
 
         if (isBroadcast(stateChange)) {
@@ -169,14 +222,15 @@ public class PdpModifyRequestMap {
             .setMaxRetryCount(params.getParams().getStateChangeParameters().getMaxRetryCount())
             .setTimers(params.getStateChangeTimers())
             .setModifyLock(params.getModifyLock())
-            .setPublisher(params.getPublisher())
+            .setPdpPublisher(params.getPdpPublisher())
             .setResponseDispatcher(params.getResponseDispatcher());
         // @formatter:on
 
         String name = stateChange.getName() + " " + PdpStateChange.class.getSimpleName();
-        StateChangeReq request = new StateChangeReq(reqparams, name, stateChange);
+        var request = new StateChangeReq(reqparams, name, stateChange);
 
         addSingleton(request);
+        return request;
     }
 
     /**
@@ -206,112 +260,86 @@ public class PdpModifyRequestMap {
     }
 
     /**
-     * Starts the next request associated with a PDP.
-     *
-     * @param requests current set of requests
-     * @param request the request that just completed
-     */
-    private void startNextRequest(PdpRequests requests, Request request) {
-        if (!requests.startNextRequest(request)) {
-            pdp2requests.remove(requests.getPdpName(), requests);
-        }
-    }
-
-    /**
-     * Disables a PDP by removing it from its subgroup and then sending it a PASSIVE
-     * request.
-     *
-     * @param requests the requests associated with the PDP to be disabled
+     * Removes expired PDPs from all active groups.
      */
-    private void disablePdp(PdpRequests requests) {
-
-        // remove the requests from the map
-        if (!pdp2requests.remove(requests.getPdpName(), requests)) {
-            // don't have the info we need to disable it
-            logger.warn("no requests with which to disable {}", requests.getPdpName());
-            return;
-        }
-
-        logger.warn("disabling {}", requests.getPdpName());
+    public void removeExpiredPdps() {
 
-        requests.stopPublishing();
+        synchronized (modifyLock) {
+            logger.info("check for PDP records older than {}ms", params.getMaxPdpAgeMs());
 
-        // don't do anything if we don't have a group
-        String name = requests.getLastGroupName();
-        if (name == null) {
-            logger.warn("no group with which to disable {}", requests.getPdpName());
-            return;
-        }
+            try (PolicyModelsProvider dao = daoFactory.create()) {
 
-        // remove the PDP from the group
-        removeFromGroup(requests.getPdpName(), name);
+                PdpGroupFilter filter = PdpGroupFilter.builder().groupState(PdpState.ACTIVE).build();
+                List<PdpGroup> groups = dao.getFilteredPdpGroups(filter);
+                List<PdpGroup> updates = new ArrayList<>(1);
 
-        // send the state change
-        PdpStateChange change = new PdpStateChange();
-        change.setName(requests.getPdpName());
-        change.setState(PdpState.PASSIVE);
-        addRequest(change);
-    }
+                var status = new DeploymentStatus(dao);
 
-    /**
-     * Removes a PDP from its group.
-     *
-     * @param pdpName name of the PDP to be removed
-     * @param groupName name of the group from which it should be removed
-     */
-    private void removeFromGroup(String pdpName, String groupName) {
+                Instant minAge = Instant.now().minusMillis(params.getMaxPdpAgeMs());
 
-        try (PolicyModelsProvider dao = daoFactory.create()) {
+                for (PdpGroup group : groups) {
+                    Set<String> pdps = removeFromGroup(minAge, group);
+                    if (!pdps.isEmpty()) {
+                        updates.add(group);
+                        status.loadByGroup(group.getName());
+                        pdps.forEach(status::deleteDeployment);
+                    }
+                }
 
-            PdpGroupFilter filter = PdpGroupFilter.builder().name(groupName).groupState(PdpState.ACTIVE)
-                            .version(PdpGroupFilter.LATEST_VERSION).build();
+                if (!updates.isEmpty()) {
+                    dao.updatePdpGroups(updates);
 
-            List<PdpGroup> groups = dao.getFilteredPdpGroups(filter);
-            if (groups.isEmpty()) {
-                return;
-            }
+                    var notification = new PolicyNotification();
+                    status.flush(notification);
 
-            PdpGroup group = groups.get(0);
-
-            for (PdpSubGroup subgrp : group.getPdpSubgroups()) {
-                if (removeFromSubgroup(pdpName, group, subgrp)) {
-                    dao.updatePdpGroups(Collections.singletonList(group));
-                    return;
+                    policyNotifier.publish(notification);
                 }
+
+            } catch (PfModelException e) {
+                logger.warn("failed to remove expired PDPs", e);
             }
+        }
+    }
 
-        } catch (PfModelException e) {
-            logger.info("unable to remove PDP {} from subgroup", pdpName, e);
+    /**
+     * Removes expired PDPs from a group.
+     *
+     * @param minAge minimum age for active PDPs
+     * @param group group from which expired PDPs should be removed
+     * @return the expired PDPs
+     */
+    private Set<String> removeFromGroup(Instant minAge, PdpGroup group) {
+        Set<String> pdps = new HashSet<>();
+        for (PdpSubGroup subgrp : group.getPdpSubgroups()) {
+            removeFromSubgroup(minAge, group, subgrp, pdps);
         }
+
+        return pdps;
     }
 
     /**
-     * Removes a PDP from a subgroup.
+     * Removes expired PDPs from a subgroup.
      *
-     * @param pdpName name of the PDP to be removed
+     * @param minAge minimum age for active PDPs
      * @param group group from which to attempt to remove the PDP
      * @param subgrp subgroup from which to attempt to remove the PDP
-     * @return {@code true} if the PDP was removed, {@code false} if the PDP was not in
-     *         the group
-     * @throws PfModelException if a DB error occurs
+     * @param pdps where to place the expired PDPs
      */
-    private boolean removeFromSubgroup(String pdpName, PdpGroup group, PdpSubGroup subgrp) throws PfModelException {
+    private void removeFromSubgroup(Instant minAge, PdpGroup group, PdpSubGroup subgrp, Set<String> pdps) {
 
         Iterator<Pdp> iter = subgrp.getPdpInstances().iterator();
 
         while (iter.hasNext()) {
             Pdp instance = iter.next();
 
-            if (pdpName.equals(instance.getInstanceId())) {
-                logger.info("removed {} from group={} version={} subgroup={}", pdpName, group.getName(),
-                                group.getVersion(), subgrp.getPdpType());
+            if (instance.getLastUpdate().isBefore(minAge)) {
+                String pdpName = instance.getInstanceId();
+                logger.info("removed {} from group={} subgroup={}", pdpName, group.getName(), subgrp.getPdpType());
                 iter.remove();
                 subgrp.setCurrentInstanceCount(subgrp.getPdpInstances().size());
-                return true;
+                pdps.add(pdpName);
             }
         }
-
-        return false;
     }
 
     /**
@@ -321,7 +349,15 @@ public class PdpModifyRequestMap {
      * @return a new set of requests
      */
     protected PdpRequests makePdpRequests(String pdpName) {
-        return new PdpRequests(pdpName);
+        return new PdpRequests(pdpName, policyNotifier);
+    }
+
+    /**
+     * Makes a handler for PDP responses.
+     * @return a response handler
+     */
+    protected PdpStatusMessageHandler makePdpResponseHandler() {
+        return new PdpStatusMessageHandler(params.getParams(), params.isSavePdpStatistics());
     }
 
     /**
@@ -330,35 +366,111 @@ public class PdpModifyRequestMap {
     private class SingletonListener implements RequestListener {
         private final PdpRequests requests;
         private final Request request;
+        private final String pdpName;
 
         public SingletonListener(PdpRequests requests, Request request) {
             this.requests = requests;
             this.request = request;
+            this.pdpName = requests.getPdpName();
         }
 
         @Override
-        public void failure(String pdpName, String reason) {
-            if (requests.getPdpName().equals(pdpName)) {
-                disablePdp(requests);
+        public void failure(String responsePdpName, String reason) {
+            Collection<ToscaConceptIdentifier> undeployPolicies = requestCompleted(responsePdpName);
+            if (undeployPolicies.isEmpty()) {
+                // nothing to undeploy
+                return;
+            }
+
+            /*
+             * Undeploy the extra policies. Note: this will likely cause a new message to
+             * be assigned to the request, thus we must re-start it after making the
+             * change.
+             */
+            PdpMessage oldmsg = request.getMessage();
+
+            try {
+                logger.warn("undeploy policies from {}:{} that failed to deploy: {}", oldmsg.getPdpGroup(),
+                                oldmsg.getPdpSubgroup(), undeployPolicies);
+                policyUndeployer.undeploy(oldmsg.getPdpGroup(), oldmsg.getPdpSubgroup(), undeployPolicies);
+            } catch (PfModelException | RuntimeException e) {
+                logger.error("cannot undeploy policies {}", undeployPolicies, e);
+            }
+
+            if (request.getMessage() == oldmsg) {
+                // message is unchanged - start the next request
+                startNextRequest(request);
+            } else {
+                // message changed - restart the request
+                request.startPublishing();
             }
         }
 
         @Override
-        public void success(String pdpName) {
-            if (requests.getPdpName().equals(pdpName)) {
-                if (pdp2requests.get(requests.getPdpName()) == requests) {
-                    startNextRequest(requests, request);
-
-                } else {
-                    logger.info("discard old requests for {}", pdpName);
-                    requests.stopPublishing();
-                }
+        public void success(String responsePdpName, PdpStatus response) {
+            requestCompleted(responsePdpName);
+
+            if (!(request instanceof UpdateReq)) {
+                // other response types may not include the list of policies
+                return;
             }
+
+            /*
+             * Update PDP time stamps. Also send pdp-update and pdp-state-change, as
+             * necessary, if the response does not reflect what's in the DB.
+             */
+            var handler = makePdpResponseHandler();
+            handler.handlePdpStatus(response);
+        }
+
+        /**
+         * Handles a request completion, starting the next request, if there is one.
+         *
+         * @param responsePdpName name of the PDP provided in the response
+         * @return a list of policies to be undeployed
+         */
+        private Collection<ToscaConceptIdentifier> requestCompleted(String responsePdpName) {
+            if (!pdpName.equals(responsePdpName)) {
+                return Collections.emptyList();
+            }
+
+            if (pdp2requests.get(pdpName) != requests) {
+                logger.info("discard old requests for {}", responsePdpName);
+                requests.stopPublishing();
+                return Collections.emptyList();
+            }
+
+            if (!requests.isFirstInQueue(request)) {
+                logger.error("request is not first in the queue {}", request.getMessage());
+                return Collections.emptyList();
+            }
+
+            Collection<ToscaConceptIdentifier> undeployPolicies = request.getUndeployPolicies();
+            if (undeployPolicies.isEmpty()) {
+                // nothing to undeploy - just start the next request
+                startNextRequest(request);
+            }
+
+            return undeployPolicies;
         }
 
         @Override
-        public void retryCountExhausted() {
-            disablePdp(requests);
+        public void retryCountExhausted(Request request) {
+            if (pdp2requests.get(pdpName) == requests) {
+                requests.stopPublishing();
+                startNextRequest(request);
+            }
+        }
+
+        /**
+         * Starts the next request associated with a PDP.
+         *
+         * @param request the request that just completed
+         */
+        private void startNextRequest(Request request) {
+            if (!requests.startNextRequest(request)) {
+                pdp2requests.remove(pdpName, requests);
+            }
         }
     }
 }