Receive PDP status messages
[policy/pap.git] / main / src / main / java / org / onap / policy / pap / main / comm / MultiPdpStatusListener.java
1 /*
2  * ============LICENSE_START=======================================================
3  * ONAP PAP
4  * ================================================================================
5  * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved.
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=========================================================
19  */
20
21 package org.onap.policy.pap.main.comm;
22
23 import java.util.Collection;
24 import java.util.Set;
25 import java.util.SortedSet;
26 import java.util.TreeSet;
27 import java.util.concurrent.ConcurrentHashMap;
28 import java.util.concurrent.CountDownLatch;
29 import java.util.concurrent.TimeUnit;
30 import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure;
31 import org.onap.policy.common.endpoints.listeners.TypedMessageListener;
32 import org.onap.policy.pdp.common.models.PdpStatus;
33
34 /**
35  * Listener for PDP Status messages expected to be received from multiple PDPs. The
36  * listener "completes" once a message has been seen from all of the PDPs.
37  */
38 public class MultiPdpStatusListener implements TypedMessageListener<PdpStatus> {
39
40     /**
41      * This is decremented once a message has been received from every PDP.
42      */
43     private final CountDownLatch allSeen = new CountDownLatch(1);
44
45     /**
46      * PDPs from which no message has been received yet.
47      */
48     private final Set<String> unseenPdpNames = ConcurrentHashMap.newKeySet();
49
50     /**
51      * Constructs the object.
52      *
53      * @param pdpName name of the PDP for which to wait
54      */
55     public MultiPdpStatusListener(String pdpName) {
56         unseenPdpNames.add(pdpName);
57     }
58
59     /**
60      * Constructs the object.
61      *
62      * @param pdpNames names of the PDP for which to wait
63      */
64     public MultiPdpStatusListener(Collection<String> pdpNames) {
65         if (pdpNames.isEmpty()) {
66             allSeen.countDown();
67
68         } else {
69             unseenPdpNames.addAll(pdpNames);
70         }
71     }
72
73     /**
74      * Gets the set of names for which messages have not yet been received.
75      *
76      * @return the names of the PDPs that have not been seen yet
77      */
78     public SortedSet<String> getUnseenPdpNames() {
79         return new TreeSet<>(unseenPdpNames);
80     }
81
82     /**
83      * Waits for messages to be received for all PDPs, or until a timeout is reached.
84      *
85      * @param timeout the maximum time to wait
86      * @param unit the time unit of the timeout argument
87      * @return {@code true} if messages were received for all PDPs, {@code false} if the
88      *         timeout was reached first
89      * @throws InterruptedException if the current thread is interrupted while waiting
90      */
91     public boolean await(long timeout, TimeUnit unit) throws InterruptedException {
92         return allSeen.await(timeout, unit);
93     }
94
95     /**
96      * Indicates that a message was received for a PDP. Triggers completion of
97      * {@link #await(long, TimeUnit)} if all PDPs have received a message. Threads may
98      * override this method to process a message. However, they should still invoke this
99      * method so that PDPs can be properly tracked.
100      */
101     @Override
102     public void onTopicEvent(CommInfrastructure infra, String topic, PdpStatus message) {
103         unseenPdpNames.remove(message.getName());
104
105         if (unseenPdpNames.isEmpty()) {
106             allSeen.countDown();
107         }
108     }
109 }