2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved.
6 * ================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 * ============LICENSE_END=========================================================
21 package org.onap.policy.pap.main.comm;
23 import java.util.Collection;
25 import java.util.SortedSet;
26 import java.util.TreeSet;
27 import java.util.concurrent.ConcurrentHashMap;
28 import java.util.concurrent.CountDownLatch;
29 import java.util.concurrent.TimeUnit;
30 import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure;
31 import org.onap.policy.common.endpoints.listeners.TypedMessageListener;
32 import org.onap.policy.pdp.common.models.PdpStatus;
35 * Listener for PDP Status messages expected to be received from multiple PDPs. The
36 * listener "completes" once a message has been seen from all of the PDPs.
38 public class MultiPdpStatusListener implements TypedMessageListener<PdpStatus> {
41 * This is decremented once a message has been received from every PDP.
43 private final CountDownLatch allSeen = new CountDownLatch(1);
46 * PDPs from which no message has been received yet.
48 private final Set<String> unseenPdpNames = ConcurrentHashMap.newKeySet();
/**
 * Creates a listener that waits for a status message from a single PDP.
 *
 * @param pdpName name of the one PDP whose message is awaited
 */
public MultiPdpStatusListener(String pdpName) {
    // Register the lone PDP as "not yet seen".
    this.unseenPdpNames.add(pdpName);
}
/**
 * Constructs the object.
 *
 * <p>If {@code pdpNames} is empty there is nothing to wait for, so the completion
 * latch is released immediately; otherwise every name is recorded as not yet seen.
 *
 * @param pdpNames names of the PDPs for which to wait; may be empty
 */
public MultiPdpStatusListener(Collection<String> pdpNames) {
    if (pdpNames.isEmpty()) {
        // No PDP will ever report, so complete right away instead of leaving
        // await() to block until its timeout expires.
        allSeen.countDown();
    } else {
        unseenPdpNames.addAll(pdpNames);
    }
}
74 * Gets the set of names for which messages have not yet been received.
76 * @return the names of the PDPs that have not been seen yet
78 public SortedSet<String> getUnseenPdpNames() {
79 return new TreeSet<>(unseenPdpNames);
83 * Waits for messages to be received for all PDPs, or until a timeout is reached.
85 * @param timeout the maximum time to wait
86 * @param unit the time unit of the timeout argument
87 * @return {@code true} if messages were received for all PDPs, {@code false} if the
88 * timeout was reached first
89 * @throws InterruptedException if the current thread is interrupted while waiting
91 public boolean await(long timeout, TimeUnit unit) throws InterruptedException {
92 return allSeen.await(timeout, unit);
96 * Indicates that a message was received from a PDP. Triggers completion of
97 * {@link #await(long, TimeUnit)} once a message has been received from every PDP.
98 * Subclasses may override this method to process a message. However, they should
99 * still invoke this method so that PDPs can be properly tracked.
102 public void onTopicEvent(CommInfrastructure infra, String topic, PdpStatus message) {
103 unseenPdpNames.remove(message.getName());
105 if (unseenPdpNames.isEmpty()) {