Allow topics to be registered with lower case for kafka compability
[policy/pap.git] / main / src / test / java / org / onap / policy / pap / main / comm / PublisherTest.java
1 /*
2  * ============LICENSE_START=======================================================
3  * ONAP PAP
4  * ================================================================================
5  * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved.
6  * Modifications Copyright (C) 2021 Bell Canada. All rights reserved.
7  * Modifications Copyright (C) 2022-2023 Nordix Foundation.
8  * ================================================================================
9  * Licensed under the Apache License, Version 2.0 (the "License");
10  * you may not use this file except in compliance with the License.
11  * You may obtain a copy of the License at
12  *
13  *      http://www.apache.org/licenses/LICENSE-2.0
14  *
15  * Unless required by applicable law or agreed to in writing, software
16  * distributed under the License is distributed on an "AS IS" BASIS,
17  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
18  * See the License for the specific language governing permissions and
19  * limitations under the License.
20  * ============LICENSE_END=========================================================
21  */
22
23 package org.onap.policy.pap.main.comm;
24
25 import static org.assertj.core.api.Assertions.assertThatThrownBy;
26 import static org.junit.jupiter.api.Assertions.assertEquals;
27 import static org.junit.jupiter.api.Assertions.assertNotNull;
28 import static org.junit.jupiter.api.Assertions.assertNull;
29 import static org.junit.jupiter.api.Assertions.assertTrue;
30
31 import java.util.concurrent.ConcurrentLinkedQueue;
32 import java.util.concurrent.Semaphore;
33 import java.util.concurrent.TimeUnit;
34 import org.junit.jupiter.api.AfterAll;
35 import org.junit.jupiter.api.AfterEach;
36 import org.junit.jupiter.api.BeforeAll;
37 import org.junit.jupiter.api.BeforeEach;
38 import org.junit.jupiter.api.Test;
39 import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure;
40 import org.onap.policy.common.endpoints.event.comm.TopicEndpointManager;
41 import org.onap.policy.common.endpoints.event.comm.TopicListener;
42 import org.onap.policy.common.utils.coder.Coder;
43 import org.onap.policy.common.utils.coder.CoderException;
44 import org.onap.policy.common.utils.coder.StandardCoder;
45 import org.onap.policy.models.pdp.concepts.PdpMessage;
46 import org.onap.policy.models.pdp.concepts.PdpStateChange;
47 import org.onap.policy.pap.main.PolicyPapException;
48 import org.onap.policy.pap.main.parameters.CommonTestData;
49 import org.onap.policy.pap.main.parameters.PapParameterGroup;
50
51 public class PublisherTest extends Threaded {
52
53     // these messages will have different request IDs
54     private static final PdpStateChange MSG1 = new PdpStateChange();
55     private static final PdpStateChange MSG2 = new PdpStateChange();
56
57     // MSG1 & MSG2, respectively, encoded as JSON
58     private static final String JSON1;
59     private static final String JSON2;
60
61     protected static final String PDP_PAP_TOPIC = "policy-pdp-pap";
62
63     static {
64         try {
65             Coder coder = new StandardCoder();
66             JSON1 = coder.encode(MSG1);
67             JSON2 = coder.encode(MSG2);
68
69         } catch (CoderException e) {
70             throw new ExceptionInInitializerError(e);
71         }
72     }
73
74     /**
75      * Max time to wait, in milliseconds, for a thread to terminate or for a message to be
76      * published.
77      */
78     private static final long MAX_WAIT_MS = 5000;
79
80     private Publisher<PdpMessage> pub;
81     private MyListener listener;
82
83     /**
84      * Configures the topic and attaches a listener.
85      */
86     @BeforeAll
87     public static void setUpBeforeClass() {
88         final PapParameterGroup parameterGroup = new CommonTestData().getPapParameterGroup(6969);
89         TopicEndpointManager.getManager().shutdown();
90
91         TopicEndpointManager.getManager().addTopics(parameterGroup.getTopicParameterGroup());
92         TopicEndpointManager.getManager().start();
93     }
94
95     @AfterAll
96     public static void tearDownAfterClass() {
97         TopicEndpointManager.getManager().shutdown();
98     }
99
100     /**
101      * Set up.
102      *
103      * @throws Exception if an error occurs
104      */
105     @BeforeEach
106     public void setUp() throws Exception {
107         super.setUp();
108
109         pub = new Publisher<>(PDP_PAP_TOPIC);
110
111         listener = new MyListener();
112         TopicEndpointManager.getManager().getNoopTopicSink(PDP_PAP_TOPIC).register(listener);
113     }
114
115     /**
116      * Tear down.
117      *
118      * @throws Exception if an error occurs
119      */
120     @AfterEach
121     public void tearDown() throws Exception {
122         TopicEndpointManager.getManager().getNoopTopicSink(PDP_PAP_TOPIC).unregister(listener);
123
124         super.tearDown();
125     }
126
127     @Override
128     protected void stopThread() {
129         if (pub != null) {
130             pub.stop();
131         }
132     }
133
134     @Test
135     void testPublisher_testStop() throws Exception {
136         startThread(pub);
137         pub.stop();
138
139         assertTrue(waitStop());
140
141         // ensure we can call "stop" a second time
142         pub.stop();
143     }
144
145     @Test
146     void testPublisher_Ex() {
147         assertThatThrownBy(() -> new Publisher<>("unknwon-topic")).isInstanceOf(PolicyPapException.class);
148     }
149
150     @Test
151     void testEnqueue() throws Exception {
152         // enqueue before running
153         pub.enqueue(new QueueToken<>(MSG1));
154
155         // enqueue another after running
156         startThread(pub);
157         pub.enqueue(new QueueToken<>(MSG2));
158
159         String json = listener.await(MAX_WAIT_MS);
160         assertEquals(JSON1, json);
161
162         json = listener.await(MAX_WAIT_MS);
163         assertEquals(JSON2, json);
164     }
165
166     @Test
167     void testRun_StopBeforeProcess() throws Exception {
168         // enqueue before running
169         QueueToken<PdpMessage> token = new QueueToken<>(MSG1);
170         pub.enqueue(token);
171
172         // stop before running
173         pub.stop();
174
175         // start the thread and then wait for it to stop
176         startThread(pub);
177         assertTrue(waitStop());
178
179         // message should not have been processed
180         assertTrue(listener.isEmpty());
181         assertNotNull(token.get());
182     }
183
184     @Test
185     void testRun() throws Exception {
186         startThread(pub);
187
188         // should skip token with null message
189         QueueToken<PdpMessage> token1 = new QueueToken<>(null);
190         pub.enqueue(token1);
191
192         QueueToken<PdpMessage> token2 = new QueueToken<>(MSG2);
193         pub.enqueue(token2);
194
195         // only the second message should have been processed
196         String json = listener.await(MAX_WAIT_MS);
197         assertEquals(JSON2, json);
198         assertNull(token2.get());
199
200         pub.stop();
201         assertTrue(waitStop());
202
203         // no more messages
204         assertTrue(listener.isEmpty());
205     }
206
207     @Test
208     void testGetNext() throws Exception {
209         startThread(pub);
210
211         // wait for a message to be processed
212         pub.enqueue(new QueueToken<>(MSG1));
213         assertNotNull(listener.await(MAX_WAIT_MS));
214
215         // now interrupt
216         interruptThread();
217
218         assertTrue(waitStop());
219     }
220
221     /**
222      * Listener for messages published to the topic.
223      */
224     private static class MyListener implements TopicListener {
225
226         /**
227          * Released every time a message is added to the queue.
228          */
229         private final Semaphore sem = new Semaphore(0);
230
231         private final ConcurrentLinkedQueue<String> messages = new ConcurrentLinkedQueue<>();
232
233         public boolean isEmpty() {
234             return messages.isEmpty();
235         }
236
237         /**
238          * Waits for a message to be published to the topic.
239          *
240          * @param waitMs time to wait, in milliseconds
241          * @return the next message in the queue, or {@code null} if there are no messages/timeout was reached
242          * @throws InterruptedException if this thread was interrupted while waiting
243          */
244         public String await(long waitMs) throws InterruptedException {
245             if (sem.tryAcquire(waitMs, TimeUnit.MILLISECONDS)) {
246                 return messages.poll();
247             }
248
249             return null;
250         }
251
252         @Override
253         public void onTopicEvent(CommInfrastructure commType, String topic, String event) {
254             messages.add(event);
255             sem.release();
256         }
257     }
258 }