c33790aa362802a09fd8da6804ff592b119f83df
[policy/pap.git] / main / src / test / java / org / onap / policy / pap / main / comm / PublisherTest.java
1 /*
2  * ============LICENSE_START=======================================================
3  * ONAP PAP
4  * ================================================================================
5  * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved.
6  * Modifications Copyright (C) 2021 Bell Canada. All rights reserved.
7  * Modifications Copyright (C) 2022 Nordix Foundation.
8  * ================================================================================
9  * Licensed under the Apache License, Version 2.0 (the "License");
10  * you may not use this file except in compliance with the License.
11  * You may obtain a copy of the License at
12  *
13  *      http://www.apache.org/licenses/LICENSE-2.0
14  *
15  * Unless required by applicable law or agreed to in writing, software
16  * distributed under the License is distributed on an "AS IS" BASIS,
17  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
18  * See the License for the specific language governing permissions and
19  * limitations under the License.
20  * ============LICENSE_END=========================================================
21  */
22
23 package org.onap.policy.pap.main.comm;
24
25 import static org.assertj.core.api.Assertions.assertThatThrownBy;
26 import static org.junit.Assert.assertEquals;
27 import static org.junit.Assert.assertNotNull;
28 import static org.junit.Assert.assertNull;
29 import static org.junit.Assert.assertTrue;
30
31 import java.util.concurrent.ConcurrentLinkedQueue;
32 import java.util.concurrent.Semaphore;
33 import java.util.concurrent.TimeUnit;
34 import org.junit.After;
35 import org.junit.AfterClass;
36 import org.junit.Before;
37 import org.junit.BeforeClass;
38 import org.junit.Test;
39 import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure;
40 import org.onap.policy.common.endpoints.event.comm.TopicEndpointManager;
41 import org.onap.policy.common.endpoints.event.comm.TopicListener;
42 import org.onap.policy.common.utils.coder.Coder;
43 import org.onap.policy.common.utils.coder.CoderException;
44 import org.onap.policy.common.utils.coder.StandardCoder;
45 import org.onap.policy.models.pdp.concepts.PdpMessage;
46 import org.onap.policy.models.pdp.concepts.PdpStateChange;
47 import org.onap.policy.pap.main.PolicyPapException;
48 import org.onap.policy.pap.main.parameters.CommonTestData;
49 import org.onap.policy.pap.main.parameters.PapParameterGroup;
50
51 public class PublisherTest extends Threaded {
52
53     // these messages will have different request IDs
54     private static final PdpStateChange MSG1 = new PdpStateChange();
55     private static final PdpStateChange MSG2 = new PdpStateChange();
56
57     // MSG1 & MSG2, respectively, encoded as JSON
58     private static final String JSON1;
59     private static final String JSON2;
60
61     protected static final String PDP_PAP_TOPIC = "POLICY-PDP-PAP";
62
63     static {
64         try {
65             Coder coder = new StandardCoder();
66             JSON1 = coder.encode(MSG1);
67             JSON2 = coder.encode(MSG2);
68
69         } catch (CoderException e) {
70             throw new ExceptionInInitializerError(e);
71         }
72     }
73
74     /**
75      * Max time to wait, in milliseconds, for a thread to terminate or for a message to be
76      * published.
77      */
78     private static final long MAX_WAIT_MS = 5000;
79
80     private Publisher<PdpMessage> pub;
81     private MyListener listener;
82
83     /**
84      * Configures the topic and attaches a listener.
85      *
86      */
87     @BeforeClass
88     public static void setUpBeforeClass() {
89         final PapParameterGroup parameterGroup = new CommonTestData().getPapParameterGroup(6969);
90         TopicEndpointManager.getManager().shutdown();
91
92         TopicEndpointManager.getManager().addTopics(parameterGroup.getTopicParameterGroup());
93         TopicEndpointManager.getManager().start();
94     }
95
96     @AfterClass
97     public static void tearDownAfterClass() {
98         TopicEndpointManager.getManager().shutdown();
99     }
100
101     /**
102      * Set up.
103      *
104      * @throws Exception if an error occurs
105      */
106     @Before
107     public void setUp() throws Exception {
108         super.setUp();
109
110         pub = new Publisher<>(PDP_PAP_TOPIC);
111
112         listener = new MyListener();
113         TopicEndpointManager.getManager().getNoopTopicSink(PDP_PAP_TOPIC).register(listener);
114     }
115
116     /**
117      * Tear down.
118      *
119      * @throws Exception if an error occurs
120      */
121     @After
122     public void tearDown() throws Exception {
123         TopicEndpointManager.getManager().getNoopTopicSink(PDP_PAP_TOPIC).unregister(listener);
124
125         super.tearDown();
126     }
127
128     @Override
129     protected void stopThread() {
130         if (pub != null) {
131             pub.stop();
132         }
133     }
134
135     @Test
136     public void testPublisher_testStop() throws Exception {
137         startThread(pub);
138         pub.stop();
139
140         assertTrue(waitStop());
141
142         // ensure we can call "stop" a second time
143         pub.stop();
144     }
145
146     @Test
147     public void testPublisher_Ex() {
148         assertThatThrownBy(() -> new Publisher<>("unknwon-topic")).isInstanceOf(PolicyPapException.class);
149     }
150
151     @Test
152     public void testEnqueue() throws Exception {
153         // enqueue before running
154         pub.enqueue(new QueueToken<>(MSG1));
155
156         // enqueue another after running
157         startThread(pub);
158         pub.enqueue(new QueueToken<>(MSG2));
159
160         String json = listener.await(MAX_WAIT_MS);
161         assertEquals(JSON1, json);
162
163         json = listener.await(MAX_WAIT_MS);
164         assertEquals(JSON2, json);
165     }
166
167     @Test
168     public void testRun_StopBeforeProcess() throws Exception {
169         // enqueue before running
170         QueueToken<PdpMessage> token = new QueueToken<>(MSG1);
171         pub.enqueue(token);
172
173         // stop before running
174         pub.stop();
175
176         // start the thread and then wait for it to stop
177         startThread(pub);
178         assertTrue(waitStop());
179
180         // message should not have been processed
181         assertTrue(listener.isEmpty());
182         assertNotNull(token.get());
183     }
184
185     @Test
186     public void testRun() throws Exception {
187         startThread(pub);
188
189         // should skip token with null message
190         QueueToken<PdpMessage> token1 = new QueueToken<>(null);
191         pub.enqueue(token1);
192
193         QueueToken<PdpMessage> token2 = new QueueToken<>(MSG2);
194         pub.enqueue(token2);
195
196         // only the second message should have been processed
197         String json = listener.await(MAX_WAIT_MS);
198         assertEquals(JSON2, json);
199         assertNull(token2.get());
200
201         pub.stop();
202         assertTrue(waitStop());
203
204         // no more messages
205         assertTrue(listener.isEmpty());
206     }
207
208     @Test
209     public void testGetNext() throws Exception {
210         startThread(pub);
211
212         // wait for a message to be processed
213         pub.enqueue(new QueueToken<>(MSG1));
214         assertNotNull(listener.await(MAX_WAIT_MS));
215
216         // now interrupt
217         interruptThread();
218
219         assertTrue(waitStop());
220     }
221
222     /**
223      * Listener for messages published to the topic.
224      */
225     private static class MyListener implements TopicListener {
226
227         /**
228          * Released every time a message is added to the queue.
229          */
230         private final Semaphore sem = new Semaphore(0);
231
232         private final ConcurrentLinkedQueue<String> messages = new ConcurrentLinkedQueue<>();
233
234         public boolean isEmpty() {
235             return messages.isEmpty();
236         }
237
238         /**
239          * Waits for a message to be published to the topic.
240          *
241          * @param waitMs time to wait, in milli-seconds
242          * @return the next message in the queue, or {@code null} if there are no messages
243          *         or if the timeout was reached
244          * @throws InterruptedException if this thread was interrupted while waiting
245          */
246         public String await(long waitMs) throws InterruptedException {
247             if (sem.tryAcquire(waitMs, TimeUnit.MILLISECONDS)) {
248                 return messages.poll();
249             }
250
251             return null;
252         }
253
254         @Override
255         public void onTopicEvent(CommInfrastructure commType, String topic, String event) {
256             messages.add(event);
257             sem.release();
258         }
259     }
260 }