Receive PDP status messages 60/79660/3
authorJim Hahn <jrh3@att.com>
Wed, 27 Feb 2019 20:35:22 +0000 (15:35 -0500)
committerJim Hahn <jrh3@att.com>
Mon, 4 Mar 2019 23:55:21 +0000 (18:55 -0500)
Added infrastructure code to facilitate receipt and tracking of
PDP status messages.

Removed temporary PdpStatus class.

Use variable for pdp-common version.

Change-Id: Id76c45f26b6d9f5eb9395332b637a0b1e90bc496
Issue-ID: POLICY-1444
Signed-off-by: Jim Hahn <jrh3@att.com>
main/pom.xml
main/src/main/java/org/onap/policy/pap/main/comm/MultiPdpStatusListener.java [new file with mode: 0644]
main/src/main/java/org/onap/policy/pap/main/parameters/PapParameterHandler.java
main/src/test/java/org/onap/policy/pap/main/comm/MultiPdpStatusListenerTest.java [new file with mode: 0644]
main/src/test/java/org/onap/policy/pap/main/comm/PdpClientExceptionTest.java

index 63eb0ec..6cf42d0 100644 (file)
             <artifactId>common-parameters</artifactId>
             <version>${policy.common.version}</version>
         </dependency>
+        <dependency>
+            <groupId>org.onap.policy.common</groupId>
+            <artifactId>pdp-common</artifactId>
+            <version>${policy.common.version}</version>
+        </dependency>
         <dependency>
             <groupId>org.onap.policy.common</groupId>
             <artifactId>utils</artifactId>
diff --git a/main/src/main/java/org/onap/policy/pap/main/comm/MultiPdpStatusListener.java b/main/src/main/java/org/onap/policy/pap/main/comm/MultiPdpStatusListener.java
new file mode 100644 (file)
index 0000000..38588d8
--- /dev/null
@@ -0,0 +1,109 @@
+/*
+ * ============LICENSE_START=======================================================
+ * ONAP PAP
+ * ================================================================================
+ * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.pap.main.comm;
+
+import java.util.Collection;
+import java.util.Set;
+import java.util.SortedSet;
+import java.util.TreeSet;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure;
+import org.onap.policy.common.endpoints.listeners.TypedMessageListener;
+import org.onap.policy.pdp.common.models.PdpStatus;
+
+/**
+ * Listener for PDP Status messages expected to be received from multiple PDPs. The
+ * listener "completes" once a message has been seen from all of the PDPs.
+ */
+public class MultiPdpStatusListener implements TypedMessageListener<PdpStatus> {
+
+    /**
+     * This is decremented once a message has been received from every PDP.
+     */
+    private final CountDownLatch allSeen = new CountDownLatch(1);
+
+    /**
+     * PDPs from which no message has been received yet.
+     */
+    private final Set<String> unseenPdpNames = ConcurrentHashMap.newKeySet();
+
+    /**
+     * Constructs the object.
+     *
+     * @param pdpName name of the PDP for which to wait
+     */
+    public MultiPdpStatusListener(String pdpName) {
+        unseenPdpNames.add(pdpName);
+    }
+
+    /**
+     * Constructs the object.
+     *
+     * @param pdpNames names of the PDP for which to wait
+     */
+    public MultiPdpStatusListener(Collection<String> pdpNames) {
+        if (pdpNames.isEmpty()) {
+            allSeen.countDown();
+
+        } else {
+            unseenPdpNames.addAll(pdpNames);
+        }
+    }
+
+    /**
+     * Gets the set of names for which messages have not yet been received.
+     *
+     * @return the names of the PDPs that have not been seen yet
+     */
+    public SortedSet<String> getUnseenPdpNames() {
+        return new TreeSet<>(unseenPdpNames);
+    }
+
+    /**
+     * Waits for messages to be received for all PDPs, or until a timeout is reached.
+     *
+     * @param timeout the maximum time to wait
+     * @param unit the time unit of the timeout argument
+     * @return {@code true} if messages were received for all PDPs, {@code false} if the
+     *         timeout was reached first
+     * @throws InterruptedException if the current thread is interrupted while waiting
+     */
+    public boolean await(long timeout, TimeUnit unit) throws InterruptedException {
+        return allSeen.await(timeout, unit);
+    }
+
+    /**
+     * Indicates that a message was received for a PDP. Triggers completion of
+     * {@link #await(long, TimeUnit)} if all PDPs have received a message. Threads may
+     * override this method to process a message. However, they should still invoke this
+     * method so that PDPs can be properly tracked.
+     */
+    @Override
+    public void onTopicEvent(CommInfrastructure infra, String topic, PdpStatus message) {
+        unseenPdpNames.remove(message.getName());
+
+        if (unseenPdpNames.isEmpty()) {
+            allSeen.countDown();
+        }
+    }
+}
index 489eb52..0bcbb5c 100644 (file)
@@ -21,7 +21,6 @@
 
 package org.onap.policy.pap.main.parameters;
 
-
 import java.io.File;
 import org.onap.policy.common.parameters.GroupValidationResult;
 import org.onap.policy.common.utils.coder.Coder;
diff --git a/main/src/test/java/org/onap/policy/pap/main/comm/MultiPdpStatusListenerTest.java b/main/src/test/java/org/onap/policy/pap/main/comm/MultiPdpStatusListenerTest.java
new file mode 100644 (file)
index 0000000..2dc1aa2
--- /dev/null
@@ -0,0 +1,177 @@
+/*
+ * ============LICENSE_START=======================================================
+ * ONAP PAP
+ * ================================================================================
+ * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.pap.main.comm;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.util.Arrays;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.junit.Test;
+import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure;
+import org.onap.policy.pap.main.comm.MultiPdpStatusListener;
+import org.onap.policy.pdp.common.models.PdpStatus;
+
+public class MultiPdpStatusListenerTest {
+    private static final CommInfrastructure INFRA = CommInfrastructure.NOOP;
+    private static final String TOPIC = "my-topic";
+    private static final String NAME1 = "pdp_1";
+    private static final String NAME2 = "pdp_2";
+    private static final List<String> NAME_LIST = Arrays.asList(NAME1, NAME2);
+
+    private MultiPdpStatusListener listener;
+    private PdpStatus status;
+
+    @Test
+    public void testMultiPdpStatusListenerString() throws Exception {
+        listener = new MultiPdpStatusListener(NAME1);
+        assertEquals(Arrays.asList(NAME1).toString(), listener.getUnseenPdpNames().toString());
+
+        // a name is in the queue - not done yet
+        assertFalse(doWait(0));
+    }
+
+    @Test
+    public void testMultiPdpStatusListenerCollectionOfString() throws Exception {
+        List<String> lst = NAME_LIST;
+
+        listener = new MultiPdpStatusListener(lst);
+        assertEquals(lst.toString(), listener.getUnseenPdpNames().toString());
+
+        // a name is in the queue - not done yet
+        assertFalse(doWait(0));
+
+        /*
+         * Try with an empty list - should already be complete.
+         */
+        listener = new MultiPdpStatusListener(new LinkedList<>());
+        assertTrue(listener.getUnseenPdpNames().isEmpty());
+        assertTrue(doWait(0));
+    }
+
+    @Test
+    public void testGetUnseenPdpNames() {
+        List<String> lst = NAME_LIST;
+
+        listener = new MultiPdpStatusListener(lst);
+        assertEquals(lst.toString(), listener.getUnseenPdpNames().toString());
+
+        // receive message from one PDP
+        status = new PdpStatus();
+        status.setName(NAME2);
+        listener.onTopicEvent(INFRA, TOPIC, status);
+        assertEquals(Arrays.asList(NAME1).toString(), listener.getUnseenPdpNames().toString());
+
+        // receive message from the other PDP
+        status = new PdpStatus();
+        status.setName(NAME1);
+        listener.onTopicEvent(INFRA, TOPIC, status);
+        assertTrue(listener.getUnseenPdpNames().isEmpty());
+    }
+
+    @Test
+    public void testAwait() throws Exception {
+        // try with an empty list - should already be complete
+        listener = new MultiPdpStatusListener(new LinkedList<>());
+        assertTrue(doWait(0));
+
+        // try it with something in the list
+        listener = new MultiPdpStatusListener(NAME_LIST);
+        assertFalse(doWait(0));
+
+        // process a message from one PDP - wait should block the entire time
+        status = new PdpStatus();
+        status.setName(NAME1);
+        listener.onTopicEvent(INFRA, TOPIC, status);
+        long tbeg = System.currentTimeMillis();
+        assertFalse(doWait(50));
+        assertTrue(System.currentTimeMillis() - tbeg >= 49);
+
+        // process a message from the other PDP - wait should NOT block
+        status = new PdpStatus();
+        status.setName(NAME2);
+        listener.onTopicEvent(INFRA, TOPIC, status);
+        tbeg = System.currentTimeMillis();
+        assertTrue(doWait(4000));
+        assertTrue(System.currentTimeMillis() - tbeg < 3000);
+    }
+
+    @Test
+    public void testOnTopicEvent() throws Exception {
+        listener = new MultiPdpStatusListener(NAME_LIST);
+
+        // not done yet
+        assertFalse(doWait(0));
+
+        // process a message - still not done as have another name to go
+        status = new PdpStatus();
+        status.setName(NAME1);
+        listener.onTopicEvent(INFRA, TOPIC, status);
+        assertFalse(doWait(0));
+
+        // process a message from the same PDP - still not done
+        status = new PdpStatus();
+        status.setName(NAME1);
+        listener.onTopicEvent(INFRA, TOPIC, status);
+        assertFalse(doWait(0));
+
+        // process another message - now we're done
+        status = new PdpStatus();
+        status.setName(NAME2);
+        listener.onTopicEvent(INFRA, TOPIC, status);
+        assertTrue(doWait(0));
+    }
+
+    /**
+     * Waits for the listener to complete. Spawns a background thread to do the waiting so
+     * we can limit how long we wait.
+     *
+     * @param millisec milliseconds to wait
+     * @return {@code true} if the wait completed successfully, {@code false} otherwise
+     * @throws InterruptedException if this thread is interrupted while waiting for the
+     *         background thread to complete
+     */
+    private boolean doWait(long millisec) throws InterruptedException {
+        AtomicBoolean done = new AtomicBoolean(false);
+
+        Thread thread = new Thread() {
+            @Override
+            public void run() {
+                try {
+                    done.set(listener.await(millisec, TimeUnit.MILLISECONDS));
+
+                } catch (InterruptedException expected) {
+                    return;
+                }
+            }
+        };
+
+        thread.start();
+        thread.join(5000);
+        thread.interrupt();
+
+        return done.get();
+    }
+}
index dcaba78..aa92766 100644 (file)
@@ -24,6 +24,7 @@ import static org.junit.Assert.assertEquals;
 
 import org.junit.Test;
 import org.onap.policy.common.utils.test.ExceptionsTester;
+import org.onap.policy.pap.main.comm.PdpClientException;
 
 public class PdpClientExceptionTest {