Add ability to turn on/off pdp statistics
[policy/pap.git] / main / src / main / java / org / onap / policy / pap / main / comm / PdpModifyRequestMap.java
index 24443cc..beef475 100644 (file)
@@ -2,7 +2,9 @@
  * ============LICENSE_START=======================================================
  * ONAP PAP
  * ================================================================================
- * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved.
+ * Copyright (C) 2019, 2021 AT&T Intellectual Property. All rights reserved.
+ * Modifications Copyright (C) 2021 Nordix Foundation.
+ * Modifications Copyright (C) 2021 Bell Canada. All rights reserved.
  * ================================================================================
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
 
 package org.onap.policy.pap.main.comm;
 
+import java.time.Instant;
 import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
 import java.util.Map;
+import java.util.Set;
+import lombok.Setter;
+import org.onap.policy.models.base.PfModelException;
+import org.onap.policy.models.pap.concepts.PolicyNotification;
+import org.onap.policy.models.pdp.concepts.Pdp;
+import org.onap.policy.models.pdp.concepts.PdpGroup;
+import org.onap.policy.models.pdp.concepts.PdpGroupFilter;
+import org.onap.policy.models.pdp.concepts.PdpMessage;
 import org.onap.policy.models.pdp.concepts.PdpStateChange;
+import org.onap.policy.models.pdp.concepts.PdpStatus;
+import org.onap.policy.models.pdp.concepts.PdpSubGroup;
 import org.onap.policy.models.pdp.concepts.PdpUpdate;
-import org.onap.policy.models.tosca.authorative.concepts.ToscaPolicy;
-import org.onap.policy.pap.main.comm.msgdata.StateChangeData;
-import org.onap.policy.pap.main.comm.msgdata.UpdateData;
+import org.onap.policy.models.pdp.enums.PdpState;
+import org.onap.policy.models.provider.PolicyModelsProvider;
+import org.onap.policy.models.tosca.authorative.concepts.ToscaConceptIdentifier;
+import org.onap.policy.pap.main.PolicyModelsProviderFactoryWrapper;
+import org.onap.policy.pap.main.comm.msgdata.Request;
+import org.onap.policy.pap.main.comm.msgdata.RequestListener;
+import org.onap.policy.pap.main.comm.msgdata.StateChangeReq;
+import org.onap.policy.pap.main.comm.msgdata.UpdateReq;
+import org.onap.policy.pap.main.notification.DeploymentStatus;
+import org.onap.policy.pap.main.notification.PolicyNotifier;
 import org.onap.policy.pap.main.parameters.PdpModifyRequestMapParams;
+import org.onap.policy.pap.main.parameters.RequestParams;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * Maps a PDP name to requests that modify PDPs.
  */
 public class PdpModifyRequestMap {
+    private static final Logger logger = LoggerFactory.getLogger(PdpModifyRequestMap.class);
+
+    private static final String UNEXPECTED_BROADCAST = "unexpected broadcast message: ";
 
     /**
-     * Maps a PDP name to its request data. An entry is removed once all of the requests
-     * within the data have been completed.
+     * Maps a PDP name to its outstanding requests.
      */
-    private final Map<String, ModifyReqData> name2data = new HashMap<>();
+    private final Map<String, PdpRequests> pdp2requests = new HashMap<>();
 
     /**
      * PDP modification lock.
@@ -52,7 +82,29 @@ public class PdpModifyRequestMap {
     private final PdpModifyRequestMapParams params;
 
     /**
-     * Constructs the data.
+     * Factory for PAP DAO.
+     */
+    private final PolicyModelsProviderFactoryWrapper daoFactory;
+
+    /**
+     * Used to notify when policy updates completes.
+     */
+    private final PolicyNotifier policyNotifier;
+
+    /**
+     * Used to undeploy policies from the system, when they cannot be deployed to a PDP.
+     *
+     * <p/>
+     * Note: there's a "catch-22" here. The request map needs an undeployer, but the
+     * undeployer needs the request map. Consequently, the request map is created first,
+     * then the undeployer, and finally, this field is set.
+     */
+    @Setter
+    private PolicyUndeployer policyUndeployer;
+
+
+    /**
+     * Constructs the object.
      *
      * @param params configuration parameters
      *
@@ -63,24 +115,31 @@ public class PdpModifyRequestMap {
 
         this.params = params;
         this.modifyLock = params.getModifyLock();
+        this.daoFactory = params.getDaoFactory();
+        this.policyNotifier = params.getPolicyNotifier();
     }
 
     /**
-     * Adds an UPDATE request to the map.
+     * Determines if the map contains any requests.
      *
-     * @param update the UPDATE request or {@code null}
+     * @return {@code true} if the map is empty, {@code false} otherwise
      */
-    public void addRequest(PdpUpdate update) {
-        addRequest(update, null);
+    public boolean isEmpty() {
+        return pdp2requests.isEmpty();
     }
 
     /**
-     * Adds STATE-CHANGE request to the map.
+     * Stops publishing requests to the given PDP.
      *
-     * @param stateChange the STATE-CHANGE request or {@code null}
+     * @param pdpName PDP name
      */
-    public void addRequest(PdpStateChange stateChange) {
-        addRequest(null, stateChange);
+    public void stopPublishing(String pdpName) {
+        synchronized (modifyLock) {
+            PdpRequests requests = pdp2requests.remove(pdpName);
+            if (requests != null) {
+                requests.stopPublishing();
+            }
+        }
     }
 
     /**
@@ -90,288 +149,328 @@ public class PdpModifyRequestMap {
      * @param stateChange the STATE-CHANGE request or {@code null}
      */
     public void addRequest(PdpUpdate update, PdpStateChange stateChange) {
-        if (update == null && stateChange == null) {
-            return;
-        }
+        if (update == null) {
+            addRequest(stateChange);
 
-        synchronized (modifyLock) {
-            String pdpName = getPdpName(update, stateChange);
+        } else if (stateChange == null) {
+            addRequest(update);
 
-            ModifyReqData data = name2data.get(pdpName);
-            if (data != null) {
-                // update the existing request
-                data.add(update);
-                data.add(stateChange);
+        } else if (stateChange.getState() == PdpState.ACTIVE) {
+            // publish update before activating
+            synchronized (modifyLock) {
+                addRequest(update);
+                addRequest(stateChange);
+            }
 
-            } else {
-                data = makeRequestData(update, stateChange);
-                name2data.put(pdpName, data);
-                data.startPublishing();
+        } else {
+            // deactivate before publishing update
+            synchronized (modifyLock) {
+                addRequest(stateChange);
+                addRequest(update);
             }
         }
     }
 
     /**
-     * Gets the PDP name from two requests.
+     * Adds an UPDATE request to the map.
      *
-     * @param update the update request, or {@code null}
-     * @param stateChange the state-change request, or {@code null}
-     * @return the PDP name, or {@code null} if both requests are {@code null}
+     * @param update the UPDATE request or {@code null}
+     * @return the new request (this should only be used by junit tests)
      */
-    private static String getPdpName(PdpUpdate update, PdpStateChange stateChange) {
-        String pdpName;
+    public Request addRequest(PdpUpdate update) {
+        if (update == null) {
+            return null;
+        }
 
-        if (update != null) {
-            if ((pdpName = update.getName()) == null) {
-                throw new IllegalArgumentException("missing name in " + update);
-            }
+        if (isBroadcast(update)) {
+            throw new IllegalArgumentException(UNEXPECTED_BROADCAST + update);
+        }
 
-            if (stateChange != null && !pdpName.equals(stateChange.getName())) {
-                throw new IllegalArgumentException(
-                                "name " + stateChange.getName() + " does not match " + pdpName + " " + stateChange);
-            }
+        // @formatter:off
+        RequestParams reqparams = new RequestParams()
+            .setMaxRetryCount(params.getParams().getUpdateParameters().getMaxRetryCount())
+            .setTimers(params.getUpdateTimers())
+            .setModifyLock(params.getModifyLock())
+            .setPdpPublisher(params.getPdpPublisher())
+            .setResponseDispatcher(params.getResponseDispatcher());
+        // @formatter:on
 
-        } else {
-            if ((pdpName = stateChange.getName()) == null) {
-                throw new IllegalArgumentException("missing name in " + stateChange);
-            }
-        }
+        String name = update.getName() + " " + PdpUpdate.class.getSimpleName();
+        var request = new UpdateReq(reqparams, name, update);
 
-        return pdpName;
+        addSingleton(request);
+        return request;
     }
 
     /**
-     * Determines if two requests are the "same", which is does not necessarily mean
-     * "equals".
+     * Adds a STATE-CHANGE request to the map.
      *
-     * @param first first request to check
-     * @param second second request to check
-     * @return {@code true} if the requests are the "same", {@code false} otherwise
+     * @param stateChange the STATE-CHANGE request or {@code null}
+     * @return the new request (this should only be used by junit tests)
      */
-    protected static boolean isSame(PdpUpdate first, PdpUpdate second) {
-        if (first.getPolicies().size() != second.getPolicies().size()) {
-            return false;
+    public Request addRequest(PdpStateChange stateChange) {
+        if (stateChange == null) {
+            return null;
         }
 
-        if (!first.getPdpGroup().equals(second.getPdpGroup())) {
-            return false;
+        if (isBroadcast(stateChange)) {
+            throw new IllegalArgumentException(UNEXPECTED_BROADCAST + stateChange);
         }
 
-        if (!first.getPdpSubgroup().equals(second.getPdpSubgroup())) {
-            return false;
-        }
+        // @formatter:off
+        RequestParams reqparams = new RequestParams()
+            .setMaxRetryCount(params.getParams().getStateChangeParameters().getMaxRetryCount())
+            .setTimers(params.getStateChangeTimers())
+            .setModifyLock(params.getModifyLock())
+            .setPdpPublisher(params.getPdpPublisher())
+            .setResponseDispatcher(params.getResponseDispatcher());
+        // @formatter:on
 
-        // see if the other has any policies that this does not have
-        ArrayList<ToscaPolicy> lst = new ArrayList<>(second.getPolicies());
-        lst.removeAll(first.getPolicies());
+        String name = stateChange.getName() + " " + PdpStateChange.class.getSimpleName();
+        var request = new StateChangeReq(reqparams, name, stateChange);
 
-        return lst.isEmpty();
+        addSingleton(request);
+        return request;
     }
 
     /**
-     * Determines if two requests are the "same", which is does not necessarily mean
-     * "equals".
+     * Determines if a message is a broadcast message.
      *
-     * @param first first request to check
-     * @param second second request to check
-     * @return {@code true} if this update subsumes the other, {@code false} otherwise
+     * @param message the message to examine
+     * @return {@code true} if the message is a broadcast message, {@code false} if
+     *         destined for a single PDP
      */
-    protected static boolean isSame(PdpStateChange first, PdpStateChange second) {
-        return (first.getState() == second.getState());
+    private boolean isBroadcast(PdpMessage message) {
+        return (message.getName() == null);
     }
 
     /**
-     * Request data, which contains an UPDATE or a STATE-CHANGE request, or both. The
-     * UPDATE is always published before the STATE-CHANGE. In addition, both requests may
-     * be changed at any point, possibly triggering a restart of the publishing.
+     * Configures and adds a request to the map.
+     *
+     * @param request the request to be added
      */
-    public class ModifyReqData extends RequestData {
+    private void addSingleton(Request request) {
 
-        /**
-         * The UPDATE message to be published, or {@code null}.
-         */
-        private PdpUpdate update;
+        synchronized (modifyLock) {
+            PdpRequests requests = pdp2requests.computeIfAbsent(request.getMessage().getName(), this::makePdpRequests);
 
-        /**
-         * The STATE-CHANGE message to be published, or {@code null}.
-         */
-        private PdpStateChange stateChange;
+            request.setListener(new SingletonListener(requests, request));
+            requests.addSingleton(request);
+        }
+    }
 
+    /**
+     * Removes expired PDPs from all active groups.
+     */
+    public void removeExpiredPdps() {
 
-        /**
-         * Constructs the object.
-         *
-         * @param newUpdate the UPDATE message to be sent, or {@code null}
-         * @param newState the STATE-CHANGE message to be sent, or {@code null}
-         */
-        public ModifyReqData(PdpUpdate newUpdate, PdpStateChange newState) {
-            super(params);
+        synchronized (modifyLock) {
+            logger.info("check for PDP records older than {}ms", params.getMaxPdpAgeMs());
 
-            if (newUpdate != null) {
-                this.stateChange = newState;
-                setName(newUpdate.getName());
-                update = newUpdate;
-                configure(new ModUpdateData(newUpdate));
+            try (PolicyModelsProvider dao = daoFactory.create()) {
 
-            } else {
-                this.update = null;
-                setName(newState.getName());
-                stateChange = newState;
-                configure(new ModStateChangeData(newState));
-            }
-        }
+                PdpGroupFilter filter = PdpGroupFilter.builder().groupState(PdpState.ACTIVE).build();
+                List<PdpGroup> groups = dao.getFilteredPdpGroups(filter);
+                List<PdpGroup> updates = new ArrayList<>(1);
 
-        /**
-         * Determines if this request is still in the map.
-         */
-        @Override
-        protected boolean isActive() {
-            return (name2data.get(getName()) == this);
-        }
+                var status = new DeploymentStatus(dao);
 
-        /**
-         * Removes this request from the map.
-         */
-        @Override
-        protected void allCompleted() {
-            name2data.remove(getName(), this);
-        }
+                Instant minAge = Instant.now().minusMillis(params.getMaxPdpAgeMs());
 
-        /**
-         * Adds an UPDATE to the request data, replacing any existing UPDATE, if
-         * appropriate. If the UPDATE is replaced, then publishing is restarted.
-         *
-         * @param newRequest the new UPDATE request
-         */
-        private void add(PdpUpdate newRequest) {
-            if (newRequest == null) {
-                return;
-            }
-
-            synchronized (modifyLock) {
-                if (update != null && isSame(update, newRequest)) {
-                    // already have this update - discard it
-                    return;
+                for (PdpGroup group : groups) {
+                    Set<String> pdps = removeFromGroup(minAge, group);
+                    if (!pdps.isEmpty()) {
+                        updates.add(group);
+                        status.loadByGroup(group.getName());
+                        pdps.forEach(status::deleteDeployment);
+                    }
                 }
 
-                // must restart from scratch
-                stopPublishing();
+                if (!updates.isEmpty()) {
+                    dao.updatePdpGroups(updates);
 
-                update = newRequest;
-                configure(new ModUpdateData(newRequest));
+                    var notification = new PolicyNotification();
+                    status.flush(notification);
 
-                startPublishing();
+                    policyNotifier.publish(notification);
+                }
+
+            } catch (PfModelException e) {
+                logger.warn("failed to remove expired PDPs", e);
             }
         }
+    }
 
-        /**
-         * Adds a STATE-CHANGE to the request data, replacing any existing UPDATE, if
-         * appropriate. If the STATE-CHANGE is replaced, and we're currently publishing
-         * the STATE-CHANGE, then publishing is restarted.
-         *
-         * @param newRequest the new STATE-CHANGE request
-         */
-        private void add(PdpStateChange newRequest) {
-            if (newRequest == null) {
-                return;
-            }
+    /**
+     * Removes expired PDPs from a group.
+     *
+     * @param minAge minimum age for active PDPs
+     * @param group group from which expired PDPs should be removed
+     * @return the expired PDPs
+     */
+    private Set<String> removeFromGroup(Instant minAge, PdpGroup group) {
+        Set<String> pdps = new HashSet<>();
+        for (PdpSubGroup subgrp : group.getPdpSubgroups()) {
+            removeFromSubgroup(minAge, group, subgrp, pdps);
+        }
 
-            synchronized (modifyLock) {
-                if (stateChange != null && isSame(stateChange, newRequest)) {
-                    // already have this update - discard it
-                    return;
-                }
+        return pdps;
+    }
 
-                if (getWrapper() instanceof StateChangeData) {
-                    // we were publishing STATE-CHANGE, thus must restart it
-                    stopPublishing();
+    /**
+     * Removes expired PDPs from a subgroup.
+     *
+     * @param minAge minimum age for active PDPs
+     * @param group group from which to attempt to remove the PDP
+     * @param subgrp subgroup from which to attempt to remove the PDP
+     * @param pdps where to place the expired PDPs
+     */
+    private void removeFromSubgroup(Instant minAge, PdpGroup group, PdpSubGroup subgrp, Set<String> pdps) {
 
-                    stateChange = newRequest;
-                    configure(new ModStateChangeData(newRequest));
+        Iterator<Pdp> iter = subgrp.getPdpInstances().iterator();
 
-                    startPublishing();
+        while (iter.hasNext()) {
+            Pdp instance = iter.next();
 
-                } else {
-                    // haven't started publishing STATE-CHANGE yet, just replace it
-                    stateChange = newRequest;
-                }
+            if (instance.getLastUpdate().isBefore(minAge)) {
+                String pdpName = instance.getInstanceId();
+                logger.info("removed {} from group={} subgroup={}", pdpName, group.getName(), subgrp.getPdpType());
+                iter.remove();
+                subgrp.setCurrentInstanceCount(subgrp.getPdpInstances().size());
+                pdps.add(pdpName);
             }
         }
+    }
 
-        /**
-         * Indicates that the retry count was exhausted.
-         */
-        protected void retryCountExhausted() {
-            // remove this request data from the PDP request map
-            allCompleted();
-
-            // TODO what to do?
-        }
+    /**
+     * Creates a new set of requests for a PDP. May be overridden by junit tests.
+     *
+     * @param pdpName PDP name
+     * @return a new set of requests
+     */
+    protected PdpRequests makePdpRequests(String pdpName) {
+        return new PdpRequests(pdpName, policyNotifier);
+    }
 
-        /**
-         * Indicates that a response did not match the data.
-         *
-         * @param reason the reason for the mismatch
-         */
-        protected void mismatch(String reason) {
-            // remove this request data from the PDP request map
-            allCompleted();
+    /**
+     * Makes a handler for PDP responses.
+     * @return a response handler
+     */
+    protected PdpStatusMessageHandler makePdpResponseHandler() {
+        return new PdpStatusMessageHandler(params.getParams(), params.isSavePdpStatistics());
+    }
 
-            // TODO what to do?
+    /**
+     * Listener for singleton request events.
+     */
+    private class SingletonListener implements RequestListener {
+        private final PdpRequests requests;
+        private final Request request;
+        private final String pdpName;
+
+        public SingletonListener(PdpRequests requests, Request request) {
+            this.requests = requests;
+            this.request = request;
+            this.pdpName = requests.getPdpName();
         }
 
-        /**
-         * Wraps an UPDATE.
-         */
-        private class ModUpdateData extends UpdateData {
+        @Override
+        public void failure(String responsePdpName, String reason) {
+            Collection<ToscaConceptIdentifier> undeployPolicies = requestCompleted(responsePdpName);
+            if (undeployPolicies.isEmpty()) {
+                // nothing to undeploy
+                return;
+            }
 
-            public ModUpdateData(PdpUpdate message) {
-                super(message, params);
+            /*
+             * Undeploy the extra policies. Note: this will likely cause a new message to
+             * be assigned to the request, thus we must re-start it after making the
+             * change.
+             */
+            PdpMessage oldmsg = request.getMessage();
+
+            try {
+                logger.warn("undeploy policies from {}:{} that failed to deploy: {}", oldmsg.getPdpGroup(),
+                                oldmsg.getPdpSubgroup(), undeployPolicies);
+                policyUndeployer.undeploy(oldmsg.getPdpGroup(), oldmsg.getPdpSubgroup(), undeployPolicies);
+            } catch (PfModelException | RuntimeException e) {
+                logger.error("cannot undeploy policies {}", undeployPolicies, e);
             }
 
-            @Override
-            public void mismatch(String reason) {
-                ModifyReqData.this.mismatch(reason);
+            if (request.getMessage() == oldmsg) {
+                // message is unchanged - start the next request
+                startNextRequest(request);
+            } else {
+                // message changed - restart the request
+                request.startPublishing();
             }
+        }
 
-            @Override
-            public void completed() {
-                if (stateChange == null) {
-                    // no STATE-CHANGE request - we're done
-                    allCompleted();
+        @Override
+        public void success(String responsePdpName, PdpStatus response) {
+            requestCompleted(responsePdpName);
 
-                } else {
-                    // now process the STATE-CHANGE request
-                    configure(new ModStateChangeData(stateChange));
-                    startPublishing();
-                }
+            if (!(request instanceof UpdateReq)) {
+                // other response types may not include the list of policies
+                return;
             }
+
+            /*
+             * Update PDP time stamps. Also send pdp-update and pdp-state-change, as
+             * necessary, if the response does not reflect what's in the DB.
+             */
+            var handler = makePdpResponseHandler();
+            handler.handlePdpStatus(response);
         }
 
         /**
-         * Wraps a STATE-CHANGE.
+         * Handles a request completion, starting the next request, if there is one.
+         *
+         * @param responsePdpName name of the PDP provided in the response
+         * @return a list of policies to be undeployed
          */
-        private class ModStateChangeData extends StateChangeData {
+        private Collection<ToscaConceptIdentifier> requestCompleted(String responsePdpName) {
+            if (!pdpName.equals(responsePdpName)) {
+                return Collections.emptyList();
+            }
 
-            public ModStateChangeData(PdpStateChange message) {
-                super(message, params);
+            if (pdp2requests.get(pdpName) != requests) {
+                logger.info("discard old requests for {}", responsePdpName);
+                requests.stopPublishing();
+                return Collections.emptyList();
             }
 
-            @Override
-            public void mismatch(String reason) {
-                ModifyReqData.this.mismatch(reason);
+            if (!requests.isFirstInQueue(request)) {
+                logger.error("request is not first in the queue {}", request.getMessage());
+                return Collections.emptyList();
             }
 
-            @Override
-            public void completed() {
-                allCompleted();
+            Collection<ToscaConceptIdentifier> undeployPolicies = request.getUndeployPolicies();
+            if (undeployPolicies.isEmpty()) {
+                // nothing to undeploy - just start the next request
+                startNextRequest(request);
             }
+
+            return undeployPolicies;
         }
-    }
 
-    // these may be overridden by junit tests
+        @Override
+        public void retryCountExhausted(Request request) {
+            if (pdp2requests.get(pdpName) == requests) {
+                requests.stopPublishing();
+                startNextRequest(request);
+            }
+        }
 
-    protected ModifyReqData makeRequestData(PdpUpdate update, PdpStateChange stateChange) {
-        return new ModifyReqData(update, stateChange);
+        /**
+         * Starts the next request associated with a PDP.
+         *
+         * @param request the request that just completed
+         */
+        private void startNextRequest(Request request) {
+            if (!requests.startNextRequest(request)) {
+                pdp2requests.remove(pdpName, requests);
+            }
+        }
     }
 }