2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved.
6 * Modifications Copyright (C) 2021 Bell Canada. All rights reserved.
7 * Modifications Copyright (C) 2022-2023 Nordix Foundation.
8 * ================================================================================
9 * Licensed under the Apache License, Version 2.0 (the "License");
10 * you may not use this file except in compliance with the License.
11 * You may obtain a copy of the License at
13 * http://www.apache.org/licenses/LICENSE-2.0
15 * Unless required by applicable law or agreed to in writing, software
16 * distributed under the License is distributed on an "AS IS" BASIS,
17 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
18 * See the License for the specific language governing permissions and
19 * limitations under the License.
20 * ============LICENSE_END=========================================================
23 package org.onap.policy.pap.main.comm;
25 import static org.assertj.core.api.Assertions.assertThatThrownBy;
26 import static org.junit.jupiter.api.Assertions.assertEquals;
27 import static org.junit.jupiter.api.Assertions.assertNotNull;
28 import static org.junit.jupiter.api.Assertions.assertNull;
29 import static org.junit.jupiter.api.Assertions.assertTrue;
31 import java.util.concurrent.ConcurrentLinkedQueue;
32 import java.util.concurrent.Semaphore;
33 import java.util.concurrent.TimeUnit;
34 import org.junit.jupiter.api.AfterAll;
35 import org.junit.jupiter.api.AfterEach;
36 import org.junit.jupiter.api.BeforeAll;
37 import org.junit.jupiter.api.BeforeEach;
38 import org.junit.jupiter.api.Test;
39 import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure;
40 import org.onap.policy.common.endpoints.event.comm.TopicEndpointManager;
41 import org.onap.policy.common.endpoints.event.comm.TopicListener;
42 import org.onap.policy.common.utils.coder.Coder;
43 import org.onap.policy.common.utils.coder.CoderException;
44 import org.onap.policy.common.utils.coder.StandardCoder;
45 import org.onap.policy.models.pdp.concepts.PdpMessage;
46 import org.onap.policy.models.pdp.concepts.PdpStateChange;
47 import org.onap.policy.pap.main.PolicyPapException;
48 import org.onap.policy.pap.main.parameters.CommonTestData;
49 import org.onap.policy.pap.main.parameters.PapParameterGroup;
51 public class PublisherTest extends Threaded {
53 // these messages will have different request IDs
54 private static final PdpStateChange MSG1 = new PdpStateChange();
55 private static final PdpStateChange MSG2 = new PdpStateChange();
57 // MSG1 & MSG2, respectively, encoded as JSON
58 private static final String JSON1;
59 private static final String JSON2;
61 protected static final String PDP_PAP_TOPIC = "POLICY-PDP-PAP";
65 Coder coder = new StandardCoder();
66 JSON1 = coder.encode(MSG1);
67 JSON2 = coder.encode(MSG2);
69 } catch (CoderException e) {
70 throw new ExceptionInInitializerError(e);
75 * Max time to wait, in milliseconds, for a thread to terminate or for a message to be
78 private static final long MAX_WAIT_MS = 5000;
80 private Publisher<PdpMessage> pub;
81 private MyListener listener;
84 * Configures the topic and attaches a listener.
87 public static void setUpBeforeClass() {
88 final PapParameterGroup parameterGroup = new CommonTestData().getPapParameterGroup(6969);
89 TopicEndpointManager.getManager().shutdown();
91 TopicEndpointManager.getManager().addTopics(parameterGroup.getTopicParameterGroup());
92 TopicEndpointManager.getManager().start();
96 public static void tearDownAfterClass() {
97 TopicEndpointManager.getManager().shutdown();
103 * @throws Exception if an error occurs
106 public void setUp() throws Exception {
109 pub = new Publisher<>(PDP_PAP_TOPIC);
111 listener = new MyListener();
112 TopicEndpointManager.getManager().getNoopTopicSink(PDP_PAP_TOPIC).register(listener);
118 * @throws Exception if an error occurs
121 public void tearDown() throws Exception {
122 TopicEndpointManager.getManager().getNoopTopicSink(PDP_PAP_TOPIC).unregister(listener);
128 protected void stopThread() {
135 void testPublisher_testStop() throws Exception {
139 assertTrue(waitStop());
141 // ensure we can call "stop" a second time
146 void testPublisher_Ex() {
147 assertThatThrownBy(() -> new Publisher<>("unknwon-topic")).isInstanceOf(PolicyPapException.class);
151 void testEnqueue() throws Exception {
152 // enqueue before running
153 pub.enqueue(new QueueToken<>(MSG1));
155 // enqueue another after running
157 pub.enqueue(new QueueToken<>(MSG2));
159 String json = listener.await(MAX_WAIT_MS);
160 assertEquals(JSON1, json);
162 json = listener.await(MAX_WAIT_MS);
163 assertEquals(JSON2, json);
167 void testRun_StopBeforeProcess() throws Exception {
168 // enqueue before running
169 QueueToken<PdpMessage> token = new QueueToken<>(MSG1);
172 // stop before running
175 // start the thread and then wait for it to stop
177 assertTrue(waitStop());
179 // message should not have been processed
180 assertTrue(listener.isEmpty());
181 assertNotNull(token.get());
185 void testRun() throws Exception {
188 // should skip token with null message
189 QueueToken<PdpMessage> token1 = new QueueToken<>(null);
192 QueueToken<PdpMessage> token2 = new QueueToken<>(MSG2);
195 // only the second message should have been processed
196 String json = listener.await(MAX_WAIT_MS);
197 assertEquals(JSON2, json);
198 assertNull(token2.get());
201 assertTrue(waitStop());
204 assertTrue(listener.isEmpty());
208 void testGetNext() throws Exception {
211 // wait for a message to be processed
212 pub.enqueue(new QueueToken<>(MSG1));
213 assertNotNull(listener.await(MAX_WAIT_MS));
218 assertTrue(waitStop());
222 * Listener for messages published to the topic.
224 private static class MyListener implements TopicListener {
227 * Released every time a message is added to the queue.
229 private final Semaphore sem = new Semaphore(0);
231 private final ConcurrentLinkedQueue<String> messages = new ConcurrentLinkedQueue<>();
233 public boolean isEmpty() {
234 return messages.isEmpty();
238 * Waits for a message to be published to the topic.
240 * @param waitMs time to wait, in milliseconds
241 * @return the next message in the queue, or {@code null} if there are no messages/timeout was reached
242 * @throws InterruptedException if this thread was interrupted while waiting
244 public String await(long waitMs) throws InterruptedException {
245 if (sem.tryAcquire(waitMs, TimeUnit.MILLISECONDS)) {
246 return messages.poll();
253 public void onTopicEvent(CommInfrastructure commType, String topic, String event) {