2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved.
6 * ================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 * ============LICENSE_END=========================================================
21 package org.onap.policy.pap.main.comm;
23 import static org.assertj.core.api.Assertions.assertThatThrownBy;
24 import static org.junit.Assert.assertEquals;
25 import static org.junit.Assert.assertNotNull;
26 import static org.junit.Assert.assertNull;
27 import static org.junit.Assert.assertTrue;
30 import java.io.FileInputStream;
31 import java.util.Properties;
32 import java.util.concurrent.ConcurrentLinkedQueue;
33 import java.util.concurrent.Semaphore;
34 import java.util.concurrent.TimeUnit;
35 import org.junit.After;
36 import org.junit.AfterClass;
37 import org.junit.Before;
38 import org.junit.BeforeClass;
39 import org.junit.Test;
40 import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure;
41 import org.onap.policy.common.endpoints.event.comm.TopicEndpoint;
42 import org.onap.policy.common.endpoints.event.comm.TopicListener;
43 import org.onap.policy.common.utils.coder.Coder;
44 import org.onap.policy.common.utils.coder.CoderException;
45 import org.onap.policy.common.utils.coder.StandardCoder;
46 import org.onap.policy.common.utils.resources.ResourceUtils;
47 import org.onap.policy.models.pdp.concepts.PdpMessage;
48 import org.onap.policy.models.pdp.concepts.PdpStateChange;
49 import org.onap.policy.pap.main.PapConstants;
50 import org.onap.policy.pap.main.PolicyPapException;
52 public class PublisherTest extends Threaded {
54 // these messages will have different request IDs
55 private static final PdpStateChange MSG1 = new PdpStateChange();
56 private static final PdpStateChange MSG2 = new PdpStateChange();
58 // MSG1 & MSG2, respectively, encoded as JSON
59 private static final String JSON1;
60 private static final String JSON2;
// Encode both messages once with a single StandardCoder; the results populate JSON1 and JSON2.
// NOTE(review): the lines that open this initializer are not visible in this view.
64 Coder coder = new StandardCoder();
65 JSON1 = coder.encode(MSG1);
66 JSON2 = coder.encode(MSG2);
// An encoding failure is rethrown as ExceptionInInitializerError with the CoderException as its cause.
68 } catch (CoderException e) {
69 throw new ExceptionInInitializerError(e);
74 * Max time to wait, in milliseconds, for a thread to terminate or for a message to be
77 private static final long MAX_WAIT_MS = 5000;
// Publisher under test; setUp() creates a new one for PapConstants.TOPIC_POLICY_PDP_PAP.
79 private Publisher pub;
// Listener that setUp() registers on the topic's noop sink and tearDown() unregisters.
80 private MyListener listener;
83 * Configures the topic and attaches a listener.
85 * @throws Exception if an error occurs
// Prepares the shared TopicEndpoint manager using the "parameters/topic.properties" resource.
88 public static void setUpBeforeClass() throws Exception {
89 Properties props = new Properties();
// Resolve the resource to a file path and open it; try-with-resources closes the stream.
// NOTE(review): the statement that loads props from the stream is not visible in this view.
90 File propFile = new File(ResourceUtils.getFilePath4Resource("parameters/topic.properties"));
91 try (FileInputStream inp = new FileInputStream(propFile)) {
// Shut the manager down first (presumably to discard state left over from other tests -- confirm).
95 TopicEndpoint.manager.shutdown();
// Add the topics described by props, then start the manager.
97 TopicEndpoint.manager.addTopics(props);
98 TopicEndpoint.manager.start();
// Shuts down the shared TopicEndpoint manager.
102 public static void tearDownAfterClass() throws Exception {
103 TopicEndpoint.manager.shutdown();
109 * @throws Exception if an error occurs
// Creates the publisher and listener used by the tests.
112 public void setUp() throws Exception {
// Publisher under test, bound to the PAP topic.
115 pub = new Publisher(PapConstants.TOPIC_POLICY_PDP_PAP);
// Register a new listener on the noop sink for the same topic; the tests read published
// messages back through it.
117 listener = new MyListener();
118 TopicEndpoint.manager.getNoopTopicSink(PapConstants.TOPIC_POLICY_PDP_PAP).register(listener);
124 * @throws Exception if an error occurs
// Detaches the listener that setUp() registered on the PAP topic's noop sink.
127 public void tearDown() throws Exception {
128 TopicEndpoint.manager.getNoopTopicSink(PapConstants.TOPIC_POLICY_PDP_PAP).unregister(listener);
134 protected void stopThread() {
// Checks that the publisher's thread terminates and that stopping twice is tolerated.
// NOTE(review): the statements that start and stop the publisher are not visible in this view.
141 public void testPublisher_testStop() throws Exception {
// waitStop() is not defined in this view (presumably inherited from Threaded -- confirm);
// the test requires it to return true.
145 assertTrue(waitStop());
147 // ensure we can call "stop" a second time
// Constructing a Publisher for this topic name must throw PolicyPapException
// (presumably because no topic with that name is configured -- confirm).
152 public void testPublisher_Ex() throws Exception {
153 assertThatThrownBy(() -> new Publisher("unknwon-topic")).isInstanceOf(PolicyPapException.class);
// Checks that enqueued messages reach the listener as JSON, in the order they were enqueued.
157 public void testEnqueue() throws Exception {
158 // enqueue before running
159 pub.enqueue(new QueueToken<>(MSG1));
161 // enqueue another after running
163 pub.enqueue(new QueueToken<>(MSG2));
// The first message received must be the encoding of MSG1 ...
165 String json = listener.await(MAX_WAIT_MS);
166 assertEquals(JSON1, json);
// ... and the second must be the encoding of MSG2.
168 json = listener.await(MAX_WAIT_MS);
169 assertEquals(JSON2, json);
// Checks that a message queued before the publisher is stopped is never published.
// NOTE(review): the enqueue, stop and start statements are not visible in this view.
173 public void testRun_StopBeforeProcess() throws Exception {
174 // enqueue before running
175 QueueToken<PdpMessage> token = new QueueToken<>(MSG1);
178 // stop before running
181 // start the thread and then wait for it to stop
183 assertTrue(waitStop());
185 // message should not have been processed
// Nothing reached the listener, and the token still holds its message.
186 assertTrue(listener.isEmpty());
187 assertNotNull(token.get());
// Checks that a token carrying no message is skipped and the following message is published.
// NOTE(review): the enqueue and stop statements are not visible in this view.
191 public void testRun() throws Exception {
194 // should skip token with null message
195 QueueToken<PdpMessage> token1 = new QueueToken<>(null);
198 QueueToken<PdpMessage> token2 = new QueueToken<>(MSG2);
201 // only the second message should have been processed
202 String json = listener.await(MAX_WAIT_MS);
203 assertEquals(JSON2, json);
// The test expects token2 to be empty once its message has been published.
204 assertNull(token2.get());
207 assertTrue(waitStop());
// No further messages may have reached the listener.
210 assertTrue(listener.isEmpty());
// Publishes one message, waits for the listener to receive it, then expects the thread to stop.
// NOTE(review): the statements between the two assertions are not visible in this view.
214 public void testGetNext() throws Exception {
217 // wait for a message to be processed
218 pub.enqueue(new QueueToken<>(MSG1));
219 assertNotNull(listener.await(MAX_WAIT_MS));
224 assertTrue(waitStop());
228 * Listener for messages published to the topic.
230 private static class MyListener implements TopicListener {
233 * Released every time a message is added to the queue.
235 private final Semaphore sem = new Semaphore(0);
237 private final ConcurrentLinkedQueue<String> messages = new ConcurrentLinkedQueue<>();
// Returns true when the message queue currently holds no entries.
239 public boolean isEmpty() {
240 return messages.isEmpty();
244 * Waits for a message to be published to the topic.
246 * @param waitMs time to wait, in milliseconds
247 * @return the next message in the queue, or {@code null} if there are no messages
248 * or if the timeout was reached
249 * @throws InterruptedException if this thread was interrupted while waiting
251 public String await(long waitMs) throws InterruptedException {
// Block for up to waitMs milliseconds until a permit is available on the semaphore.
252 if (sem.tryAcquire(waitMs, TimeUnit.MILLISECONDS)) {
// Remove and return the head of the message queue (poll() yields null if it is empty).
253 return messages.poll();
260 public void onTopicEvent(CommInfrastructure commType, String topic, String event) {