Add components for PDP communication 78/83278/5
authorJim Hahn <jrh3@att.com>
Sun, 24 Mar 2019 12:13:58 +0000 (08:13 -0400)
committerJim Hahn <jrh3@att.com>
Tue, 26 Mar 2019 14:27:50 +0000 (10:27 -0400)
Added PAP DAO interfaces.
Added Publisher.
Added TimerManager.
Added RequestDataParams.
Added PdpModifyRequestMapParams.
Added RequestData.
Added PdpModifyRequestMapTest.
Updated timer test.
Extracted nested MessageData classes into their own files.
Addressed merge conflict.
Removed unneeded methods from PapActivator.

Fixed mismatchint action name.

Change-Id: I3aebef68a62b48d9154dd7a4c4ff366f9914717c
Issue-ID: POLICY-1542
Signed-off-by: Jim Hahn <jrh3@att.com>
23 files changed:
main/src/main/java/org/onap/policy/pap/main/comm/PdpModifyRequestMap.java [new file with mode: 0644]
main/src/main/java/org/onap/policy/pap/main/comm/Publisher.java [new file with mode: 0644]
main/src/main/java/org/onap/policy/pap/main/comm/QueueToken.java [new file with mode: 0644]
main/src/main/java/org/onap/policy/pap/main/comm/RequestData.java [new file with mode: 0644]
main/src/main/java/org/onap/policy/pap/main/comm/TimerManager.java [new file with mode: 0644]
main/src/main/java/org/onap/policy/pap/main/comm/msgdata/MessageData.java [new file with mode: 0644]
main/src/main/java/org/onap/policy/pap/main/comm/msgdata/StateChangeData.java [new file with mode: 0644]
main/src/main/java/org/onap/policy/pap/main/comm/msgdata/UpdateData.java [new file with mode: 0644]
main/src/main/java/org/onap/policy/pap/main/parameters/PdpModifyRequestMapParams.java [new file with mode: 0644]
main/src/main/java/org/onap/policy/pap/main/parameters/RequestDataParams.java [new file with mode: 0644]
main/src/main/java/org/onap/policy/pap/main/startstop/PapActivator.java
main/src/test/java/org/onap/policy/pap/main/comm/PdpModifyRequestMapTest.java [new file with mode: 0644]
main/src/test/java/org/onap/policy/pap/main/comm/PublisherTest.java [new file with mode: 0644]
main/src/test/java/org/onap/policy/pap/main/comm/QueueTokenTest.java [new file with mode: 0644]
main/src/test/java/org/onap/policy/pap/main/comm/RequestDataTest.java [new file with mode: 0644]
main/src/test/java/org/onap/policy/pap/main/comm/Threaded.java [new file with mode: 0644]
main/src/test/java/org/onap/policy/pap/main/comm/TimerManagerTest.java [new file with mode: 0644]
main/src/test/java/org/onap/policy/pap/main/comm/msgdata/MessageDataTest.java [new file with mode: 0644]
main/src/test/java/org/onap/policy/pap/main/comm/msgdata/StateChangeDataTest.java [new file with mode: 0644]
main/src/test/java/org/onap/policy/pap/main/comm/msgdata/UpdateDataTest.java [new file with mode: 0644]
main/src/test/java/org/onap/policy/pap/main/parameters/TestPdpModifyRequestMapParams.java [new file with mode: 0644]
main/src/test/java/org/onap/policy/pap/main/parameters/TestRequestDataParams.java [new file with mode: 0644]
main/src/test/java/org/onap/policy/pap/main/startstop/TestPapActivator.java

diff --git a/main/src/main/java/org/onap/policy/pap/main/comm/PdpModifyRequestMap.java b/main/src/main/java/org/onap/policy/pap/main/comm/PdpModifyRequestMap.java
new file mode 100644 (file)
index 0000000..f2ebca5
--- /dev/null
@@ -0,0 +1,377 @@
+/*
+ * ============LICENSE_START=======================================================
+ * ONAP PAP
+ * ================================================================================
+ * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.pap.main.comm;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Map;
+import org.onap.policy.models.pdp.concepts.PdpStateChange;
+import org.onap.policy.models.pdp.concepts.PdpUpdate;
+import org.onap.policy.models.tosca.simple.concepts.ToscaPolicy;
+import org.onap.policy.pap.main.comm.msgdata.StateChangeData;
+import org.onap.policy.pap.main.comm.msgdata.UpdateData;
+import org.onap.policy.pap.main.parameters.PdpModifyRequestMapParams;
+
+/**
+ * Maps a PDP name to requests that modify PDPs.
+ */
+public class PdpModifyRequestMap {
+
+    /**
+     * Maps a PDP name to its request data. An entry is removed once all of the requests
+     * within the data have been completed.
+     */
+    private final Map<String, ModifyReqData> name2data = new HashMap<>();
+
+    /**
+     * PDP modification lock.
+     */
+    private final Object modifyLock;
+
+    /**
+     * The configuration parameters.
+     */
+    private final PdpModifyRequestMapParams params;
+
+    /**
+     * Constructs the data.
+     *
+     * @param params configuration parameters
+     *
+     * @throws IllegalArgumentException if a required parameter is not set
+     */
+    public PdpModifyRequestMap(PdpModifyRequestMapParams params) {
+        params.validate();
+
+        this.params = params;
+        this.modifyLock = params.getModifyLock();
+    }
+
+    /**
+     * Adds an UPDATE request to the map.
+     *
+     * @param update the UPDATE request or {@code null}
+     */
+    public void addRequest(PdpUpdate update) {
+        addRequest(update, null);
+    }
+
+    /**
+     * Adds STATE-CHANGE request to the map.
+     *
+     * @param stateChange the STATE-CHANGE request or {@code null}
+     */
+    public void addRequest(PdpStateChange stateChange) {
+        addRequest(null, stateChange);
+    }
+
+    /**
+     * Adds a pair of requests to the map.
+     *
+     * @param update the UPDATE request or {@code null}
+     * @param stateChange the STATE-CHANGE request or {@code null}
+     */
+    public void addRequest(PdpUpdate update, PdpStateChange stateChange) {
+        if (update == null && stateChange == null) {
+            return;
+        }
+
+        synchronized (modifyLock) {
+            String pdpName = getPdpName(update, stateChange);
+
+            ModifyReqData data = name2data.get(pdpName);
+            if (data != null) {
+                // update the existing request
+                data.add(update);
+                data.add(stateChange);
+
+            } else {
+                data = makeRequestData(update, stateChange);
+                name2data.put(pdpName, data);
+                data.startPublishing();
+            }
+        }
+    }
+
+    /**
+     * Gets the PDP name from two requests.
+     *
+     * @param update the update request, or {@code null}
+     * @param stateChange the state-change request, or {@code null}
+     * @return the PDP name, or {@code null} if both requests are {@code null}
+     */
+    private static String getPdpName(PdpUpdate update, PdpStateChange stateChange) {
+        String pdpName;
+
+        if (update != null) {
+            if ((pdpName = update.getName()) == null) {
+                throw new IllegalArgumentException("missing name in " + update);
+            }
+
+            if (stateChange != null && !pdpName.equals(stateChange.getName())) {
+                throw new IllegalArgumentException(
+                                "name " + stateChange.getName() + " does not match " + pdpName + " " + stateChange);
+            }
+
+        } else {
+            if ((pdpName = stateChange.getName()) == null) {
+                throw new IllegalArgumentException("missing name in " + stateChange);
+            }
+        }
+
+        return pdpName;
+    }
+
+    /**
+     * Determines if two requests are the "same", which is does not necessarily mean
+     * "equals".
+     *
+     * @param first first request to check
+     * @param second second request to check
+     * @return {@code true} if the requests are the "same", {@code false} otherwise
+     */
+    protected static boolean isSame(PdpUpdate first, PdpUpdate second) {
+        if (first.getPolicies().size() != second.getPolicies().size()) {
+            return false;
+        }
+
+        if (!first.getPdpGroup().equals(second.getPdpGroup())) {
+            return false;
+        }
+
+        if (!first.getPdpSubgroup().equals(second.getPdpSubgroup())) {
+            return false;
+        }
+
+        // see if the other has any policies that this does not have
+        ArrayList<ToscaPolicy> lst = new ArrayList<>(second.getPolicies());
+        lst.removeAll(first.getPolicies());
+
+        return lst.isEmpty();
+    }
+
+    /**
+     * Determines if two requests are the "same", which is does not necessarily mean
+     * "equals".
+     *
+     * @param first first request to check
+     * @param second second request to check
+     * @return {@code true} if this update subsumes the other, {@code false} otherwise
+     */
+    protected static boolean isSame(PdpStateChange first, PdpStateChange second) {
+        return (first.getState() == second.getState());
+    }
+
+    /**
+     * Request data, which contains an UPDATE or a STATE-CHANGE request, or both. The
+     * UPDATE is always published before the STATE-CHANGE. In addition, both requests may
+     * be changed at any point, possibly triggering a restart of the publishing.
+     */
+    public class ModifyReqData extends RequestData {
+
+        /**
+         * The UPDATE message to be published, or {@code null}.
+         */
+        private PdpUpdate update;
+
+        /**
+         * The STATE-CHANGE message to be published, or {@code null}.
+         */
+        private PdpStateChange stateChange;
+
+
+        /**
+         * Constructs the object.
+         *
+         * @param newUpdate the UPDATE message to be sent, or {@code null}
+         * @param newState the STATE-CHANGE message to be sent, or {@code null}
+         */
+        public ModifyReqData(PdpUpdate newUpdate, PdpStateChange newState) {
+            super(params);
+
+            if (newUpdate != null) {
+                this.stateChange = newState;
+                setName(newUpdate.getName());
+                update = newUpdate;
+                configure(new ModUpdateData(newUpdate));
+
+            } else {
+                this.update = null;
+                setName(newState.getName());
+                stateChange = newState;
+                configure(new ModStateChangeData(newState));
+            }
+        }
+
+        /**
+         * Determines if this request is still in the map.
+         */
+        @Override
+        protected boolean isActive() {
+            return (name2data.get(getName()) == this);
+        }
+
+        /**
+         * Removes this request from the map.
+         */
+        @Override
+        protected void allCompleted() {
+            name2data.remove(getName(), this);
+        }
+
+        /**
+         * Adds an UPDATE to the request data, replacing any existing UPDATE, if
+         * appropriate. If the UPDATE is replaced, then publishing is restarted.
+         *
+         * @param newRequest the new UPDATE request
+         */
+        private void add(PdpUpdate newRequest) {
+            if (newRequest == null) {
+                return;
+            }
+
+            synchronized (modifyLock) {
+                if (update != null && isSame(update, newRequest)) {
+                    // already have this update - discard it
+                    return;
+                }
+
+                // must restart from scratch
+                stopPublishing();
+
+                update = newRequest;
+                configure(new ModUpdateData(newRequest));
+
+                startPublishing();
+            }
+        }
+
+        /**
+         * Adds a STATE-CHANGE to the request data, replacing any existing UPDATE, if
+         * appropriate. If the STATE-CHANGE is replaced, and we're currently publishing
+         * the STATE-CHANGE, then publishing is restarted.
+         *
+         * @param newRequest the new STATE-CHANGE request
+         */
+        private void add(PdpStateChange newRequest) {
+            if (newRequest == null) {
+                return;
+            }
+
+            synchronized (modifyLock) {
+                if (stateChange != null && isSame(stateChange, newRequest)) {
+                    // already have this update - discard it
+                    return;
+                }
+
+                if (getWrapper() instanceof StateChangeData) {
+                    // we were publishing STATE-CHANGE, thus must restart it
+                    stopPublishing();
+
+                    stateChange = newRequest;
+                    configure(new ModStateChangeData(newRequest));
+
+                    startPublishing();
+
+                } else {
+                    // haven't started publishing STATE-CHANGE yet, just replace it
+                    stateChange = newRequest;
+                }
+            }
+        }
+
+        /**
+         * Indicates that the retry count was exhausted.
+         */
+        protected void retryCountExhausted() {
+            // remove this request data from the PDP request map
+            allCompleted();
+
+            // TODO what to do?
+        }
+
+        /**
+         * Indicates that a response did not match the data.
+         *
+         * @param reason the reason for the mismatch
+         */
+        protected void mismatch(String reason) {
+            // remove this request data from the PDP request map
+            allCompleted();
+
+            // TODO what to do?
+        }
+
+        /**
+         * Wraps an UPDATE.
+         */
+        private class ModUpdateData extends UpdateData {
+
+            public ModUpdateData(PdpUpdate message) {
+                super(message, params);
+            }
+
+            @Override
+            public void mismatch(String reason) {
+                ModifyReqData.this.mismatch(reason);
+            }
+
+            @Override
+            public void completed() {
+                if (stateChange == null) {
+                    // no STATE-CHANGE request - we're done
+                    allCompleted();
+
+                } else {
+                    // now process the STATE-CHANGE request
+                    configure(new ModStateChangeData(stateChange));
+                    startPublishing();
+                }
+            }
+        }
+
+        /**
+         * Wraps a STATE-CHANGE.
+         */
+        private class ModStateChangeData extends StateChangeData {
+
+            public ModStateChangeData(PdpStateChange message) {
+                super(message, params);
+            }
+
+            @Override
+            public void mismatch(String reason) {
+                ModifyReqData.this.mismatch(reason);
+            }
+
+            @Override
+            public void completed() {
+                allCompleted();
+            }
+        }
+    }
+
+    // these may be overridden by junit tests
+
+    protected ModifyReqData makeRequestData(PdpUpdate update, PdpStateChange stateChange) {
+        return new ModifyReqData(update, stateChange);
+    }
+}
diff --git a/main/src/main/java/org/onap/policy/pap/main/comm/Publisher.java b/main/src/main/java/org/onap/policy/pap/main/comm/Publisher.java
new file mode 100644 (file)
index 0000000..6032d17
--- /dev/null
@@ -0,0 +1,136 @@
+/*
+ * ============LICENSE_START=======================================================
+ * ONAP PAP
+ * ================================================================================
+ * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.pap.main.comm;
+
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import org.onap.policy.common.endpoints.event.comm.client.TopicSinkClient;
+import org.onap.policy.common.endpoints.event.comm.client.TopicSinkClientException;
+import org.onap.policy.common.utils.coder.StandardCoder;
+import org.onap.policy.models.pdp.concepts.PdpMessage;
+import org.onap.policy.pap.main.PolicyPapException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Publishes messages to a topic. Maintains a queue of references to data that is to be
+ * published. Once the publisher removes a reference from a queue, it sets it to
+ * {@link null} to indicate that it is being processed. Until it has been set to
+ * {@link null}, clients are free to atomically update the reference to new values, thus
+ * maintaining their place in the queue.
+ *
+ * <p>This class has not been tested for multiple threads invoking {@link #run()}
+ * simultaneously.
+ */
+public class Publisher implements Runnable {
+    private static final Logger logger = LoggerFactory.getLogger(Publisher.class);
+
+    /**
+     * Used to send to the topic.
+     */
+    private final TopicSinkClient client;
+
+    /**
+     * Request queue. The references may contain {@code null}.
+     */
+    private final BlockingQueue<QueueToken<PdpMessage>> queue = new LinkedBlockingQueue<>();
+
+    /**
+     * Set to {@code true} to cause the publisher to stop running.
+     */
+    private volatile boolean stopNow = false;
+
+    /**
+     * Constructs the object.
+     *
+     * @param topic name of the topic to which to publish
+     * @throws PolicyPapException if the topic sink does not exist
+     */
+    public Publisher(String topic) throws PolicyPapException {
+        try {
+            this.client = new TopicSinkClient(topic);
+        } catch (TopicSinkClientException e) {
+            throw new PolicyPapException(e);
+        }
+    }
+
+    /**
+     * Stops the publisher, if it's running.
+     */
+    public void stop() {
+        stopNow = true;
+
+        // add an empty reference so the thread doesn't block on the queue
+        queue.add(new QueueToken<>(null));
+    }
+
+    /**
+     * Adds an item to the queue. The referenced objects are assumed to be POJOs and will
+     * be converted to JSON via the {@link StandardCoder} prior to publishing.
+     *
+     * @param ref reference to the message to be published
+     */
+    public void enqueue(QueueToken<PdpMessage> ref) {
+        queue.add(ref);
+    }
+
+    /**
+     * Continuously publishes items in the queue until {@link #stop()} is invoked.
+     */
+    @Override
+    public void run() {
+        for (;;) {
+            QueueToken<PdpMessage> token = getNext();
+
+            if (stopNow) {
+                // unblock any other publisher threads
+                queue.offer(new QueueToken<>(null));
+                break;
+            }
+
+            PdpMessage data = token.replaceItem(null);
+            if (data == null) {
+                continue;
+            }
+
+            client.send(data);
+        }
+    }
+
+    /**
+     * Gets the next item from the queue. If the thread is interrupted, then it sets
+     * {@link #stopNow}.
+     *
+     * @return the next item, or a reference containing {@code null} if this is
+     *         interrupted
+     */
+    private QueueToken<PdpMessage> getNext() {
+        try {
+            return queue.take();
+
+        } catch (InterruptedException e) {
+            logger.warn("Publisher stopping due to interrupt");
+            stopNow = true;
+            Thread.currentThread().interrupt();
+            return new QueueToken<>(null);
+        }
+    }
+}
diff --git a/main/src/main/java/org/onap/policy/pap/main/comm/QueueToken.java b/main/src/main/java/org/onap/policy/pap/main/comm/QueueToken.java
new file mode 100644 (file)
index 0000000..a68b7c0
--- /dev/null
@@ -0,0 +1,75 @@
+/*
+ * ============LICENSE_START=======================================================
+ * ONAP PAP
+ * ================================================================================
+ * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.pap.main.comm;
+
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * Token that can be placed within a publisher's queue. The item that a token references
+ * may be replaced any time up until it is set to {@code null}. Once it has been set to
+ * {@code null}, it cannot be replaced.
+ *
+ * @param <T> type of object referenced by the token
+ */
+public class QueueToken<T> {
+
+    /**
+     * Wraps the item.
+     */
+    private final AtomicReference<T> ref;
+
+    /**
+     * Constructs the object.
+     *
+     * @param item initial token item
+     */
+    public QueueToken(T item) {
+        ref = new AtomicReference<>(item);
+    }
+
+    /**
+     * Gets the item referenced by this token.
+     *
+     * @return the item referenced by this token
+     */
+    public final T get() {
+        return ref.get();
+    }
+
+    /**
+     * Replaces the token's item. If the current item is {@code null}, then it is left
+     * unchanged.
+     *
+     * @param newItem the new item
+     * @return the original item
+     */
+    public T replaceItem(T newItem) {
+        T oldItem;
+        while ((oldItem = ref.get()) != null) {
+            if (ref.compareAndSet(oldItem, newItem)) {
+                break;
+            }
+        }
+
+        // it was already null, or we successfully replaced the item
+        return oldItem;
+    }
+}
diff --git a/main/src/main/java/org/onap/policy/pap/main/comm/RequestData.java b/main/src/main/java/org/onap/policy/pap/main/comm/RequestData.java
new file mode 100644 (file)
index 0000000..29ad85b
--- /dev/null
@@ -0,0 +1,296 @@
+/*
+ * ============LICENSE_START=======================================================
+ * ONAP PAP
+ * ================================================================================
+ * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.pap.main.comm;
+
+import lombok.AccessLevel;
+import lombok.Getter;
+import lombok.NonNull;
+import lombok.Setter;
+import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure;
+import org.onap.policy.common.utils.services.ServiceManager;
+import org.onap.policy.models.pdp.concepts.PdpMessage;
+import org.onap.policy.models.pdp.concepts.PdpStatus;
+import org.onap.policy.pap.main.comm.msgdata.MessageData;
+import org.onap.policy.pap.main.parameters.RequestDataParams;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Request data, the content of which may be changed at any point, possibly triggering a
+ * restart of the publishing.
+ */
+public abstract class RequestData {
+    private static final Logger logger = LoggerFactory.getLogger(RequestData.class);
+
+    /**
+     * Name with which this data is associated, used for logging purposes.
+     */
+    @Getter
+    @Setter(AccessLevel.PROTECTED)
+    private String name;
+
+    /**
+     * The configuration parameters.
+     */
+    private final RequestDataParams params;
+
+    /**
+     * Current retry count.
+     */
+    private int retryCount = 0;
+
+    /**
+     * Used to register/unregister the listener and the timer.
+     */
+    private ServiceManager svcmgr;
+
+    /**
+     * Wrapper for the message that is currently being published (i.e., {@link #update} or
+     * {@link #stateChange}.
+     */
+    @Getter(AccessLevel.PROTECTED)
+    private MessageData wrapper;
+
+    /**
+     * Used to cancel a timer.
+     */
+    private TimerManager.Timer timer;
+
+    /**
+     * Token that is placed on the queue.
+     */
+    private QueueToken<PdpMessage> token = null;
+
+
+    /**
+     * Constructs the object, and validates the parameters.
+     *
+     * @param params configuration parameters
+     *
+     * @throws IllegalArgumentException if a required parameter is not set
+     */
+    public RequestData(@NonNull RequestDataParams params) {
+        params.validate();
+
+        this.params = params;
+    }
+
+    /**
+     * Starts the publishing process, registering any listeners or timeout handlers, and
+     * adding the request to the publisher queue. This should not be invoked until after
+     * {@link #configure(MessageData)} is invoked.
+     */
+    public void startPublishing() {
+
+        synchronized (params.getModifyLock()) {
+            if (!svcmgr.isAlive()) {
+                svcmgr.start();
+            }
+        }
+    }
+
+    /**
+     * Unregisters the listener and cancels the timer.
+     */
+    protected void stopPublishing() {
+        if (svcmgr.isAlive()) {
+            svcmgr.stop();
+        }
+    }
+
+    /**
+     * Configures the fields based on the {@link #message} type.
+     *
+     * @param newWrapper the new message wrapper
+     */
+    protected void configure(MessageData newWrapper) {
+
+        wrapper = newWrapper;
+
+        resetRetryCount();
+
+        TimerManager timerManager = wrapper.getTimers();
+        String msgType = wrapper.getType();
+        String reqid = wrapper.getMessage().getRequestId();
+
+        /*
+         * We have to configure the service manager HERE, because it's name changes if the
+         * message class changes.
+         */
+
+        // @formatter:off
+        this.svcmgr = new ServiceManager(name + " " + msgType)
+                        .addAction("listener",
+                            () -> params.getResponseDispatcher().register(reqid, this::processResponse),
+                            () -> params.getResponseDispatcher().unregister(reqid))
+                        .addAction("timer",
+                            () -> timer = timerManager.register(name, this::handleTimeout),
+                            () -> timer.cancel())
+                        .addAction("enqueue",
+                            () -> enqueue(),
+                            () -> {
+                                // nothing to "stop"
+                            });
+        // @formatter:on
+    }
+
+    /**
+     * Enqueues the current message with the publisher, putting it into the queue token,
+     * if possible. Otherwise, it adds a new token to the queue.
+     */
+    private void enqueue() {
+        PdpMessage message = wrapper.getMessage();
+        if (token != null && token.replaceItem(message) != null) {
+            // took the other's place in the queue - continue using the token
+            return;
+        }
+
+        // couldn't take the other's place - add our own token to the queue
+        token = new QueueToken<>(message);
+        params.getPublisher().enqueue(token);
+    }
+
+    /**
+     * Resets the retry count.
+     */
+    protected void resetRetryCount() {
+        retryCount = 0;
+    }
+
+    /**
+     * Bumps the retry count.
+     *
+     * @return {@code true} if successful, {@code false} if the limit has been reached
+     */
+    protected boolean bumpRetryCount() {
+        if (retryCount >= wrapper.getMaxRetryCount()) {
+            return false;
+        }
+
+        retryCount++;
+        return true;
+    }
+
+    /**
+     * Indicates that the retry count was exhausted. The default method simply invokes
+     * {@link #allCompleted()}.
+     */
+    protected void retryCountExhausted() {
+        // remove this request data from the PDP request map
+        allCompleted();
+    }
+
+    /**
+     * Processes a response received from the PDP.
+     *
+     * @param infra infrastructure on which the response was received
+     * @param topic topic on which the response was received
+     * @param response the response
+     */
+    private void processResponse(CommInfrastructure infra, String topic, PdpStatus response) {
+
+        synchronized (params.getModifyLock()) {
+            if (!svcmgr.isAlive()) {
+                // this particular request must have been discarded
+                return;
+            }
+
+            stopPublishing();
+
+            if (!isActive()) {
+                return;
+            }
+
+            String reason = wrapper.checkResponse(response);
+            if (reason != null) {
+                logger.info("{} PDP data mismatch: {}", getName(), reason);
+                wrapper.mismatch(reason);
+
+            } else {
+                logger.info("{} {} successful", getName(), wrapper.getType());
+                wrapper.completed();
+            }
+        }
+    }
+
+    /**
+     * Handles a timeout.
+     *
+     * @param timerName the timer timer
+     */
+    private void handleTimeout(String timerName) {
+
+        synchronized (params.getModifyLock()) {
+            if (!svcmgr.isAlive()) {
+                // this particular request must have been discarded
+                return;
+            }
+
+            stopPublishing();
+
+            if (!isActive()) {
+                return;
+            }
+
+            if (isInQueue()) {
+                // haven't published yet - just leave it in the queue and reset counts
+                logger.info("{} timeout - request still in the queue", getName());
+                resetRetryCount();
+                startPublishing();
+                return;
+            }
+
+            if (!bumpRetryCount()) {
+                logger.info("{} timeout - retry count exhausted", getName());
+                retryCountExhausted();
+                return;
+            }
+
+            // re-publish
+            logger.info("{} timeout - re-publish", getName());
+            startPublishing();
+        }
+    }
+
+    /**
+     * Determines if the current message is still in the queue. Assumes that
+     * {@link #startPublishing()} has been invoked and thus {@link #token} has been
+     * initialized.
+     *
+     * @return {@code true} if the current message is in the queue, {@code false}
+     *         otherwise
+     */
+    private boolean isInQueue() {
+        return (token.get() == wrapper.getMessage());
+    }
+
+    /**
+     * Determines if this request data is still active.
+     *
+     * @return {@code true} if this request is active, {@code false} otherwise
+     */
+    protected abstract boolean isActive();
+
+    /**
+     * Indicates that this entire request has completed.
+     */
+    protected abstract void allCompleted();
+}
diff --git a/main/src/main/java/org/onap/policy/pap/main/comm/TimerManager.java b/main/src/main/java/org/onap/policy/pap/main/comm/TimerManager.java
new file mode 100644 (file)
index 0000000..9748b0b
--- /dev/null
@@ -0,0 +1,310 @@
+/*
+ * ============LICENSE_START=======================================================
+ * ONAP PAP
+ * ================================================================================
+ * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.pap.main.comm;
+
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Consumer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Manager of timers. All of the timers for a given manager have the same wait time, which
+ * makes it possible to use a linked hash map to track the timers. As a result, timers can
+ * be quickly added and removed. In addition, the expiration time of any new timer is
+ * always greater than or equal to the timers that are already in the map. Consequently,
+ * the map's iterator will go in ascending order from the minimum expiration time to
+ * maximum expiration time.
+ *
+ * <p>This class has not been tested for multiple threads invoking {@link #run()}
+ * simultaneously.
+ */
+public class TimerManager implements Runnable {
+    private static final Logger logger = LoggerFactory.getLogger(TimerManager.class);
+
+    /**
+     * Name of this manager, used for logging purposes.
+     */
+    private final String name;
+
+    /**
+     * Time that each new timer should wait.
+     */
+    private final long waitTimeMs;
+
+    /**
+     * When the map is empty, the timer thread will block waiting for this semaphore. When
+     * a new timer is added to the map, the semaphore will be released, thus allowing the
+     * timer thread to progress.
+     */
+    private final Semaphore sem = new Semaphore(0);
+
+    /**
+     * This is decremented to indicate that this manager should be stopped.
+     */
+    private final CountDownLatch stopper = new CountDownLatch(1);
+
+    /**
+     * Used to lock updates to the map.
+     */
+    private final Object lockit = new Object();
+
+    /**
+     * Maps a timer name to a timer.
+     */
+    private final Map<String, Timer> name2timer = new LinkedHashMap<>();
+
+    /**
+     * Constructs the object.
+     *
+     * @param name name of this manager, used for logging purposes
+     * @param waitTimeMs time that each new timer should wait
+     */
+    public TimerManager(String name, long waitTimeMs) {
+        this.name = name;
+        this.waitTimeMs = waitTimeMs;
+    }
+
+    /**
+     * Stops the timer thread.
+     */
+    public void stop() {
+        logger.info("timer manager {} stopping", name);
+
+        // Note: Must decrement the latch BEFORE releasing the semaphore
+        stopper.countDown();
+        sem.release();
+    }
+
+    /**
+     * Registers a timer with the given name. When the timer expires, it is automatically
+     * unregistered and then executed.
+     *
+     * @param timerName name of the timer to register
+     * @param action action to take when the timer expires; the "timerName" is passed as
+     *        the only argument
+     * @return the timer
+     */
+    public Timer register(String timerName, Consumer<String> action) {
+
+        synchronized (lockit) {
+            Timer timer = new Timer(timerName, action);
+
+            // always remove existing entry so that new entry goes at the end of the map
+            name2timer.remove(timerName);
+            name2timer.put(timerName, timer);
+
+            logger.info("{} timer registered {}", name, timer);
+
+            if (name2timer.size() == 1) {
+                // release the timer thread
+                sem.release();
+            }
+
+            return timer;
+        }
+    }
+
+    /**
+     * Continuously processes timers until {@link #stop()} is invoked.
+     */
+    @Override
+    public void run() {
+        logger.info("timer manager {} started", name);
+
+        while (stopper.getCount() > 0) {
+
+            try {
+                sem.acquire();
+                sem.drainPermits();
+
+                processTimers();
+
+            } catch (InterruptedException e) {
+                logger.warn("timer manager {} stopping due to interrupt", name);
+                stopper.countDown();
+                Thread.currentThread().interrupt();
+            }
+        }
+
+        logger.info("timer manager {} stopped", name);
+    }
+
+    /**
+     * Process all timers, continuously, as long as a timer remains in the map (and
+     * {@link #stop()} has not been called).
+     *
+     * @throws InterruptedException if the thread is interrupted
+     */
+    private void processTimers() throws InterruptedException {
+        Timer timer;
+        while ((timer = getNextTimer()) != null && stopper.getCount() > 0) {
+            processTimer(timer);
+        }
+    }
+
+    /**
+     * Gets the timer that will expire first.
+     *
+     * @return the timer that will expire first, or {@code null} if there are no timers
+     */
+    private Timer getNextTimer() {
+
+        synchronized (lockit) {
+            if (name2timer.isEmpty()) {
+                return null;
+            }
+
+            // use an iterator to get the first timer in the map
+            return name2timer.values().iterator().next();
+        }
+    }
+
+    /**
+     * Process a timer, waiting until it expires, unregistering it, and then executing its
+     * action.
+     *
+     * @param timer timer to process
+     * @throws InterruptedException if the thread is interrupted
+     */
+    private void processTimer(Timer timer) throws InterruptedException {
+        timer.await();
+
+        if (stopper.getCount() == 0) {
+            // stop() was called
+            return;
+        }
+
+        if (!timer.cancel()) {
+            // timer was cancelled while we were waiting
+            return;
+        }
+
+
+        // run the timer
+        try {
+            logger.info("{} timer expired {}", TimerManager.this.name, timer);
+            timer.runner.accept(timer.name);
+        } catch (RuntimeException e) {
+            logger.warn("{} timer threw an exception {}", TimerManager.this.name, timer, e);
+        }
+    }
+
+    /**
+     * Timer info.
+     */
+    public class Timer {
+        /**
+         * The timer's name.
+         */
+        private String name;
+
+        /**
+         * Time, in milliseconds, when the timer will expire.
+         */
+        private long expireMs;
+
+        /**
+         * Action to take when the timer expires.
+         */
+        private Consumer<String> runner;
+
+
+        private Timer(String name, Consumer<String> runner2) {
+            this.name = name;
+            this.expireMs = waitTimeMs + currentTimeMillis();
+            this.runner = runner2;
+        }
+
+        private void await() throws InterruptedException {
+            // wait for it to expire, if necessary
+            long tleft = expireMs - currentTimeMillis();
+            if (tleft > 0) {
+                logger.info("{} timer waiting {}ms {}", TimerManager.this.name, tleft, this);
+                sleep(tleft);
+            }
+        }
+
+        /**
+         * Cancels the timer.
+         *
+         * @return {@code true} if the timer was cancelled, {@code false} if the timer was
+         *         not running
+         */
+        public boolean cancel() {
+
+            AtomicBoolean wasPresent = new AtomicBoolean(false);
+
+            synchronized (lockit) {
+
+                name2timer.computeIfPresent(name, (key, val) -> {
+
+                    if (val == this) {
+                        wasPresent.set(true);
+                        return null;
+
+                    } else {
+                        return val;
+                    }
+                });
+
+                if (!wasPresent.get()) {
+                    // have a new timer in the map - ignore "this" timer
+                    logger.info("{} timer replaced {}", TimerManager.this.name, this);
+                    return false;
+                }
+
+                logger.debug("{} timer cancelled {}", TimerManager.this.name, this);
+                return true;
+            }
+        }
+
+        @Override
+        public String toString() {
+            return "Timer [name=" + name + ", expireMs=" + expireMs + "]";
+        }
+    }
+
+    // these may be overridden by junit tests
+
+    /**
+     * Gets the current time, in milli-seconds.
+     *
+     * @return the current time, in milli-seconds
+     */
+    protected long currentTimeMillis() {
+        return System.currentTimeMillis();
+    }
+
+    /**
+     * "Sleeps" for a bit, stopping if {@link #stop()} is invoked.
+     *
+     * @param timeMs time, in milli-seconds, to sleep
+     * @throws InterruptedException if this thread is interrupted while sleeping
+     */
+    protected void sleep(long timeMs) throws InterruptedException {
+        stopper.await(timeMs, TimeUnit.MILLISECONDS);
+    }
+}
diff --git a/main/src/main/java/org/onap/policy/pap/main/comm/msgdata/MessageData.java b/main/src/main/java/org/onap/policy/pap/main/comm/msgdata/MessageData.java
new file mode 100644 (file)
index 0000000..aa288f7
--- /dev/null
@@ -0,0 +1,104 @@
+/*
+ * ============LICENSE_START=======================================================
+ * ONAP PAP
+ * ================================================================================
+ * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.pap.main.comm.msgdata;
+
+import org.onap.policy.models.pdp.concepts.PdpMessage;
+import org.onap.policy.models.pdp.concepts.PdpStatus;
+import org.onap.policy.pap.main.comm.TimerManager;
+
+
+/**
+ * Wraps a message, providing methods appropriate to the message type.
+ */
+public abstract class MessageData {
+    private final PdpMessage message;
+    private final int maxRetries;
+    private final TimerManager timers;
+
+    /**
+     * Constructs the object.
+     *
+     * @param message message to be wrapped by this
+     * @param maxRetries max number of retries
+     * @param timers the timer manager for messages of this type
+     */
+    public MessageData(PdpMessage message, int maxRetries, TimerManager timers) {
+        this.message = message;
+        this.maxRetries = maxRetries;
+        this.timers = timers;
+    }
+
+    /**
+     * Gets the wrapped message.
+     *
+     * @return the wrapped message
+     */
+    public PdpMessage getMessage() {
+        return message;
+    }
+
+    /**
+     * Gets a string, suitable for logging, identifying the message type.
+     *
+     * @return the message type
+     */
+    public String getType() {
+        return message.getClass().getSimpleName();
+    }
+
+    /**
+     * Gets the maximum retry count for the particular message type.
+     *
+     * @return the maximum retry count
+     */
+    public int getMaxRetryCount() {
+        return maxRetries;
+    }
+
+    /**
+     * Gets the timer manager for the particular message type.
+     *
+     * @return the timer manager
+     */
+    public TimerManager getTimers() {
+        return timers;
+    }
+
+    /**
+     * Indicates that the response did not match what was expected.
+     *
+     * @param reason the reason for the mismatch
+     */
+    public abstract void mismatch(String reason);
+
+    /**
+     * Indicates that processing of this particular message has completed successfully.
+     */
+    public abstract void completed();
+
+    /**
+     * Checks the response to ensure it is as expected.
+     *
+     * @param response the response to check
+     * @return an error message, if a fatal error has occurred, {@code null} otherwise
+     */
+    public abstract String checkResponse(PdpStatus response);
+}
diff --git a/main/src/main/java/org/onap/policy/pap/main/comm/msgdata/StateChangeData.java b/main/src/main/java/org/onap/policy/pap/main/comm/msgdata/StateChangeData.java
new file mode 100644 (file)
index 0000000..ecbf5df
--- /dev/null
@@ -0,0 +1,57 @@
+/*
+ * ============LICENSE_START=======================================================
+ * ONAP PAP
+ * ================================================================================
+ * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.pap.main.comm.msgdata;
+
+import org.onap.policy.models.pdp.concepts.PdpStateChange;
+import org.onap.policy.models.pdp.concepts.PdpStatus;
+import org.onap.policy.pap.main.parameters.PdpModifyRequestMapParams;
+
+/**
+ * Wraps a STATE-CHANGE.
+ */
+public abstract class StateChangeData extends MessageData {
+    private PdpStateChange stateChange;
+
+    /**
+     * Constructs the object.
+     *
+     * @param message message to be wrapped by this
+     * @param params the parameters
+     */
+    public StateChangeData(PdpStateChange message, PdpModifyRequestMapParams params) {
+        super(message, params.getParams().getStateChangeParameters().getMaxRetryCount(), params.getStateChangeTimers());
+
+        stateChange = message;
+    }
+
+    @Override
+    public String checkResponse(PdpStatus response) {
+        if (!stateChange.getName().equals(response.getName())) {
+            return "name does not match";
+        }
+
+        if (response.getState() != stateChange.getState()) {
+            return "state is " + response.getState() + ", but expected " + stateChange.getState();
+        }
+
+        return null;
+    }
+}
diff --git a/main/src/main/java/org/onap/policy/pap/main/comm/msgdata/UpdateData.java b/main/src/main/java/org/onap/policy/pap/main/comm/msgdata/UpdateData.java
new file mode 100644 (file)
index 0000000..904c823
--- /dev/null
@@ -0,0 +1,78 @@
+/*
+ * ============LICENSE_START=======================================================
+ * ONAP PAP
+ * ================================================================================
+ * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.pap.main.comm.msgdata;
+
+import java.util.ArrayList;
+import java.util.List;
+import org.onap.policy.models.pdp.concepts.PdpStatus;
+import org.onap.policy.models.pdp.concepts.PdpUpdate;
+import org.onap.policy.models.tosca.simple.concepts.ToscaPolicy;
+import org.onap.policy.pap.main.parameters.PdpModifyRequestMapParams;
+
+
+/**
+ * Wraps an UPDATE.
+ */
+public abstract class UpdateData extends MessageData {
+    private PdpUpdate update;
+
+    /**
+     * Constructs the object.
+     *
+     * @param message message to be wrapped by this
+     * @param params the parameters
+     */
+    public UpdateData(PdpUpdate message, PdpModifyRequestMapParams params) {
+        super(message, params.getParams().getUpdateParameters().getMaxRetryCount(), params.getUpdateTimers());
+
+        update = message;
+    }
+
+    @Override
+    public String checkResponse(PdpStatus response) {
+        if (!update.getName().equals(response.getName())) {
+            return "name does not match";
+        }
+
+        if (!update.getPdpGroup().equals(response.getPdpGroup())) {
+            return "group does not match";
+        }
+
+        if (!update.getPdpSubgroup().equals(response.getPdpSubgroup())) {
+            return "subgroup does not match";
+        }
+
+        // see if the other has any policies that this does not have
+        ArrayList<ToscaPolicy> lst = new ArrayList<>(response.getPolicies());
+        List<ToscaPolicy> mypolicies = update.getPolicies();
+
+        if (mypolicies.size() != lst.size()) {
+            return "policies do not match";
+        }
+
+        lst.removeAll(update.getPolicies());
+        if (!lst.isEmpty()) {
+            return "policies do not match";
+        }
+
+        return null;
+    }
+}
diff --git a/main/src/main/java/org/onap/policy/pap/main/parameters/PdpModifyRequestMapParams.java b/main/src/main/java/org/onap/policy/pap/main/parameters/PdpModifyRequestMapParams.java
new file mode 100644 (file)
index 0000000..2c17a0b
--- /dev/null
@@ -0,0 +1,88 @@
+/*
+ * ============LICENSE_START=======================================================
+ * ONAP PAP
+ * ================================================================================
+ * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.pap.main.parameters;
+
+import lombok.Getter;
+import org.onap.policy.common.endpoints.listeners.RequestIdDispatcher;
+import org.onap.policy.models.pdp.concepts.PdpStatus;
+import org.onap.policy.pap.main.comm.Publisher;
+import org.onap.policy.pap.main.comm.TimerManager;
+
+
+/**
+ * Parameters needed to create a {@link PdpModifyRequestMapParams}.
+ */
+@Getter
+public class PdpModifyRequestMapParams extends RequestDataParams {
+    private PdpParameters params;
+    private TimerManager updateTimers;
+    private TimerManager stateChangeTimers;
+
+    public PdpModifyRequestMapParams setParams(PdpParameters params) {
+        this.params = params;
+        return this;
+    }
+
+    public PdpModifyRequestMapParams setUpdateTimers(TimerManager updateTimers) {
+        this.updateTimers = updateTimers;
+        return this;
+    }
+
+    public PdpModifyRequestMapParams setStateChangeTimers(TimerManager stateChangeTimers) {
+        this.stateChangeTimers = stateChangeTimers;
+        return this;
+    }
+
+    @Override
+    public PdpModifyRequestMapParams setPublisher(Publisher publisher) {
+        super.setPublisher(publisher);
+        return this;
+    }
+
+    @Override
+    public PdpModifyRequestMapParams setResponseDispatcher(RequestIdDispatcher<PdpStatus> responseDispatcher) {
+        super.setResponseDispatcher(responseDispatcher);
+        return this;
+    }
+
+    @Override
+    public PdpModifyRequestMapParams setModifyLock(Object modifyLock) {
+        super.setModifyLock(modifyLock);
+        return this;
+    }
+
+    @Override
+    public void validate() {
+        super.validate();
+
+        if (params == null) {
+            throw new IllegalArgumentException("missing PDP parameters");
+        }
+
+        if (updateTimers == null) {
+            throw new IllegalArgumentException("missing updateTimers");
+        }
+
+        if (stateChangeTimers == null) {
+            throw new IllegalArgumentException("missing stateChangeTimers");
+        }
+    }
+}
diff --git a/main/src/main/java/org/onap/policy/pap/main/parameters/RequestDataParams.java b/main/src/main/java/org/onap/policy/pap/main/parameters/RequestDataParams.java
new file mode 100644 (file)
index 0000000..ea4b02c
--- /dev/null
@@ -0,0 +1,70 @@
+/*
+ * ============LICENSE_START=======================================================
+ * ONAP PAP
+ * ================================================================================
+ * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.pap.main.parameters;
+
+import lombok.Getter;
+import org.onap.policy.common.endpoints.listeners.RequestIdDispatcher;
+import org.onap.policy.models.pdp.concepts.PdpStatus;
+import org.onap.policy.pap.main.comm.Publisher;
+import org.onap.policy.pap.main.comm.RequestData;
+
+
+/**
+ * Parameters needed to create a {@link RequestData}.
+ */
+@Getter
+public class RequestDataParams {
+    private Publisher publisher;
+    private RequestIdDispatcher<PdpStatus> responseDispatcher;
+    private Object modifyLock;
+
+    public RequestDataParams setPublisher(Publisher publisher) {
+        this.publisher = publisher;
+        return this;
+    }
+
+    public RequestDataParams setResponseDispatcher(RequestIdDispatcher<PdpStatus> responseDispatcher) {
+        this.responseDispatcher = responseDispatcher;
+        return this;
+    }
+
+    public RequestDataParams setModifyLock(Object modifyLock) {
+        this.modifyLock = modifyLock;
+        return this;
+    }
+
+    /**
+     * Validates the parameters.
+     */
+    public void validate() {
+        if (publisher == null) {
+            throw new IllegalArgumentException("missing publisher");
+        }
+
+        if (responseDispatcher == null) {
+            throw new IllegalArgumentException("missing responseDispatcher");
+        }
+
+        if (modifyLock == null) {
+            throw new IllegalArgumentException("missing modifyLock");
+        }
+    }
+}
index 913e661..ed880ae 100644 (file)
@@ -23,6 +23,7 @@ package org.onap.policy.pap.main.startstop;
 
 import java.util.Arrays;
 import java.util.Properties;
+import java.util.concurrent.atomic.AtomicReference;
 import org.onap.policy.common.endpoints.event.comm.TopicEndpoint;
 import org.onap.policy.common.endpoints.event.comm.TopicSource;
 import org.onap.policy.common.endpoints.listeners.MessageTypeDispatcher;
@@ -34,7 +35,12 @@ import org.onap.policy.models.pdp.concepts.PdpStatus;
 import org.onap.policy.models.pdp.enums.PdpMessageType;
 import org.onap.policy.pap.main.PapConstants;
 import org.onap.policy.pap.main.PolicyPapRuntimeException;
+import org.onap.policy.pap.main.comm.PdpModifyRequestMap;
+import org.onap.policy.pap.main.comm.Publisher;
+import org.onap.policy.pap.main.comm.TimerManager;
 import org.onap.policy.pap.main.parameters.PapParameterGroup;
+import org.onap.policy.pap.main.parameters.PdpModifyRequestMapParams;
+import org.onap.policy.pap.main.parameters.PdpParameters;
 import org.onap.policy.pap.main.rest.PapRestServer;
 import org.onap.policy.pap.main.rest.PapStatisticsManager;
 
@@ -81,8 +87,6 @@ public class PapActivator extends ServiceManagerContainer {
 
         try {
             this.papParameterGroup = papParameterGroup;
-            papParameterGroup.getRestServerParameters().setName(papParameterGroup.getName());
-
             this.msgDispatcher = new MessageTypeDispatcher(MSG_TYPE_NAMES);
             this.reqIdDispatcher = new RequestIdDispatcher<>(PdpStatus.class, REQ_ID_NAMES);
 
@@ -90,18 +94,26 @@ public class PapActivator extends ServiceManagerContainer {
             throw new PolicyPapRuntimeException(e);
         }
 
-        this.msgDispatcher.register(PdpMessageType.PDP_STATUS.name(), this.reqIdDispatcher);
+        papParameterGroup.getRestServerParameters().setName(papParameterGroup.getName());
 
         final Object pdpUpdateLock = new Object();
+        PdpParameters pdpParams = papParameterGroup.getPdpParameters();
+        AtomicReference<Publisher> pdpPub = new AtomicReference<>();
+        AtomicReference<TimerManager> pdpUpdTimers = new AtomicReference<>();
+        AtomicReference<TimerManager> pdpStChgTimers = new AtomicReference<>();
 
         // @formatter:off
         addAction("PAP parameters",
             () -> ParameterService.register(papParameterGroup),
             () -> ParameterService.deregister(papParameterGroup.getName()));
 
-        addAction("dispatcher",
-            () -> registerDispatcher(),
-            () -> unregisterDispatcher());
+        addAction("Request ID Dispatcher",
+            () -> msgDispatcher.register(PdpMessageType.PDP_STATUS.name(), this.reqIdDispatcher),
+            () -> msgDispatcher.unregister(PdpMessageType.PDP_STATUS.name()));
+
+        addAction("Message Dispatcher",
+            () -> registerMsgDispatcher(),
+            () -> unregisterMsgDispatcher());
 
         addAction("topics",
             () -> TopicEndpoint.manager.start(),
@@ -111,20 +123,68 @@ public class PapActivator extends ServiceManagerContainer {
             () -> Registry.register(PapConstants.REG_STATISTICS_MANAGER, new PapStatisticsManager()),
             () -> Registry.unregister(PapConstants.REG_STATISTICS_MANAGER));
 
+        addAction("PDP publisher",
+            () -> {
+                pdpPub.set(new Publisher(PapConstants.TOPIC_POLICY_PDP_PAP));
+                startThread(pdpPub.get());
+            },
+            () -> pdpPub.get().stop());
+
+        addAction("PDP update timers",
+            () -> {
+                pdpUpdTimers.set(new TimerManager("update", pdpParams.getUpdateParameters().getMaxWaitMs()));
+                startThread(pdpUpdTimers.get());
+            },
+            () -> pdpUpdTimers.get().stop());
+
+        addAction("PDP state-change timers",
+            () -> {
+                pdpStChgTimers.set(new TimerManager("state-change", pdpParams.getUpdateParameters().getMaxWaitMs()));
+                startThread(pdpStChgTimers.get());
+            },
+            () -> pdpStChgTimers.get().stop());
+
         addAction("PDP modification lock",
             () -> Registry.register(PapConstants.REG_PDP_MODIFY_LOCK, pdpUpdateLock),
             () -> Registry.unregister(PapConstants.REG_PDP_MODIFY_LOCK));
 
-        addAction("REST server",
-            () -> restServer = new PapRestServer(papParameterGroup.getRestServerParameters()),
-            () -> { });
+        addAction("PDP modification requests",
+            () -> Registry.register(PapConstants.REG_PDP_MODIFY_MAP, new PdpModifyRequestMap(
+                            new PdpModifyRequestMapParams()
+                                    .setModifyLock(pdpUpdateLock)
+                                    .setParams(pdpParams)
+                                    .setPublisher(pdpPub.get())
+                                    .setResponseDispatcher(reqIdDispatcher)
+                                    .setStateChangeTimers(pdpStChgTimers.get())
+                                    .setUpdateTimers(pdpUpdTimers.get()))),
+            () -> Registry.unregister(PapConstants.REG_PDP_MODIFY_MAP));
+
+        addAction("Create REST server",
+            () -> {
+                restServer = new PapRestServer(papParameterGroup.getRestServerParameters());
+            },
+            () -> {
+                restServer = null;
+            });
 
-        addAction("REST server thread",
+        addAction("REST server",
             () -> restServer.start(),
             () -> restServer.stop());
         // @formatter:on
     }
 
+    /**
+     * Starts a background thread.
+     *
+     * @param runner function to run in the background
+     */
+    private void startThread(Runnable runner) {
+        Thread thread = new Thread(runner);
+        thread.setDaemon(true);
+
+        thread.start();
+    }
+
     /**
      * Get the parameters used by the activator.
      *
@@ -137,7 +197,7 @@ public class PapActivator extends ServiceManagerContainer {
     /**
      * Registers the dispatcher with the topic source(s).
      */
-    private void registerDispatcher() {
+    private void registerMsgDispatcher() {
         for (TopicSource source : TopicEndpoint.manager
                         .getTopicSources(Arrays.asList(PapConstants.TOPIC_POLICY_PDP_PAP))) {
             source.register(msgDispatcher);
@@ -147,7 +207,7 @@ public class PapActivator extends ServiceManagerContainer {
     /**
      * Unregisters the dispatcher from the topic source(s).
      */
-    private void unregisterDispatcher() {
+    private void unregisterMsgDispatcher() {
         for (TopicSource source : TopicEndpoint.manager
                         .getTopicSources(Arrays.asList(PapConstants.TOPIC_POLICY_PDP_PAP))) {
             source.unregister(msgDispatcher);
diff --git a/main/src/test/java/org/onap/policy/pap/main/comm/PdpModifyRequestMapTest.java b/main/src/test/java/org/onap/policy/pap/main/comm/PdpModifyRequestMapTest.java
new file mode 100644 (file)
index 0000000..bbe75a4
--- /dev/null
@@ -0,0 +1,575 @@
+/*
+ * ============LICENSE_START=======================================================
+ * ONAP PAP
+ * ================================================================================
+ * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.pap.main.comm;
+
+import static org.assertj.core.api.Assertions.assertThatIllegalArgumentException;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertSame;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+import java.util.function.Consumer;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.ArgumentCaptor;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure;
+import org.onap.policy.common.endpoints.listeners.RequestIdDispatcher;
+import org.onap.policy.common.endpoints.listeners.TypedMessageListener;
+import org.onap.policy.models.base.PfConceptKey;
+import org.onap.policy.models.pdp.concepts.PdpMessage;
+import org.onap.policy.models.pdp.concepts.PdpStateChange;
+import org.onap.policy.models.pdp.concepts.PdpStatus;
+import org.onap.policy.models.pdp.concepts.PdpUpdate;
+import org.onap.policy.models.pdp.enums.PdpState;
+import org.onap.policy.models.tosca.simple.concepts.ToscaPolicy;
+import org.onap.policy.pap.main.PapConstants;
+import org.onap.policy.pap.main.comm.PdpModifyRequestMap.ModifyReqData;
+import org.onap.policy.pap.main.parameters.PdpModifyRequestMapParams;
+import org.onap.policy.pap.main.parameters.PdpParameters;
+import org.onap.policy.pap.main.parameters.PdpStateChangeParameters;
+import org.onap.policy.pap.main.parameters.PdpUpdateParameters;
+import org.powermock.reflect.Whitebox;
+
+public class PdpModifyRequestMapTest {
+    private static final String DIFFERENT = "-diff";
+    private static final String PDP1 = "pdp_1";
+
+    private static final int UPDATE_RETRIES = 2;
+    private static final int STATE_RETRIES = 1;
+
+    private PdpModifyRequestMap map;
+    private Publisher pub;
+    private RequestIdDispatcher<PdpStatus> disp;
+    private Object lock;
+    private TimerManager updTimers;
+    private TimerManager stateTimers;
+    private TimerManager.Timer timer;
+    private Queue<QueueToken<PdpMessage>> queue;
+    private PdpStatus response;
+    private PdpParameters pdpParams;
+    private PdpUpdateParameters updParams;
+    private PdpStateChangeParameters stateParams;
+    private PdpUpdate update;
+    private PdpStateChange state;
+    private String mismatchReason;
+
+    /**
+     * Sets up.
+     */
+    @Before
+    @SuppressWarnings("unchecked")
+    public void setUp() {
+        pub = mock(Publisher.class);
+        disp = mock(RequestIdDispatcher.class);
+        lock = new Object();
+        updTimers = mock(TimerManager.class);
+        stateTimers = mock(TimerManager.class);
+        timer = mock(TimerManager.Timer.class);
+        queue = new LinkedList<>();
+        response = new PdpStatus();
+        pdpParams = mock(PdpParameters.class);
+        updParams = mock(PdpUpdateParameters.class);
+        stateParams = mock(PdpStateChangeParameters.class);
+        update = makeUpdate();
+        state = makeStateChange();
+        mismatchReason = null;
+
+        doAnswer(new Answer<Object>() {
+            @Override
+            public Object answer(InvocationOnMock invocation) throws Throwable {
+                queue.add(invocation.getArgumentAt(0, QueueToken.class));
+                return null;
+            }
+        }).when(pub).enqueue(any());
+
+        when(updTimers.register(any(), any())).thenReturn(timer);
+        when(stateTimers.register(any(), any())).thenReturn(timer);
+
+        when(pdpParams.getUpdateParameters()).thenReturn(updParams);
+        when(pdpParams.getStateChangeParameters()).thenReturn(stateParams);
+
+        when(updParams.getMaxRetryCount()).thenReturn(UPDATE_RETRIES);
+        when(updParams.getMaxWaitMs()).thenReturn(1000L);
+
+        when(stateParams.getMaxRetryCount()).thenReturn(STATE_RETRIES);
+        when(stateParams.getMaxWaitMs()).thenReturn(1000L);
+
+        response.setName(PDP1);
+        response.setState(PdpState.SAFE);
+        response.setPdpGroup(update.getPdpGroup());
+        response.setPdpSubgroup(update.getPdpSubgroup());
+        response.setPolicies(update.getPolicies());
+
+        map = new PdpModifyRequestMap(makeParameters()) {
+
+            @Override
+            protected ModifyReqData makeRequestData(PdpUpdate update, PdpStateChange stateChange) {
+                return new ModifyReqData(update, stateChange) {
+                    @Override
+                    protected void mismatch(String reason) {
+                        mismatchReason = reason;
+                        super.mismatch(reason);
+                    }
+                };
+            }
+        };
+
+        map = spy(map);
+    }
+
+    @Test
+    public void testAdd_DifferentPdps() {
+        map.addRequest(update);
+
+        state.setName(DIFFERENT);
+        map.addRequest(state);
+
+        assertNotNull(getReqData(PDP1));
+        assertNotNull(getReqData(DIFFERENT));
+
+        assertQueueContains("testAdd_DifferentPdps", update, state);
+    }
+
+    @Test
+    public void testAddRequestPdpUpdate() {
+        map.addRequest(update);
+
+        assertQueueContains("testAddRequestPdpUpdate", update);
+    }
+
+    @Test
+    public void testAddRequestPdpStateChange() {
+        map.addRequest(state);
+
+        assertQueueContains("testAddRequestPdpStateChange", state);
+    }
+
+    @Test
+    public void testAddRequestPdpUpdatePdpStateChange_Both() {
+        map.addRequest(update, state);
+
+        assertQueueContains("testAddRequestPdpUpdatePdpStateChange_Both", update);
+    }
+
+    @Test
+    public void testAddRequestPdpUpdatePdpStateChange_BothNull() {
+        map.addRequest(null, null);
+
+        // nothing should have been added to the queue
+        assertTrue(queue.isEmpty());
+    }
+
+    @Test
+    public void testGetPdpName_SameNames() {
+        // should be no exception
+        map.addRequest(update, state);
+    }
+
+    @Test
+    public void testGetPdpName_DifferentNames() {
+        // should be no exception
+        state.setName(update.getName() + "X");
+        assertThatIllegalArgumentException().isThrownBy(() -> map.addRequest(update, state))
+                        .withMessageContaining("does not match");
+    }
+
+    @Test
+    public void testGetPdpName_NullUpdateName() {
+        update.setName(null);
+        assertThatIllegalArgumentException().isThrownBy(() -> map.addRequest(update)).withMessageContaining("update");
+
+        assertThatIllegalArgumentException().isThrownBy(() -> map.addRequest(update, state))
+                        .withMessageContaining("update");
+
+        // both names are null
+        state.setName(null);
+        assertThatIllegalArgumentException().isThrownBy(() -> map.addRequest(update, state));
+    }
+
+    @Test
+    public void testGetPdpName_NullStateName() {
+        state.setName(null);
+        assertThatIllegalArgumentException().isThrownBy(() -> map.addRequest(state)).withMessageContaining("state");
+
+        assertThatIllegalArgumentException().isThrownBy(() -> map.addRequest(update, state))
+                        .withMessageContaining("state");
+
+        // both names are null
+        update.setName(null);
+        assertThatIllegalArgumentException().isThrownBy(() -> map.addRequest(update, state));
+    }
+
+    @Test
+    public void testIsSamePdpUpdatePdpUpdate() {
+        map.addRequest(update);
+
+        // queue a similar request
+        PdpUpdate update2 = makeUpdate();
+        map.addRequest(update2);
+
+        // token should still have original message
+        assertQueueContains("testIsSamePdpUpdatePdpUpdate", update);
+    }
+
+    @Test
+    public void testIsSamePdpUpdatePdpUpdate_DifferentPolicyCount() {
+        map.addRequest(update);
+
+        PdpUpdate update2 = makeUpdate();
+        update2.setPolicies(Arrays.asList(update.getPolicies().get(0)));
+        map.addRequest(update2);
+
+        // should have replaced the message in the token
+        assertQueueContains("testIsSamePdpUpdatePdpUpdate_DifferentPolicyCount", update2);
+    }
+
+    @Test
+    public void testIsSamePdpUpdatePdpUpdate_DifferentGroup() {
+        map.addRequest(update);
+
+        // queue a similar request
+        PdpUpdate update2 = makeUpdate();
+        update2.setPdpGroup(update.getPdpGroup() + DIFFERENT);
+        map.addRequest(update2);
+
+        // should have replaced the message in the token
+        assertQueueContains("testIsSamePdpUpdatePdpUpdate_DifferentGroup", update2);
+    }
+
+    @Test
+    public void testIsSamePdpUpdatePdpUpdate_DifferentSubGroup() {
+        map.addRequest(update);
+
+        PdpUpdate update2 = makeUpdate();
+        update2.setPdpSubgroup(update.getPdpSubgroup() + DIFFERENT);
+        map.addRequest(update2);
+
+        // should have replaced the message in the token
+        assertQueueContains("testIsSamePdpUpdatePdpUpdate_DifferentSubGroup", update2);
+    }
+
+    @Test
+    public void testIsSamePdpUpdatePdpUpdate_DifferentPolicies() {
+        map.addRequest(update);
+
+        ArrayList<ToscaPolicy> policies = new ArrayList<>(update.getPolicies());
+        policies.set(0, new ToscaPolicy(new PfConceptKey("policy-3-x", "2.0.0")));
+
+        PdpUpdate update2 = makeUpdate();
+        update2.setPolicies(policies);
+        map.addRequest(update2);
+
+        // should have replaced the message in the token
+        assertQueueContains("testIsSamePdpUpdatePdpUpdate_DifferentPolicies", update2);
+    }
+
+    @Test
+    public void testIsSamePdpStateChangePdpStateChange() {
+        map.addRequest(state);
+
+        // queue a similar request
+        PdpStateChange state2 = makeStateChange();
+        map.addRequest(state2);
+
+        // token should still have original message
+        assertQueueContains("testIsSamePdpStateChangePdpStateChange", state);
+    }
+
+    @Test
+    public void testIsSamePdpStateChangePdpStateChange_DifferentState() {
+        map.addRequest(state);
+
+        // queue a similar request
+        PdpStateChange state2 = makeStateChange();
+        state2.setState(PdpState.TERMINATED);
+        map.addRequest(state2);
+
+        // should have replaced the message in the token
+        assertQueueContains("testIsSamePdpStateChangePdpStateChange_DifferentState", state2);
+    }
+
+    @Test
+    public void testModifyReqDataIsActive() {
+        map.addRequest(update);
+
+        invokeProcessResponse();
+
+        // name should have been removed
+        assertNull(getReqData(PDP1));
+    }
+
+    @Test
+    public void testModifyReqDataAddPdpUpdate() {
+        map.addRequest(state);
+
+        map.addRequest(update);
+
+        // update should have replaced the state-change in the queue
+        assertQueueContains("testModifyReqDataAddPdpUpdate", update);
+    }
+
+    @Test
+    public void testModifyReqDataAddPdpStateChange() {
+        map.addRequest(update);
+
+        map.addRequest(state);
+
+        // update should still be in the queue
+        assertQueueContains("testModifyReqDataAddPdpStateChange", update);
+    }
+
+    @Test
+    public void testModifyReqDataRetryCountExhausted() {
+        map.addRequest(state);
+
+        // timeout twice so that retry count is exhausted
+        invokeTimeoutHandler(stateTimers, STATE_RETRIES + 1);
+
+        // name should have been removed
+        assertNull(getReqData(PDP1));
+    }
+
+    @Test
+    public void testModifyReqDataMismatch() {
+        map.addRequest(state);
+
+        // set up a response with incorrect info
+        response.setName(state.getName() + DIFFERENT);
+
+        invokeProcessResponse();
+
+        assertNotNull(mismatchReason);
+
+        // name should have been removed
+        assertNull(getReqData(PDP1));
+    }
+
+    @Test
+    public void testUpdateDataGetMaxRetryCount() {
+        map.addRequest(update);
+        ModifyReqData reqdata = getReqData(PDP1);
+
+        for (int count = 0; count < UPDATE_RETRIES; ++count) {
+            assertTrue("update bump " + count, reqdata.bumpRetryCount());
+        }
+
+        assertFalse("update bump final", reqdata.bumpRetryCount());
+    }
+
+    @Test
+    public void testUpdateDataMismatch() {
+        map.addRequest(update);
+
+        response.setName(DIFFERENT);
+        invokeProcessResponse();
+
+        assertNull(getReqData(PDP1));
+    }
+
+    @Test
+    public void testUpdateDataComplete() {
+        map.addRequest(update);
+
+        invokeProcessResponse();
+
+        assertNull(getReqData(PDP1));
+    }
+
+    @Test
+    public void testUpdateDataComplete_MoreToGo() {
+        map.addRequest(update, state);
+
+        invokeProcessResponse();
+
+        assertNotNull(getReqData(PDP1));
+
+        assertSame(state, queue.poll().get());
+    }
+
+    @Test
+    public void testStateChangeDataMismatch() {
+        map.addRequest(state);
+
+        response.setName(DIFFERENT);
+        invokeProcessResponse();
+
+        assertNull(getReqData(PDP1));
+    }
+
+    @Test
+    public void testStateChangeDataCompleted() {
+        map.addRequest(state);
+
+        invokeProcessResponse();
+
+        assertNull(getReqData(PDP1));
+    }
+
+    @Test
+    public void testMakeRequestData() {
+        // need a map that doesn't override the method
+        map = new PdpModifyRequestMap(makeParameters());
+
+        // this will invoke makeRequestData() - should not throw an exception
+        map.addRequest(update);
+
+        assertNotNull(getReqData(PDP1));
+    }
+
+    /**
+     * Asserts that the queue contains the specified messages.
+     *
+     * @param testName the test name
+     * @param messages messages that are expected in the queue
+     */
+    private void assertQueueContains(String testName, PdpMessage... messages) {
+        assertEquals(testName, messages.length, queue.size());
+
+        int count = 0;
+        for (PdpMessage msg : messages) {
+            ++count;
+
+            QueueToken<PdpMessage> token = queue.remove();
+            assertSame(testName + "-" + count, msg, token.get());
+        }
+    }
+
+    /**
+     * Makes parameters to configure a map.
+     *
+     * @return new parameters
+     */
+    private PdpModifyRequestMapParams makeParameters() {
+        return new PdpModifyRequestMapParams().setModifyLock(lock).setParams(pdpParams).setPublisher(pub)
+                        .setResponseDispatcher(disp).setStateChangeTimers(stateTimers).setUpdateTimers(updTimers);
+    }
+
+    /**
+     * Gets the listener that was registered with the dispatcher and invokes it.
+     *
+     * @return the response processor
+     */
+    @SuppressWarnings("unchecked")
+    private TypedMessageListener<PdpStatus> invokeProcessResponse() {
+        @SuppressWarnings("rawtypes")
+        ArgumentCaptor<TypedMessageListener> processResp = ArgumentCaptor.forClass(TypedMessageListener.class);
+
+        // indicate that is has been published
+        queue.remove().replaceItem(null);
+
+        verify(disp).register(any(), processResp.capture());
+
+        TypedMessageListener<PdpStatus> func = processResp.getValue();
+        func.onTopicEvent(CommInfrastructure.NOOP, PapConstants.TOPIC_POLICY_PDP_PAP, response);
+
+        return func;
+    }
+
+    /**
+     * Gets the timeout handler that was registered with the timer manager and invokes it.
+     *
+     * @param timers the timer manager whose handler is to be invoked
+     * @param ntimes number of times to invoke the timeout handler
+     * @return the timeout handler
+     */
+    @SuppressWarnings("unchecked")
+    private void invokeTimeoutHandler(TimerManager timers, int ntimes) {
+        @SuppressWarnings("rawtypes")
+        ArgumentCaptor<Consumer> timeoutHdlr = ArgumentCaptor.forClass(Consumer.class);
+
+        for (int count = 1; count <= ntimes; ++count) {
+            // indicate that is has been published
+            queue.remove().replaceItem(null);
+
+            verify(timers, times(count)).register(any(), timeoutHdlr.capture());
+
+            @SuppressWarnings("rawtypes")
+            List<Consumer> lst = timeoutHdlr.getAllValues();
+
+            Consumer<String> hdlr = lst.get(lst.size() - 1);
+            hdlr.accept(PDP1);
+        }
+    }
+
+    /**
+     * Gets the request data from the map.
+     *
+     * @param pdpName name of the PDP whose data is desired
+     * @return the request data, or {@code null} if the PDP is not in the map
+     */
+    private ModifyReqData getReqData(String pdpName) {
+        Map<String, ModifyReqData> name2data = Whitebox.getInternalState(map, "name2data");
+        return name2data.get(pdpName);
+    }
+
+    /**
+     * Makes an update message.
+     *
+     * @return a new update message
+     */
+    private PdpUpdate makeUpdate() {
+        PdpUpdate upd = new PdpUpdate();
+
+        upd.setDescription("update-description");
+        upd.setName(PDP1);
+        upd.setPdpGroup("group1-a");
+        upd.setPdpSubgroup("sub1-a");
+        upd.setPdpType("drools");
+
+        ToscaPolicy policy1 = new ToscaPolicy(new PfConceptKey("policy-1-a", "1.0.0"));
+        ToscaPolicy policy2 = new ToscaPolicy(new PfConceptKey("policy-2-a", "1.1.0"));
+
+        upd.setPolicies(Arrays.asList(policy1, policy2));
+
+        return upd;
+    }
+
+    /**
+     * Makes a state-change message.
+     *
+     * @return a new state-change message
+     */
+    private PdpStateChange makeStateChange() {
+        PdpStateChange cng = new PdpStateChange();
+
+        cng.setName(PDP1);
+        cng.setState(PdpState.SAFE);
+
+        return cng;
+    }
+}
diff --git a/main/src/test/java/org/onap/policy/pap/main/comm/PublisherTest.java b/main/src/test/java/org/onap/policy/pap/main/comm/PublisherTest.java
new file mode 100644 (file)
index 0000000..f15b2a0
--- /dev/null
@@ -0,0 +1,265 @@
+/*
+ * ============LICENSE_START=======================================================
+ * ONAP PAP
+ * ================================================================================
+ * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.pap.main.comm;
+
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.util.Properties;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure;
+import org.onap.policy.common.endpoints.event.comm.TopicEndpoint;
+import org.onap.policy.common.endpoints.event.comm.TopicListener;
+import org.onap.policy.common.utils.coder.Coder;
+import org.onap.policy.common.utils.coder.CoderException;
+import org.onap.policy.common.utils.coder.StandardCoder;
+import org.onap.policy.common.utils.resources.ResourceUtils;
+import org.onap.policy.models.pdp.concepts.PdpMessage;
+import org.onap.policy.models.pdp.concepts.PdpStateChange;
+import org.onap.policy.pap.main.PapConstants;
+import org.onap.policy.pap.main.PolicyPapException;
+
+public class PublisherTest extends Threaded {
+
+    // these messages will have different request IDs
+    private static final PdpStateChange MSG1 = new PdpStateChange();
+    private static final PdpStateChange MSG2 = new PdpStateChange();
+
+    // MSG1 & MSG2, respectively, encoded as JSON
+    private static final String JSON1;
+    private static final String JSON2;
+
+    static {
+        try {
+            Coder coder = new StandardCoder();
+            JSON1 = coder.encode(MSG1);
+            JSON2 = coder.encode(MSG2);
+
+        } catch (CoderException e) {
+            throw new ExceptionInInitializerError(e);
+        }
+    }
+
+    /**
+     * Max time to wait, in milliseconds, for a thread to terminate or for a message to be
+     * published.
+     */
+    private static final long MAX_WAIT_MS = 5000;
+
+    private Publisher pub;
+    private MyListener listener;
+
+    /**
+     * Configures the topic and attaches a listener.
+     *
+     * @throws Exception if an error occurs
+     */
+    @BeforeClass
+    public static void setUpBeforeClass() throws Exception {
+        Properties props = new Properties();
+        File propFile = new File(ResourceUtils.getFilePath4Resource("parameters/topic.properties"));
+        try (FileInputStream inp = new FileInputStream(propFile)) {
+            props.load(inp);
+        }
+
+        TopicEndpoint.manager.shutdown();
+
+        TopicEndpoint.manager.addTopics(props);
+        TopicEndpoint.manager.start();
+    }
+
+    @AfterClass
+    public static void tearDownAfterClass() throws Exception {
+        TopicEndpoint.manager.shutdown();
+    }
+
+    /**
+     * Set up.
+     *
+     * @throws Exception if an error occurs
+     */
+    @Before
+    public void setUp() throws Exception {
+        super.setUp();
+
+        pub = new Publisher(PapConstants.TOPIC_POLICY_PDP_PAP);
+
+        listener = new MyListener();
+        TopicEndpoint.manager.getNoopTopicSink(PapConstants.TOPIC_POLICY_PDP_PAP).register(listener);
+    }
+
+    /**
+     * Tear down.
+     *
+     * @throws Exception if an error occurs
+     */
+    @After
+    public void tearDown() throws Exception {
+        TopicEndpoint.manager.getNoopTopicSink(PapConstants.TOPIC_POLICY_PDP_PAP).unregister(listener);
+
+        super.tearDown();
+    }
+
+    @Override
+    protected void stopThread() {
+        if (pub != null) {
+            pub.stop();
+        }
+    }
+
+    @Test
+    public void testPublisher_testStop() throws Exception {
+        startThread(pub);
+        pub.stop();
+
+        assertTrue(waitStop());
+
+        // ensure we can call "stop" a second time
+        pub.stop();
+    }
+
+    @Test
+    public void testPublisher_Ex() throws Exception {
+        assertThatThrownBy(() -> new Publisher("unknwon-topic")).isInstanceOf(PolicyPapException.class);
+    }
+
+    @Test
+    public void testEnqueue() throws Exception {
+        // enqueue before running
+        pub.enqueue(new QueueToken<>(MSG1));
+
+        // enqueue another after running
+        startThread(pub);
+        pub.enqueue(new QueueToken<>(MSG2));
+
+        String json = listener.await(MAX_WAIT_MS);
+        assertEquals(JSON1, json);
+
+        json = listener.await(MAX_WAIT_MS);
+        assertEquals(JSON2, json);
+    }
+
+    @Test
+    public void testRun_StopBeforeProcess() throws Exception {
+        // enqueue before running
+        QueueToken<PdpMessage> token = new QueueToken<>(MSG1);
+        pub.enqueue(token);
+
+        // stop before running
+        pub.stop();
+
+        // start the thread and then wait for it to stop
+        startThread(pub);
+        assertTrue(waitStop());
+
+        // message should not have been processed
+        assertTrue(listener.isEmpty());
+        assertNotNull(token.get());
+    }
+
+    @Test
+    public void testRun() throws Exception {
+        startThread(pub);
+
+        // should skip token with null message
+        QueueToken<PdpMessage> token1 = new QueueToken<>(null);
+        pub.enqueue(token1);
+
+        QueueToken<PdpMessage> token2 = new QueueToken<>(MSG2);
+        pub.enqueue(token2);
+
+        // only the second message should have been processed
+        String json = listener.await(MAX_WAIT_MS);
+        assertEquals(JSON2, json);
+        assertNull(token2.get());
+
+        pub.stop();
+        assertTrue(waitStop());
+
+        // no more messages
+        assertTrue(listener.isEmpty());
+    }
+
+    @Test
+    public void testGetNext() throws Exception {
+        startThread(pub);
+
+        // wait for a message to be processed
+        pub.enqueue(new QueueToken<>(MSG1));
+        assertNotNull(listener.await(MAX_WAIT_MS));
+
+        // now interrupt
+        interruptThread();
+
+        assertTrue(waitStop());
+    }
+
+    /**
+     * Listener for messages published to the topic.
+     */
+    private static class MyListener implements TopicListener {
+
+        /**
+         * Released every time a message is added to the queue.
+         */
+        private final Semaphore sem = new Semaphore(0);
+
+        private final ConcurrentLinkedQueue<String> messages = new ConcurrentLinkedQueue<>();
+
+        public boolean isEmpty() {
+            return messages.isEmpty();
+        }
+
+        /**
+         * Waits for a message to be published to the topic.
+         *
+         * @param waitMs time to wait, in milli-seconds
+         * @return the next message in the queue, or {@code null} if there are no messages
+         *         or if the timeout was reached
+         * @throws InterruptedException if this thread was interrupted while waiting
+         */
+        public String await(long waitMs) throws InterruptedException {
+            if (sem.tryAcquire(waitMs, TimeUnit.MILLISECONDS)) {
+                return messages.poll();
+            }
+
+            return null;
+        }
+
+        @Override
+        public void onTopicEvent(CommInfrastructure commType, String topic, String event) {
+            messages.add(event);
+            sem.release();
+        }
+    }
+}
diff --git a/main/src/test/java/org/onap/policy/pap/main/comm/QueueTokenTest.java b/main/src/test/java/org/onap/policy/pap/main/comm/QueueTokenTest.java
new file mode 100644 (file)
index 0000000..3ff91ed
--- /dev/null
@@ -0,0 +1,87 @@
+/*
+ * ============LICENSE_START=======================================================
+ * ONAP PAP
+ * ================================================================================
+ * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.pap.main.comm;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import org.junit.Test;
+
+public class QueueTokenTest {
+    private static final String STRING1 = "a string";
+    private static final String STRING2 = "another string";
+
+    private QueueToken<String> token;
+
+    @Test
+    public void test() throws Exception {
+        token = new QueueToken<>(STRING1);
+        assertEquals(STRING1, token.get());
+
+        assertEquals(STRING1, token.replaceItem(STRING2));
+        assertEquals(STRING2, token.get());
+
+        assertEquals(STRING2, token.replaceItem(null));
+        assertEquals(null, token.get());
+
+        assertEquals(null, token.replaceItem(null));
+        assertEquals(null, token.get());
+
+        assertEquals(null, token.replaceItem(STRING1));
+        assertEquals(null, token.get());
+
+        /*
+         * Now do some mult-threaded tests, hopefully causing some contention.
+         */
+
+        token = new QueueToken<>("");
+
+        Set<String> values = ConcurrentHashMap.newKeySet();
+
+        // create and configure the threads
+        Thread[] threads = new Thread[100];
+        for (int x = 0; x < threads.length; ++x) {
+            final int xfinal = x;
+            threads[x] = new Thread(() -> values.add(token.replaceItem("me-" + xfinal)));
+            threads[x].setDaemon(true);
+        }
+
+        // start the threads all at once
+        for (Thread thread : threads) {
+            thread.start();
+        }
+
+        // wait for the threads to stop
+        for (Thread thread : threads) {
+            thread.join(5000);
+        }
+
+        values.add(token.replaceItem(null));
+
+        for (int x = 0; x < threads.length; ++x) {
+            String msg = "me-" + x;
+            assertTrue(msg, values.contains(msg));
+        }
+    }
+
+}
diff --git a/main/src/test/java/org/onap/policy/pap/main/comm/RequestDataTest.java b/main/src/test/java/org/onap/policy/pap/main/comm/RequestDataTest.java
new file mode 100644 (file)
index 0000000..28e5cf9
--- /dev/null
@@ -0,0 +1,476 @@
+/*
+ * ============LICENSE_START=======================================================
+ * ONAP PAP
+ * ================================================================================
+ * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.pap.main.comm;
+
+import static org.assertj.core.api.Assertions.assertThatIllegalArgumentException;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertSame;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.util.LinkedList;
+import java.util.Queue;
+import java.util.function.Consumer;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.ArgumentCaptor;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure;
+import org.onap.policy.common.endpoints.listeners.RequestIdDispatcher;
+import org.onap.policy.common.endpoints.listeners.TypedMessageListener;
+import org.onap.policy.common.utils.services.ServiceManager;
+import org.onap.policy.models.pdp.concepts.PdpMessage;
+import org.onap.policy.models.pdp.concepts.PdpStateChange;
+import org.onap.policy.models.pdp.concepts.PdpStatus;
+import org.onap.policy.models.pdp.enums.PdpState;
+import org.onap.policy.pap.main.PapConstants;
+import org.onap.policy.pap.main.comm.msgdata.MessageData;
+import org.onap.policy.pap.main.parameters.RequestDataParams;
+import org.powermock.reflect.Whitebox;
+
+public class RequestDataTest {
+    private static final String PDP1 = "pdp_1";
+    private static final String MY_MSG_TYPE = "my-type";
+
+    private MyRequestData reqdata;
+    private Publisher pub;
+    private RequestIdDispatcher<PdpStatus> disp;
+    private Object lock;
+    private TimerManager timers;
+    private TimerManager.Timer timer;
+    private MyMessageData msgdata;
+    private Queue<QueueToken<PdpMessage>> queue;
+    private PdpStatus response;
+
+    /**
+     * Sets up.
+     */
+    @Before
+    @SuppressWarnings("unchecked")
+    public void setUp() {
+        pub = mock(Publisher.class);
+        disp = mock(RequestIdDispatcher.class);
+        lock = new Object();
+        timers = mock(TimerManager.class);
+        timer = mock(TimerManager.Timer.class);
+        msgdata = new MyMessageData(PDP1);
+        queue = new LinkedList<>();
+        response = new PdpStatus();
+
+        doAnswer(new Answer<Object>() {
+            @Override
+            public Object answer(InvocationOnMock invocation) throws Throwable {
+                queue.add(invocation.getArgumentAt(0, QueueToken.class));
+                return null;
+            }
+        }).when(pub).enqueue(any());
+
+        when(timers.register(any(), any())).thenReturn(timer);
+
+        reqdata = new MyRequestData(
+                        new RequestDataParams().setModifyLock(lock).setPublisher(pub).setResponseDispatcher(disp));
+
+        reqdata.setName(PDP1);
+
+        msgdata = spy(msgdata);
+        reqdata = spy(reqdata);
+    }
+
+    @Test
+    public void testRequestData_Invalid() {
+        // null params
+        assertThatThrownBy(() -> new MyRequestData(null)).isInstanceOf(NullPointerException.class);
+
+        // invalid params
+        assertThatIllegalArgumentException().isThrownBy(() -> new MyRequestData(new RequestDataParams()));
+    }
+
+    @Test
+    public void testStartPublishing() {
+        reqdata.configure(msgdata);
+        reqdata.startPublishing();
+
+        verify(disp).register(eq(msgdata.getMessage().getRequestId()), any());
+        verify(timers).register(eq(PDP1), any());
+        verify(pub).enqueue(any());
+
+        QueueToken<PdpMessage> token = queue.poll();
+        assertNotNull(token);
+        assertSame(msgdata.getMessage(), token.get());
+
+
+        // invoking start() again has no effect - invocation counts remain the same
+        reqdata.startPublishing();
+        verify(disp, times(1)).register(eq(msgdata.getMessage().getRequestId()), any());
+        verify(timers, times(1)).register(eq(PDP1), any());
+        verify(pub, times(1)).enqueue(any());
+    }
+
+    @Test
+    public void testStopPublishing() {
+        reqdata.configure(msgdata);
+        reqdata.startPublishing();
+        reqdata.stopPublishing();
+
+        verify(disp).unregister(msgdata.getMessage().getRequestId());
+        verify(timer).cancel();
+
+
+        // invoking stop() again has no effect - invocation counts remain the same
+        reqdata.stopPublishing();
+
+        verify(disp, times(1)).unregister(msgdata.getMessage().getRequestId());
+        verify(timer, times(1)).cancel();
+    }
+
+    @Test
+    public void testConfigure() {
+        reqdata.configure(msgdata);
+        reqdata.startPublishing();
+
+        verify(disp).register(eq(msgdata.getMessage().getRequestId()), any());
+        verify(timers).register(eq(PDP1), any());
+        verify(pub).enqueue(any());
+
+        ServiceManager svcmgr = Whitebox.getInternalState(reqdata, "svcmgr");
+        assertEquals(PDP1 + " " + MY_MSG_TYPE, svcmgr.getName());
+
+
+        // bump this so we can verify that it is reset by configure()
+        reqdata.bumpRetryCount();
+
+        reqdata.configure(msgdata);
+        assertEquals(0, getRetryCount());
+    }
+
+    @Test
+    public void testEnqueue() {
+        reqdata.configure(msgdata);
+        reqdata.startPublishing();
+
+        // replace the message with a new message
+        reqdata.stopPublishing();
+        MyMessageData msgdata2 = new MyMessageData(PDP1);
+        reqdata.configure(msgdata2);
+        reqdata.startPublishing();
+
+        // should still only be one token in the queue
+        QueueToken<PdpMessage> token = queue.poll();
+        assertNull(queue.poll());
+        assertNotNull(token);
+        assertSame(msgdata2.getMessage(), token.get());
+
+        // null out the token
+        token.replaceItem(null);
+
+        // enqueue a new message
+        reqdata.stopPublishing();
+        MyMessageData msgdata3 = new MyMessageData(PDP1);
+        reqdata.configure(msgdata3);
+        reqdata.startPublishing();
+
+        // a new token should have been placed in the queue
+        QueueToken<PdpMessage> token2 = queue.poll();
+        assertTrue(token != token2);
+        assertNull(queue.poll());
+        assertNotNull(token2);
+        assertSame(msgdata3.getMessage(), token2.get());
+    }
+
+    @Test
+    public void testResetRetryCount_testBumpRetryCount() {
+        when(msgdata.getMaxRetryCount()).thenReturn(2);
+
+        reqdata.configure(msgdata);
+
+        assertEquals(0, getRetryCount());
+        assertTrue(reqdata.bumpRetryCount());
+        assertTrue(reqdata.bumpRetryCount());
+
+        // limit should now be reached and it should go no further
+        assertFalse(reqdata.bumpRetryCount());
+        assertFalse(reqdata.bumpRetryCount());
+
+        assertEquals(2, getRetryCount());
+
+        reqdata.resetRetryCount();
+        assertEquals(0, getRetryCount());
+    }
+
+    @Test
+    public void testRetryCountExhausted() {
+        reqdata.configure(msgdata);
+
+        reqdata.retryCountExhausted();
+
+        verify(reqdata).allCompleted();
+    }
+
+    @Test
+    public void testProcessResponse() {
+        reqdata.configure(msgdata);
+        reqdata.startPublishing();
+
+        invokeProcessResponse();
+
+        verify(reqdata).stopPublishing();
+        verify(msgdata).checkResponse(response);
+        verify(msgdata).completed();
+    }
+
+    @Test
+    public void testProcessResponse_NotPublishing() {
+        reqdata.configure(msgdata);
+        reqdata.startPublishing();
+
+        reqdata.stopPublishing();
+
+        invokeProcessResponse();
+
+        // only invocation should have been the one before calling invokeProcessResponse()
+        verify(reqdata, times(1)).stopPublishing();
+
+        verify(msgdata, never()).checkResponse(response);
+        verify(msgdata, never()).completed();
+    }
+
+    @Test
+    public void testProcessResponse_NotActive() {
+        reqdata.configure(msgdata);
+        reqdata.startPublishing();
+
+        when(reqdata.isActive()).thenReturn(false);
+
+        invokeProcessResponse();
+
+        // it should still stop publishing
+        verify(reqdata).stopPublishing();
+
+        verify(msgdata, never()).checkResponse(response);
+        verify(msgdata, never()).completed();
+    }
+
+    @Test
+    public void testProcessResponse_ResponseFailed() {
+        reqdata.configure(msgdata);
+        reqdata.startPublishing();
+
+        when(msgdata.checkResponse(response)).thenReturn("failed");
+
+        invokeProcessResponse();
+
+        verify(reqdata).stopPublishing();
+        verify(msgdata).checkResponse(response);
+
+        verify(msgdata, never()).completed();
+        verify(msgdata).mismatch("failed");
+    }
+
+    @Test
+    public void testHandleTimeout() {
+        reqdata.configure(msgdata);
+        reqdata.startPublishing();
+
+        // remove it from the queue
+        queue.poll().replaceItem(null);
+
+        invokeTimeoutHandler();
+
+        // count should have been bumped
+        assertEquals(1, getRetryCount());
+
+        // should have invoked startPublishing() a second time
+        verify(reqdata, times(2)).startPublishing();
+    }
+
+    @Test
+    public void testHandleTimeout_NotPublishing() {
+        reqdata.configure(msgdata);
+        reqdata.startPublishing();
+
+        reqdata.stopPublishing();
+
+        invokeTimeoutHandler();
+
+        // should NOT have invoked startPublishing() a second time
+        verify(reqdata, times(1)).startPublishing();
+        verify(reqdata, never()).retryCountExhausted();
+    }
+
+    @Test
+    public void testHandleTimeout_NotActive() {
+        reqdata.configure(msgdata);
+        reqdata.startPublishing();
+
+        when(reqdata.isActive()).thenReturn(false);
+
+        invokeTimeoutHandler();
+
+        // should NOT have invoked startPublishing() a second time
+        verify(reqdata, times(1)).startPublishing();
+        verify(reqdata, never()).retryCountExhausted();
+    }
+
+    @Test
+    public void testHandleTimeout_StillInQueue() {
+        reqdata.configure(msgdata);
+        reqdata.startPublishing();
+
+        reqdata.bumpRetryCount();
+
+        invokeTimeoutHandler();
+
+        // count should reset the count
+        assertEquals(0, getRetryCount());
+
+        // should have invoked startPublishing() a second time
+        verify(reqdata, times(2)).startPublishing();
+    }
+
+    @Test
+    public void testHandleTimeout_RetryExhausted() {
+        reqdata.configure(msgdata);
+        reqdata.startPublishing();
+
+        // exhaust the count
+        reqdata.bumpRetryCount();
+        reqdata.bumpRetryCount();
+        reqdata.bumpRetryCount();
+
+        // remove it from the queue
+        queue.poll().replaceItem(null);
+
+        invokeTimeoutHandler();
+
+        // should NOT have invoked startPublishing() a second time
+        verify(reqdata, times(1)).startPublishing();
+
+        verify(reqdata).retryCountExhausted();
+    }
+
+    @Test
+    public void testGetName_testSetName() {
+        reqdata.setName("abc");
+        assertEquals("abc", reqdata.getName());
+    }
+
+    @Test
+    public void testGetWrapper() {
+        reqdata.configure(msgdata);
+        assertSame(msgdata, reqdata.getWrapper());
+    }
+
+    /**
+     * Gets the retry count from the data.
+     * @return the current retry count
+     */
+    private int getRetryCount() {
+        return Whitebox.getInternalState(reqdata, "retryCount");
+    }
+
+    /**
+     * Gets the listener that was registered with the dispatcher and invokes it.
+     */
+    @SuppressWarnings("unchecked")
+    private void invokeProcessResponse() {
+        @SuppressWarnings("rawtypes")
+        ArgumentCaptor<TypedMessageListener> processResp = ArgumentCaptor.forClass(TypedMessageListener.class);
+
+        verify(disp).register(any(), processResp.capture());
+
+        processResp.getValue().onTopicEvent(CommInfrastructure.NOOP, PapConstants.TOPIC_POLICY_PDP_PAP, response);
+    }
+
+    /**
+     * Gets the timeout handler that was registered with the timer manager and invokes it.
+     */
+    @SuppressWarnings("unchecked")
+    private void invokeTimeoutHandler() {
+        @SuppressWarnings("rawtypes")
+        ArgumentCaptor<Consumer> timeoutHdlr = ArgumentCaptor.forClass(Consumer.class);
+
+        verify(timers).register(any(), timeoutHdlr.capture());
+
+        timeoutHdlr.getValue().accept(PDP1);
+    }
+
+    private class MyRequestData extends RequestData {
+
+        public MyRequestData(RequestDataParams params) {
+            super(params);
+        }
+
+        @Override
+        protected boolean isActive() {
+            return true;
+        }
+
+        @Override
+        protected void allCompleted() {
+            // do nothing
+        }
+    }
+
+    private class MyMessageData extends MessageData {
+
+        public MyMessageData(String pdpName) {
+            super(new PdpStateChange(), 1, timers);
+
+            PdpStateChange msg = (PdpStateChange) getMessage();
+            msg.setName(pdpName);
+            msg.setState(PdpState.ACTIVE);
+        }
+
+        @Override
+        public String getType() {
+            return MY_MSG_TYPE;
+        }
+
+        @Override
+        public void mismatch(String reason) {
+            // do nothing
+        }
+
+        @Override
+        public void completed() {
+            // do nothing
+        }
+
+        @Override
+        public String checkResponse(PdpStatus response) {
+            // always valid - return null
+            return null;
+        }
+    }
+}
diff --git a/main/src/test/java/org/onap/policy/pap/main/comm/Threaded.java b/main/src/test/java/org/onap/policy/pap/main/comm/Threaded.java
new file mode 100644 (file)
index 0000000..d6a0d1f
--- /dev/null
@@ -0,0 +1,112 @@
+/*
+ * ============LICENSE_START=======================================================
+ * ONAP PAP
+ * ================================================================================
+ * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.pap.main.comm;
+
+import org.junit.After;
+import org.junit.Before;
+
+/**
+ * Super class for tests that run a background thread.
+ */
+public abstract class Threaded {
+
+    /**
+     * Max time to wait, in milliseconds, for a thread to terminate or for a message to be
+     * published.
+     */
+    public static final long MAX_WAIT_MS = 5000;
+
+    /**
+     * The current background thread.
+     */
+    private Thread thread;
+
+    /**
+     * Indicates that a test is about to begin.
+     *
+     * @throws Exception if an error occurs
+     */
+    @Before
+    public void setUp() throws Exception {
+        thread = null;
+    }
+
+    /**
+     * Invokes the "stopper" function to tell the background thread to exit and then waits
+     * for it to terminate.
+     *
+     * @throws Exception if an error occurs
+     */
+    @After
+    public void tearDown() throws Exception {
+        stopThread();
+        waitStop();
+    }
+
+    /**
+     * Signals the background thread to stop.
+     *
+     * @throws Exception if an error occurs
+     */
+    protected abstract void stopThread() throws Exception;
+
+    /**
+     * Starts a background thread.
+     *
+     * @param runner what should be executed in the background thread
+     * @throws IllegalStateException if a background thread is already running
+     */
+    protected void startThread(Runnable runner) {
+        if (thread != null) {
+            throw new IllegalStateException("a background thread is already running");
+        }
+
+        thread = new Thread(runner);
+        thread.setDaemon(true);
+        thread.start();
+    }
+
+    /**
+     * Interrupts the background thread.
+     */
+    protected void interruptThread() {
+        thread.interrupt();
+    }
+
+    /**
+     * Waits for the background thread to stop.
+     *
+     * @return {@code true} if the thread has stopped, {@code false} otherwise
+     * @throws InterruptedException if this thread is interrupted while waiting
+     */
+    protected boolean waitStop() throws InterruptedException {
+        if (thread != null) {
+            Thread thread2 = thread;
+            thread = null;
+
+            thread2.join(MAX_WAIT_MS);
+
+            return !thread2.isAlive();
+        }
+
+        return true;
+    }
+}
diff --git a/main/src/test/java/org/onap/policy/pap/main/comm/TimerManagerTest.java b/main/src/test/java/org/onap/policy/pap/main/comm/TimerManagerTest.java
new file mode 100644 (file)
index 0000000..3d5da90
--- /dev/null
@@ -0,0 +1,400 @@
+/*
+ * ============LICENSE_START=======================================================
+ * ONAP PAP
+ * ================================================================================
+ * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.pap.main.comm;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Consumer;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.onap.policy.pap.main.comm.TimerManager.Timer;
+
+public class TimerManagerTest extends Threaded {
+    private static final String EXPECTED_EXCEPTION = "expected exception";
+    private static final String MGR_NAME = "my-manager";
+    private static final String NAME1 = "timer-A";
+    private static final String NAME2 = "timer-B";
+    private static final String NAME3 = "timer-C";
+
+    private static final long MGR_TIMEOUT_MS = 10000;
+
+    private MyManager mgr;
+
+    /*
+     * This is a field to prevent checkstyle from complaining about the distance between
+     * its assignment and its use.
+     */
+    private long tcur;
+
+    /**
+     * Sets up.
+     *
+     * @throws Exception if an error occurs
+     */
+    @Before
+    public void setUp() throws Exception {
+        super.setUp();
+
+        mgr = new MyManager(MGR_NAME, MGR_TIMEOUT_MS);
+    }
+
+    @After
+    public void tearDown() throws Exception {
+        super.tearDown();
+    }
+
+    @Override
+    protected void stopThread() throws Exception {
+        if (mgr != null) {
+            mgr.stop();
+            mgr.stopSleep();
+        }
+    }
+
+    @Test
+    public void testTimerManager_testStop() throws Exception {
+        startThread(mgr);
+
+        mgr.stop();
+        assertTrue(waitStop());
+
+        // ensure we can call "stop" a second time
+        mgr.stop();
+    }
+
+    @Test
+    public void testRegister() throws Exception {
+        mgr.register(NAME2, mgr::addToQueue);
+        mgr.register(NAME1, mgr::addToQueue);
+
+        // goes to the end of the queue
+        mgr.register(NAME2, mgr::addToQueue);
+
+        startThread(mgr);
+
+        mgr.allowSleep(2);
+
+        assertEquals(NAME1, mgr.awaitTimer());
+        assertEquals(NAME2, mgr.awaitTimer());
+    }
+
+    @Test
+    public void testRun_Ex() throws Exception {
+        startThread(mgr);
+        mgr.register(NAME1, mgr::addToQueue);
+
+        mgr.awaitSleep();
+
+        // background thread is "sleeping" - now we can interrupt it
+        interruptThread();
+
+        assertTrue(waitStop());
+    }
+
+    @Test
+    public void testProcessTimers() throws Exception {
+        startThread(mgr);
+        mgr.register(NAME1, mgr::addToQueue);
+        mgr.awaitSleep();
+        mgr.allowSleep(1);
+
+        mgr.register(NAME2, mgr::addToQueue);
+        mgr.awaitSleep();
+
+        // tell it to stop before returning from "sleep"
+        mgr.stop();
+        mgr.allowSleep(1);
+
+        assertTrue(waitStop());
+
+        assertEquals(NAME1, mgr.pollResult());
+        assertNull(mgr.pollResult());
+    }
+
+    @Test
+    public void testGetNextTimer() throws Exception {
+        startThread(mgr);
+        mgr.register(NAME1, mgr::addToQueue);
+        mgr.awaitSleep();
+        mgr.allowSleep(1);
+
+        mgr.register(NAME2, mgr::addToQueue);
+
+        mgr.awaitSleep();
+    }
+
+    @Test
+    public void testProcessTimer_StopWhileWaiting() throws Exception {
+        startThread(mgr);
+        mgr.register(NAME1, mgr::addToQueue);
+        mgr.awaitSleep();
+        mgr.allowSleep(1);
+
+        mgr.register(NAME2, mgr::addToQueue);
+        mgr.awaitSleep();
+
+        mgr.stop();
+        mgr.allowSleep(1);
+
+        assertTrue(waitStop());
+
+        // should have stopped after processing the first timer
+        assertEquals(NAME1, mgr.pollResult());
+        assertNull(mgr.pollResult());
+    }
+
+    @Test
+    public void testProcessTimer_CancelWhileWaiting() throws Exception {
+        startThread(mgr);
+        Timer timer = mgr.register(NAME1, mgr::addToQueue);
+        mgr.awaitSleep();
+
+        timer.cancel();
+        mgr.allowSleep(1);
+
+        mgr.register(NAME2, mgr::addToQueue);
+        mgr.awaitSleep();
+        mgr.allowSleep(1);
+
+        mgr.register(NAME1, mgr::addToQueue);
+        mgr.awaitSleep();
+
+        // should have fired timer 2, but not timer 1
+        assertEquals(NAME2, mgr.pollResult());
+        assertNull(mgr.pollResult());
+    }
+
+    @Test
+    public void testProcessTimer_TimerEx() throws Exception {
+        startThread(mgr);
+        mgr.register(NAME1, name -> {
+            throw new RuntimeException(EXPECTED_EXCEPTION);
+        });
+        mgr.register(NAME2, mgr::addToQueue);
+        mgr.awaitSleep();
+
+        mgr.allowSleep(2);
+
+        mgr.register(NAME3, mgr::addToQueue);
+        mgr.awaitSleep();
+
+        // timer 1 fired but threw an exception, so only timer 2 should be in the queue
+        assertEquals(NAME2, mgr.pollResult());
+    }
+
+    @Test
+    public void testTimerAwait() throws Exception {
+        startThread(mgr);
+
+        // same times - only need one sleep
+        mgr.register(NAME1, mgr::addToQueue);
+        mgr.register(NAME2, mgr::addToQueue);
+        mgr.awaitSleep();
+
+        tcur = mgr.currentTimeMillis();
+
+        mgr.allowSleep(1);
+
+        // next one will have a new timeout, so expect to sleep
+        mgr.register(NAME3, mgr::addToQueue);
+        mgr.awaitSleep();
+
+        long tcur2 = mgr.currentTimeMillis();
+        assertTrue(tcur2 >= tcur + MGR_TIMEOUT_MS);
+
+        assertEquals(NAME1, mgr.pollResult());
+        assertEquals(NAME2, mgr.pollResult());
+        assertNull(mgr.pollResult());
+    }
+
+    @Test
+    public void testTimerCancel_WhileWaiting() throws Exception {
+        startThread(mgr);
+
+        Timer timer = mgr.register(NAME1, mgr::addToQueue);
+        mgr.awaitSleep();
+
+        // cancel while sleeping
+        timer.cancel();
+
+        mgr.register(NAME2, mgr::addToQueue);
+
+        // allow it to sleep through both timers
+        mgr.allowSleep(2);
+
+        // only timer 2 should have fired
+        assertEquals(NAME2, mgr.timedPollResult());
+    }
+
+    @Test
+    public void testTimerCancel_ViaReplace() throws Exception {
+        startThread(mgr);
+
+        mgr.register(NAME1, name -> mgr.addToQueue("hello"));
+        mgr.awaitSleep();
+
+        // replace the timer while the background thread is sleeping
+        mgr.register(NAME1, name -> mgr.addToQueue("world"));
+
+        // allow it to sleep through both timers
+        mgr.allowSleep(2);
+
+        // only timer 2 should have fired
+        assertEquals("world", mgr.timedPollResult());
+    }
+
+    @Test
+    public void testTimerToString() {
+        Timer timer = mgr.register(NAME1, mgr::addToQueue);
+        assertNotNull(timer.toString());
+    }
+
+    @Test
+    public void testCurrentTimeMillis() {
+        long tbeg = System.currentTimeMillis();
+        long tcur = new TimerManager(MGR_NAME, MGR_TIMEOUT_MS).currentTimeMillis();
+        long tend = System.currentTimeMillis();
+
+        assertTrue(tcur >= tbeg);
+        assertTrue(tend >= tcur);
+    }
+
+    @Test
+    public void testSleep() throws Exception {
+        long tbeg = System.currentTimeMillis();
+        new TimerManager(MGR_NAME, MGR_TIMEOUT_MS).sleep(10);
+        long tend = System.currentTimeMillis();
+
+        assertTrue(tend >= tbeg + 10);
+    }
+
+    private static class MyManager extends TimerManager {
+        private AtomicLong curTime = new AtomicLong(1000);
+        private LinkedBlockingQueue<Boolean> sleepEntered = new LinkedBlockingQueue<>();
+        private LinkedBlockingQueue<Boolean> shouldStop = new LinkedBlockingQueue<>();
+        private LinkedBlockingQueue<String> results = new LinkedBlockingQueue<>();
+
+        public MyManager(String name, long waitTimeMs) {
+            super(name, waitTimeMs);
+        }
+
+        /**
+         * Registers a timer. Also increments {@link #curTime} so that every time has a
+         * different expiration time, which prevents some issue with the junit tests.
+         */
+        @Override
+        public Timer register(String timerName, Consumer<String> action) {
+            curTime.addAndGet(1);
+            return super.register(timerName, action);
+        }
+
+        /**
+         * Stops the "sleep".
+         */
+        public void stopSleep() {
+            shouldStop.add(true);
+        }
+
+        /**
+         * Allows the manager to "sleep" several times.
+         *
+         * @param ntimes the number of times the manager should sleep
+         */
+        public void allowSleep(int ntimes) {
+            for (int x = 0; x < ntimes; ++x) {
+                shouldStop.add(false);
+            }
+        }
+
+        /**
+         * Waits for the manager to "sleep".
+         *
+         * @throws InterruptedException if the thread is interrupted while waiting for the
+         *         background thread to sleep
+         */
+        public void awaitSleep() throws InterruptedException {
+            if (sleepEntered.poll(MAX_WAIT_MS, TimeUnit.MILLISECONDS) == null) {
+                fail("background thread failed to sleep");
+            }
+        }
+
+        @Override
+        protected long currentTimeMillis() {
+            return curTime.get();
+        }
+
+        @Override
+        protected void sleep(long timeMs) throws InterruptedException {
+            sleepEntered.offer(true);
+
+            if (!shouldStop.take()) {
+                // test thread did not request that we stop
+                curTime.addAndGet(timeMs);
+            }
+        }
+
+        /**
+         * Waits for a timer to fire.
+         *
+         * @return the message the timer added to {@link #results}
+         * @throws InterruptedException if this thread is interrupted while waiting
+         */
+        private String awaitTimer() throws InterruptedException {
+            return results.poll(MAX_WAIT_MS, TimeUnit.MILLISECONDS);
+        }
+
+        /**
+         * Adds a name to the queue.
+         *
+         * @param name the name to add
+         */
+        private void addToQueue(String name) {
+            results.add(name);
+        }
+
+        /**
+         * Polls for a result.
+         *
+         * @return the next result, or {@code null}
+         */
+        private String pollResult() {
+            return results.poll();
+        }
+
+        /**
+         * Polls for a result, waiting a limited amount of time.
+         *
+         * @return the next result, or {@code null}
+         * @throws InterruptedException if the thread is interrupted while waiting
+         */
+        private String timedPollResult() throws InterruptedException {
+            return results.poll(MAX_WAIT_MS, TimeUnit.MILLISECONDS);
+        }
+    }
+}
diff --git a/main/src/test/java/org/onap/policy/pap/main/comm/msgdata/MessageDataTest.java b/main/src/test/java/org/onap/policy/pap/main/comm/msgdata/MessageDataTest.java
new file mode 100644 (file)
index 0000000..68b0263
--- /dev/null
@@ -0,0 +1,92 @@
+/*
+ * ============LICENSE_START=======================================================
+ * ONAP PAP
+ * ================================================================================
+ * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.pap.main.comm.msgdata;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertSame;
+import static org.mockito.Mockito.mock;
+
+import org.junit.Before;
+import org.junit.Test;
+import org.onap.policy.models.pdp.concepts.PdpStateChange;
+import org.onap.policy.models.pdp.concepts.PdpStatus;
+import org.onap.policy.pap.main.comm.TimerManager;
+
+public class MessageDataTest {
+    private static final int RETRIES = 1;
+
+    private MyData data;
+    private TimerManager timers;
+
+    /**
+     * Sets up.
+     */
+    @Before
+    public void setUp() {
+        timers = mock(TimerManager.class);
+
+        data = new MyData();
+    }
+
+    @Test
+    public void testGetMessage() {
+        assertNotNull(data.getMessage());
+    }
+
+    @Test
+    public void testGetType() {
+        assertEquals(PdpStateChange.class.getSimpleName(), data.getType());
+    }
+
+    @Test
+    public void testGetMaxRetryCount() {
+        assertEquals(RETRIES, data.getMaxRetryCount());
+    }
+
+    @Test
+    public void testGetTimers() {
+        assertSame(timers, data.getTimers());
+    }
+
+    private class MyData extends MessageData {
+
+        public MyData() {
+            super(new PdpStateChange(), RETRIES, timers);
+        }
+
+        @Override
+        public void mismatch(String reason) {
+            // do nothing
+        }
+
+        @Override
+        public void completed() {
+            // do nothing
+        }
+
+        @Override
+        public String checkResponse(PdpStatus response) {
+            // always succeed
+            return null;
+        }
+    }
+}
diff --git a/main/src/test/java/org/onap/policy/pap/main/comm/msgdata/StateChangeDataTest.java b/main/src/test/java/org/onap/policy/pap/main/comm/msgdata/StateChangeDataTest.java
new file mode 100644 (file)
index 0000000..029775f
--- /dev/null
@@ -0,0 +1,122 @@
+/*
+ * ============LICENSE_START=======================================================
+ * ONAP PAP
+ * ================================================================================
+ * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.pap.main.comm.msgdata;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertSame;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import org.junit.Before;
+import org.junit.Test;
+import org.onap.policy.models.pdp.concepts.PdpStateChange;
+import org.onap.policy.models.pdp.concepts.PdpStatus;
+import org.onap.policy.models.pdp.enums.PdpState;
+import org.onap.policy.pap.main.comm.TimerManager;
+import org.onap.policy.pap.main.parameters.PdpModifyRequestMapParams;
+import org.onap.policy.pap.main.parameters.PdpParameters;
+import org.onap.policy.pap.main.parameters.PdpStateChangeParameters;
+
+public class StateChangeDataTest {
+    private static final String MY_NAME = "my-name";
+    private static final String DIFFERENT = "different";
+    private static final PdpState MY_STATE = PdpState.SAFE;
+    private static final PdpState DIFF_STATE = PdpState.TERMINATED;
+    private static final int RETRIES = 1;
+
+    private MyData data;
+    private PdpModifyRequestMapParams params;
+    private TimerManager timers;
+    private PdpStatus response;
+
+    /**
+     * Sets up.
+     */
+    @Before
+    public void setUp() {
+        timers = mock(TimerManager.class);
+        response = new PdpStatus();
+        PdpParameters pdpParams = mock(PdpParameters.class);
+        PdpStateChangeParameters stateParams = mock(PdpStateChangeParameters.class);
+
+        when(stateParams.getMaxRetryCount()).thenReturn(RETRIES);
+        when(pdpParams.getStateChangeParameters()).thenReturn(stateParams);
+
+        params = new PdpModifyRequestMapParams().setParams(pdpParams).setStateChangeTimers(timers);
+
+        response.setName(MY_NAME);
+        response.setState(MY_STATE);
+
+        data = new MyData();
+    }
+
+    @Test
+    public void testGetMaxRetryCount() {
+        assertEquals(RETRIES, data.getMaxRetryCount());
+    }
+
+    @Test
+    public void testGetTimers() {
+        assertSame(timers, data.getTimers());
+    }
+
+    @Test
+    public void testStateChangeCheckResponse() {
+        assertNull(data.checkResponse(response));
+    }
+
+    @Test
+    public void testStateChangeCheckResponse_MismatchedName() {
+        response.setName(DIFFERENT);
+
+        assertEquals("name does not match", data.checkResponse(response));
+    }
+
+    @Test
+    public void testStateChangeCheckResponse_MismatchedState() {
+        response.setState(DIFF_STATE);
+
+        assertEquals("state is TERMINATED, but expected SAFE", data.checkResponse(response));
+    }
+
+    private class MyData extends StateChangeData {
+
+        public MyData() {
+            super(new PdpStateChange(), params);
+
+            PdpStateChange msg = (PdpStateChange) getMessage();
+
+            msg.setName(MY_NAME);
+            msg.setState(MY_STATE);
+        }
+
+        @Override
+        public void mismatch(String reason) {
+            // do nothing
+        }
+
+        @Override
+        public void completed() {
+            // do nothing
+        }
+    }
+}
diff --git a/main/src/test/java/org/onap/policy/pap/main/comm/msgdata/UpdateDataTest.java b/main/src/test/java/org/onap/policy/pap/main/comm/msgdata/UpdateDataTest.java
new file mode 100644 (file)
index 0000000..8676c95
--- /dev/null
@@ -0,0 +1,169 @@
+/*
+ * ============LICENSE_START=======================================================
+ * ONAP PAP
+ * ================================================================================
+ * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.pap.main.comm.msgdata;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertSame;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import org.junit.Before;
+import org.junit.Test;
+import org.onap.policy.models.base.PfConceptKey;
+import org.onap.policy.models.pdp.concepts.PdpStatus;
+import org.onap.policy.models.pdp.concepts.PdpUpdate;
+import org.onap.policy.models.tosca.simple.concepts.ToscaPolicy;
+import org.onap.policy.pap.main.comm.TimerManager;
+import org.onap.policy.pap.main.parameters.PdpModifyRequestMapParams;
+import org.onap.policy.pap.main.parameters.PdpParameters;
+import org.onap.policy.pap.main.parameters.PdpUpdateParameters;
+
+public class UpdateDataTest {
+    private static final String MY_NAME = "my-name";
+    private static final String DIFFERENT = "different";
+    private static final int RETRIES = 1;
+
+    private MyData data;
+    private PdpModifyRequestMapParams params;
+    private TimerManager timers;
+    private PdpUpdate update;
+    private PdpStatus response;
+
+    /**
+     * Sets up.
+     */
+    @Before
+    public void setUp() {
+        timers = mock(TimerManager.class);
+        response = new PdpStatus();
+        PdpParameters pdpParams = mock(PdpParameters.class);
+        PdpUpdateParameters stateParams = mock(PdpUpdateParameters.class);
+
+        when(stateParams.getMaxRetryCount()).thenReturn(RETRIES);
+        when(pdpParams.getUpdateParameters()).thenReturn(stateParams);
+
+        params = new PdpModifyRequestMapParams().setParams(pdpParams).setUpdateTimers(timers);
+
+        update = makeUpdate();
+
+        response.setName(MY_NAME);
+        response.setPdpGroup(update.getPdpGroup());
+        response.setPdpSubgroup(update.getPdpSubgroup());
+        response.setPolicies(update.getPolicies());
+
+        data = new MyData(update);
+    }
+
+    @Test
+    public void testGetMaxRetryCount() {
+        assertEquals(RETRIES, data.getMaxRetryCount());
+    }
+
+    @Test
+    public void testGetTimers() {
+        assertSame(timers, data.getTimers());
+    }
+
+    @Test
+    public void testUpdateCheckResponse() {
+        assertNull(data.checkResponse(response));
+    }
+
+    @Test
+    public void testUpdateDataCheckResponse_MismatchedName() {
+        response.setName(DIFFERENT);
+
+        assertEquals("name does not match", data.checkResponse(response));
+    }
+
+    @Test
+    public void testUpdateDataCheckResponse_MismatchedGroup() {
+        response.setPdpGroup(DIFFERENT);
+
+        assertEquals("group does not match", data.checkResponse(response));
+    }
+
+    @Test
+    public void testUpdateDataCheckResponse_MismatchedSubGroup() {
+        response.setPdpSubgroup(DIFFERENT);
+
+        assertEquals("subgroup does not match", data.checkResponse(response));
+    }
+
+    @Test
+    public void testUpdateDataCheckResponse_MismatchedPoliciesLength() {
+        response.setPolicies(Arrays.asList(update.getPolicies().get(0)));
+
+        assertEquals("policies do not match", data.checkResponse(response));
+    }
+
+    @Test
+    public void testUpdateDataCheckResponse_MismatchedPolicies() {
+        ArrayList<ToscaPolicy> policies = new ArrayList<>(update.getPolicies());
+        policies.set(0, new ToscaPolicy(new PfConceptKey(DIFFERENT, "10.0.0")));
+
+        response.setPolicies(policies);
+
+        assertEquals("policies do not match", data.checkResponse(response));
+    }
+
+    /**
+     * Makes an update message.
+     *
+     * @return a new update message
+     */
+    private PdpUpdate makeUpdate() {
+        PdpUpdate upd = new PdpUpdate();
+
+        upd.setDescription("update-description");
+        upd.setName(MY_NAME);
+        upd.setPdpGroup("group1-a");
+        upd.setPdpSubgroup("sub1-a");
+        upd.setPdpType("drools");
+
+        ToscaPolicy policy1 = new ToscaPolicy(new PfConceptKey("policy-1-a", "1.0.0"));
+        ToscaPolicy policy2 = new ToscaPolicy(new PfConceptKey("policy-2-a", "1.1.0"));
+
+        upd.setPolicies(Arrays.asList(policy1, policy2));
+
+        return upd;
+    }
+
+    private class MyData extends UpdateData {
+
+        public MyData(PdpUpdate message) {
+            super(message, params);
+        }
+
+        @Override
+        public void mismatch(String reason) {
+            // do nothing
+        }
+
+        @Override
+        public void completed() {
+            // do nothing
+        }
+    }
+}
diff --git a/main/src/test/java/org/onap/policy/pap/main/parameters/TestPdpModifyRequestMapParams.java b/main/src/test/java/org/onap/policy/pap/main/parameters/TestPdpModifyRequestMapParams.java
new file mode 100644 (file)
index 0000000..3e69189
--- /dev/null
@@ -0,0 +1,110 @@
+/*
+ * ============LICENSE_START=======================================================
+ * ONAP PAP
+ * ================================================================================
+ * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.pap.main.parameters;
+
+import static org.assertj.core.api.Assertions.assertThatIllegalArgumentException;
+import static org.junit.Assert.assertSame;
+import static org.mockito.Mockito.mock;
+
+import org.junit.Before;
+import org.junit.Test;
+import org.onap.policy.common.endpoints.listeners.RequestIdDispatcher;
+import org.onap.policy.models.pdp.concepts.PdpStatus;
+import org.onap.policy.pap.main.comm.Publisher;
+import org.onap.policy.pap.main.comm.TimerManager;
+
+public class TestPdpModifyRequestMapParams {
+    private PdpModifyRequestMapParams params;
+    private Publisher pub;
+    private RequestIdDispatcher<PdpStatus> disp;
+    private Object lock;
+    private PdpParameters pdpParams;
+    private TimerManager updTimers;
+    private TimerManager stateTimers;
+
+    /**
+     * Sets up the objects and creates an empty {@link #params}.
+     */
+    @Before
+    @SuppressWarnings("unchecked")
+    public void setUp() {
+        pub = mock(Publisher.class);
+        disp = mock(RequestIdDispatcher.class);
+        lock = new Object();
+        pdpParams = mock(PdpParameters.class);
+        updTimers = mock(TimerManager.class);
+        stateTimers = mock(TimerManager.class);
+
+        params = new PdpModifyRequestMapParams().setModifyLock(lock).setPublisher(pub).setResponseDispatcher(disp);
+    }
+
+    @Test
+    public void testGettersSetters() {
+        assertSame(params, params.setParams(pdpParams).setStateChangeTimers(stateTimers).setUpdateTimers(updTimers));
+
+        assertSame(pdpParams, params.getParams());
+        assertSame(updTimers, params.getUpdateTimers());
+        assertSame(stateTimers, params.getStateChangeTimers());
+
+        // super class data should also be available
+        assertSame(pub, params.getPublisher());
+        assertSame(disp, params.getResponseDispatcher());
+        assertSame(lock, params.getModifyLock());
+    }
+
+    @Test
+    public void testValidate() {
+        // no exception
+        params.setParams(pdpParams).setStateChangeTimers(stateTimers).setUpdateTimers(updTimers).validate();
+    }
+
+    @Test
+    public void testValidate_MissingPdpParams() {
+        assertThatIllegalArgumentException().isThrownBy(
+            () -> params.setStateChangeTimers(stateTimers).setUpdateTimers(updTimers).validate())
+                        .withMessageContaining("PDP param");
+    }
+
+    @Test
+    public void testValidate_MissingStateChangeTimers() {
+        assertThatIllegalArgumentException().isThrownBy(
+            () -> params.setParams(pdpParams).setUpdateTimers(updTimers).validate())
+                        .withMessageContaining("state");
+    }
+
+    @Test
+    public void testValidate_MissingUpdateTimers() {
+        assertThatIllegalArgumentException().isThrownBy(
+            () -> params.setParams(pdpParams).setStateChangeTimers(stateTimers).validate())
+                        .withMessageContaining("update");
+    }
+
+    @Test
+    public void testValidate_MissingSuperclassData() {
+        // leave out one of the superclass fields
+        assertThatIllegalArgumentException().isThrownBy(
+            () -> new PdpModifyRequestMapParams()
+                        .setPublisher(pub)
+                        .setResponseDispatcher(disp).setParams(pdpParams).setStateChangeTimers(stateTimers)
+                        .setUpdateTimers(updTimers).validate()).withMessageContaining("Lock");
+
+    }
+}
diff --git a/main/src/test/java/org/onap/policy/pap/main/parameters/TestRequestDataParams.java b/main/src/test/java/org/onap/policy/pap/main/parameters/TestRequestDataParams.java
new file mode 100644 (file)
index 0000000..16d247f
--- /dev/null
@@ -0,0 +1,87 @@
+/*
+ * ============LICENSE_START=======================================================
+ * ONAP PAP
+ * ================================================================================
+ * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.pap.main.parameters;
+
+import static org.assertj.core.api.Assertions.assertThatIllegalArgumentException;
+import static org.junit.Assert.assertSame;
+import static org.mockito.Mockito.mock;
+
+import org.junit.Before;
+import org.junit.Test;
+import org.onap.policy.common.endpoints.listeners.RequestIdDispatcher;
+import org.onap.policy.models.pdp.concepts.PdpStatus;
+import org.onap.policy.pap.main.comm.Publisher;
+
+public class TestRequestDataParams {
+    private RequestDataParams params;
+    private Publisher pub;
+    private RequestIdDispatcher<PdpStatus> disp;
+    private Object lock;
+
+    /**
+     * Sets up the objects and creates an empty {@link #params}.
+     */
+    @Before
+    @SuppressWarnings("unchecked")
+    public void setUp() {
+        pub = mock(Publisher.class);
+        disp = mock(RequestIdDispatcher.class);
+        lock = new Object();
+
+        params = new RequestDataParams();
+    }
+
+    @Test
+    public void testGettersSetters() {
+        assertSame(params, params.setModifyLock(lock).setPublisher(pub).setResponseDispatcher(disp));
+
+        assertSame(pub, params.getPublisher());
+        assertSame(disp, params.getResponseDispatcher());
+        assertSame(lock, params.getModifyLock());
+    }
+
+    @Test
+    public void testValidate() {
+        // no exception
+        params.setModifyLock(lock).setPublisher(pub).setResponseDispatcher(disp).validate();
+    }
+
+    @Test
+    public void testValidate_MissingLock() {
+        assertThatIllegalArgumentException().isThrownBy(
+            () -> params.setPublisher(pub).setResponseDispatcher(disp).validate())
+            .withMessageContaining("Lock");
+    }
+
+    @Test
+    public void testValidate_MissingDispatcher() {
+        assertThatIllegalArgumentException().isThrownBy(
+            () -> params.setModifyLock(lock).setPublisher(pub).validate())
+            .withMessageContaining("Dispatcher");
+    }
+
+    @Test
+    public void testValidate_MissingPublisher() {
+        assertThatIllegalArgumentException().isThrownBy(
+            () -> params.setModifyLock(lock).setResponseDispatcher(disp).validate())
+            .withMessageContaining("publisher");
+    }
+}
index cfa2ae9..6c9e092 100644 (file)
@@ -25,6 +25,7 @@ import static org.assertj.core.api.Assertions.assertThatIllegalStateException;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 
 import java.io.FileInputStream;
@@ -35,6 +36,7 @@ import org.junit.Test;
 import org.onap.policy.common.utils.services.Registry;
 import org.onap.policy.pap.main.PapConstants;
 import org.onap.policy.pap.main.PolicyPapException;
+import org.onap.policy.pap.main.comm.PdpModifyRequestMap;
 import org.onap.policy.pap.main.parameters.CommonTestData;
 import org.onap.policy.pap.main.parameters.PapParameterGroup;
 import org.onap.policy.pap.main.parameters.PapParameterHandler;
@@ -75,6 +77,7 @@ public class TestPapActivator {
 
     /**
      * Method for cleanup after each test.
+     *
      * @throws Exception if an error occurs
      */
     @After
@@ -95,6 +98,7 @@ public class TestPapActivator {
         // ensure items were added to the registry
         assertNotNull(Registry.get(PapConstants.REG_PDP_MODIFY_LOCK, Object.class));
         assertNotNull(Registry.get(PapConstants.REG_STATISTICS_MANAGER, PapStatisticsManager.class));
+        assertNotNull(Registry.get(PapConstants.REG_PDP_MODIFY_MAP, PdpModifyRequestMap.class));
 
         // repeat - should throw an exception
         assertThatIllegalStateException().isThrownBy(() -> activator.start());
@@ -108,6 +112,11 @@ public class TestPapActivator {
         activator.stop();
         assertFalse(activator.isAlive());
 
+        // ensure items have been removed from the registry
+        assertNull(Registry.getOrDefault(PapConstants.REG_PDP_MODIFY_LOCK, Object.class, null));
+        assertNull(Registry.getOrDefault(PapConstants.REG_STATISTICS_MANAGER, PapStatisticsManager.class, null));
+        assertNull(Registry.getOrDefault(PapConstants.REG_PDP_MODIFY_MAP, PdpModifyRequestMap.class, null));
+
         // repeat - should throw an exception
         assertThatIllegalStateException().isThrownBy(() -> activator.stop());
         assertFalse(activator.isAlive());