f15b2a0444bef52f19b2aac205986c60e053208d
[policy/pap.git] / main / src / test / java / org / onap / policy / pap / main / comm / PublisherTest.java
1 /*
2  * ============LICENSE_START=======================================================
3  * ONAP PAP
4  * ================================================================================
5  * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved.
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=========================================================
19  */
20
21 package org.onap.policy.pap.main.comm;
22
23 import static org.assertj.core.api.Assertions.assertThatThrownBy;
24 import static org.junit.Assert.assertEquals;
25 import static org.junit.Assert.assertNotNull;
26 import static org.junit.Assert.assertNull;
27 import static org.junit.Assert.assertTrue;
28
29 import java.io.File;
30 import java.io.FileInputStream;
31 import java.util.Properties;
32 import java.util.concurrent.ConcurrentLinkedQueue;
33 import java.util.concurrent.Semaphore;
34 import java.util.concurrent.TimeUnit;
35 import org.junit.After;
36 import org.junit.AfterClass;
37 import org.junit.Before;
38 import org.junit.BeforeClass;
39 import org.junit.Test;
40 import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure;
41 import org.onap.policy.common.endpoints.event.comm.TopicEndpoint;
42 import org.onap.policy.common.endpoints.event.comm.TopicListener;
43 import org.onap.policy.common.utils.coder.Coder;
44 import org.onap.policy.common.utils.coder.CoderException;
45 import org.onap.policy.common.utils.coder.StandardCoder;
46 import org.onap.policy.common.utils.resources.ResourceUtils;
47 import org.onap.policy.models.pdp.concepts.PdpMessage;
48 import org.onap.policy.models.pdp.concepts.PdpStateChange;
49 import org.onap.policy.pap.main.PapConstants;
50 import org.onap.policy.pap.main.PolicyPapException;
51
52 public class PublisherTest extends Threaded {
53
54     // these messages will have different request IDs
55     private static final PdpStateChange MSG1 = new PdpStateChange();
56     private static final PdpStateChange MSG2 = new PdpStateChange();
57
58     // MSG1 & MSG2, respectively, encoded as JSON
59     private static final String JSON1;
60     private static final String JSON2;
61
62     static {
63         try {
64             Coder coder = new StandardCoder();
65             JSON1 = coder.encode(MSG1);
66             JSON2 = coder.encode(MSG2);
67
68         } catch (CoderException e) {
69             throw new ExceptionInInitializerError(e);
70         }
71     }
72
73     /**
74      * Max time to wait, in milliseconds, for a thread to terminate or for a message to be
75      * published.
76      */
77     private static final long MAX_WAIT_MS = 5000;
78
79     private Publisher pub;
80     private MyListener listener;
81
82     /**
83      * Configures the topic and attaches a listener.
84      *
85      * @throws Exception if an error occurs
86      */
87     @BeforeClass
88     public static void setUpBeforeClass() throws Exception {
89         Properties props = new Properties();
90         File propFile = new File(ResourceUtils.getFilePath4Resource("parameters/topic.properties"));
91         try (FileInputStream inp = new FileInputStream(propFile)) {
92             props.load(inp);
93         }
94
95         TopicEndpoint.manager.shutdown();
96
97         TopicEndpoint.manager.addTopics(props);
98         TopicEndpoint.manager.start();
99     }
100
101     @AfterClass
102     public static void tearDownAfterClass() throws Exception {
103         TopicEndpoint.manager.shutdown();
104     }
105
106     /**
107      * Set up.
108      *
109      * @throws Exception if an error occurs
110      */
111     @Before
112     public void setUp() throws Exception {
113         super.setUp();
114
115         pub = new Publisher(PapConstants.TOPIC_POLICY_PDP_PAP);
116
117         listener = new MyListener();
118         TopicEndpoint.manager.getNoopTopicSink(PapConstants.TOPIC_POLICY_PDP_PAP).register(listener);
119     }
120
121     /**
122      * Tear down.
123      *
124      * @throws Exception if an error occurs
125      */
126     @After
127     public void tearDown() throws Exception {
128         TopicEndpoint.manager.getNoopTopicSink(PapConstants.TOPIC_POLICY_PDP_PAP).unregister(listener);
129
130         super.tearDown();
131     }
132
133     @Override
134     protected void stopThread() {
135         if (pub != null) {
136             pub.stop();
137         }
138     }
139
140     @Test
141     public void testPublisher_testStop() throws Exception {
142         startThread(pub);
143         pub.stop();
144
145         assertTrue(waitStop());
146
147         // ensure we can call "stop" a second time
148         pub.stop();
149     }
150
151     @Test
152     public void testPublisher_Ex() throws Exception {
153         assertThatThrownBy(() -> new Publisher("unknwon-topic")).isInstanceOf(PolicyPapException.class);
154     }
155
156     @Test
157     public void testEnqueue() throws Exception {
158         // enqueue before running
159         pub.enqueue(new QueueToken<>(MSG1));
160
161         // enqueue another after running
162         startThread(pub);
163         pub.enqueue(new QueueToken<>(MSG2));
164
165         String json = listener.await(MAX_WAIT_MS);
166         assertEquals(JSON1, json);
167
168         json = listener.await(MAX_WAIT_MS);
169         assertEquals(JSON2, json);
170     }
171
172     @Test
173     public void testRun_StopBeforeProcess() throws Exception {
174         // enqueue before running
175         QueueToken<PdpMessage> token = new QueueToken<>(MSG1);
176         pub.enqueue(token);
177
178         // stop before running
179         pub.stop();
180
181         // start the thread and then wait for it to stop
182         startThread(pub);
183         assertTrue(waitStop());
184
185         // message should not have been processed
186         assertTrue(listener.isEmpty());
187         assertNotNull(token.get());
188     }
189
190     @Test
191     public void testRun() throws Exception {
192         startThread(pub);
193
194         // should skip token with null message
195         QueueToken<PdpMessage> token1 = new QueueToken<>(null);
196         pub.enqueue(token1);
197
198         QueueToken<PdpMessage> token2 = new QueueToken<>(MSG2);
199         pub.enqueue(token2);
200
201         // only the second message should have been processed
202         String json = listener.await(MAX_WAIT_MS);
203         assertEquals(JSON2, json);
204         assertNull(token2.get());
205
206         pub.stop();
207         assertTrue(waitStop());
208
209         // no more messages
210         assertTrue(listener.isEmpty());
211     }
212
213     @Test
214     public void testGetNext() throws Exception {
215         startThread(pub);
216
217         // wait for a message to be processed
218         pub.enqueue(new QueueToken<>(MSG1));
219         assertNotNull(listener.await(MAX_WAIT_MS));
220
221         // now interrupt
222         interruptThread();
223
224         assertTrue(waitStop());
225     }
226
227     /**
228      * Listener for messages published to the topic.
229      */
230     private static class MyListener implements TopicListener {
231
232         /**
233          * Released every time a message is added to the queue.
234          */
235         private final Semaphore sem = new Semaphore(0);
236
237         private final ConcurrentLinkedQueue<String> messages = new ConcurrentLinkedQueue<>();
238
239         public boolean isEmpty() {
240             return messages.isEmpty();
241         }
242
243         /**
244          * Waits for a message to be published to the topic.
245          *
246          * @param waitMs time to wait, in milli-seconds
247          * @return the next message in the queue, or {@code null} if there are no messages
248          *         or if the timeout was reached
249          * @throws InterruptedException if this thread was interrupted while waiting
250          */
251         public String await(long waitMs) throws InterruptedException {
252             if (sem.tryAcquire(waitMs, TimeUnit.MILLISECONDS)) {
253                 return messages.poll();
254             }
255
256             return null;
257         }
258
259         @Override
260         public void onTopicEvent(CommInfrastructure commType, String topic, String event) {
261             messages.add(event);
262             sem.release();
263         }
264     }
265 }