c0c98adc4aacf0b7f6670da9774882dcc25b4350
[policy/pap.git] / main / src / test / java / org / onap / policy / pap / main / rest / e2e / End2EndContext.java
1 /*
2  * ============LICENSE_START=======================================================
3  * ONAP PAP
4  * ================================================================================
5  * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved.
6  * Modifications Copyright (C) 2022-2023 Nordix Foundation.
7  * ================================================================================
8  * Licensed under the Apache License, Version 2.0 (the "License");
9  * you may not use this file except in compliance with the License.
10  * You may obtain a copy of the License at
11  *
12  *      http://www.apache.org/licenses/LICENSE-2.0
13  *
14  * Unless required by applicable law or agreed to in writing, software
15  * distributed under the License is distributed on an "AS IS" BASIS,
16  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17  * See the License for the specific language governing permissions and
18  * limitations under the License.
19  * ============LICENSE_END=========================================================
20  */
21
22 package org.onap.policy.pap.main.rest.e2e;
23
24 import static org.junit.jupiter.api.Assertions.assertFalse;
25 import static org.junit.jupiter.api.Assertions.assertNull;
26 import static org.junit.jupiter.api.Assertions.assertTrue;
27
28 import java.util.ArrayList;
29 import java.util.LinkedList;
30 import java.util.List;
31 import java.util.Queue;
32 import java.util.concurrent.BlockingQueue;
33 import java.util.concurrent.ConcurrentLinkedQueue;
34 import java.util.concurrent.LinkedBlockingQueue;
35 import lombok.Getter;
36 import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure;
37 import org.onap.policy.common.endpoints.event.comm.TopicEndpointManager;
38 import org.onap.policy.common.endpoints.event.comm.TopicListener;
39 import org.onap.policy.common.endpoints.event.comm.bus.NoopTopicSource;
40 import org.onap.policy.common.endpoints.listeners.MessageTypeDispatcher;
41 import org.onap.policy.common.endpoints.listeners.ScoListener;
42 import org.onap.policy.common.utils.coder.Coder;
43 import org.onap.policy.common.utils.coder.CoderException;
44 import org.onap.policy.common.utils.coder.StandardCoder;
45 import org.onap.policy.common.utils.coder.StandardCoderObject;
46 import org.onap.policy.common.utils.services.Registry;
47 import org.onap.policy.models.pdp.concepts.PdpMessage;
48 import org.onap.policy.models.pdp.concepts.PdpResponseDetails;
49 import org.onap.policy.models.pdp.concepts.PdpStateChange;
50 import org.onap.policy.models.pdp.concepts.PdpStatus;
51 import org.onap.policy.models.pdp.concepts.PdpUpdate;
52 import org.onap.policy.models.pdp.enums.PdpMessageType;
53 import org.onap.policy.pap.main.PapConstants;
54 import org.onap.policy.pap.main.comm.PdpModifyRequestMap;
55 import org.slf4j.Logger;
56 import org.slf4j.LoggerFactory;
57
58 /**
59  * Context for end-to-end tests.
60  */
61 public class End2EndContext {
62     private static final Logger logger = LoggerFactory.getLogger(End2EndContext.class);
63
64     /**
65      * Message placed onto a queue to indicate that a PDP has nothing more to do.
66      */
67     private static final String DONE = "";
68
69     /**
70      * Time, in milliseconds, to wait for everything to complete.
71      */
72     private static final long WAIT_MS = 10000;
73
74     /**
75      * Messages to be sent to PAP. Messages are removed from the queue by the ToPapThread
76      * and directly handed off to the NOOP source.
77      */
78     private final BlockingQueue<String> toPap = new LinkedBlockingQueue<>();
79
80     /**
81      * Messages to be sent to the PDPs. Messages are removed from the queue by the
82      * ToPdpThread and are given to each PDP to handle.
83      */
84     private final BlockingQueue<String> toPdps = new LinkedBlockingQueue<>();
85
86     /**
87      * List of simulated PDPs.
88      */
89     @Getter
90     private final List<PseudoPdp> pdps = new ArrayList<>();
91
92     /**
93      * PAP's topic source.
94      */
95     private final NoopTopicSource toPapTopic;
96
97     /**
98      * Decodes messages read from the {@link #toPdps} queue and dispatches them to the
99      * appropriate handler.
100      */
101     private final MessageTypeDispatcher dispatcher;
102
103     /**
104      * Thread that passes messages to PAP.
105      */
106     private final ToPapThread toPapThread;
107
108     /**
109      * Thread that passes messages to PDPs.
110      */
111     private final ToPdpsThread toPdpsThread;
112
113     /**
114      * {@code True} if started, {@code false} if stopped.
115      */
116     private boolean running = false;
117
118     /**
119      * Exception thrown by a coder. Should be {@code null} if all is OK.
120      */
121     private volatile CoderException exception = null;
122
123     /**
124      * Listener for messages written to the PDP-PAP topic.
125      */
126     private TopicListener topicListener = (infra, topic, text) -> toPdps.add(text);
127
128     private String topicPolicyPdpPap = "pdp-pap-topic";
129
130     /**
131      * Constructs the object.
132      */
133     public End2EndContext() {
134         toPapTopic = TopicEndpointManager.getManager().getNoopTopicSource(topicPolicyPdpPap);
135
136         TopicEndpointManager.getManager().getNoopTopicSink(topicPolicyPdpPap).register(topicListener);
137
138         dispatcher = new MessageTypeDispatcher("messageName");
139         dispatcher.register(PdpMessageType.PDP_UPDATE.name(), new UpdateListener());
140         dispatcher.register(PdpMessageType.PDP_STATE_CHANGE.name(), new ChangeListener());
141
142         toPapThread = new ToPapThread();
143         toPdpsThread = new ToPdpsThread();
144     }
145
146     /**
147      * Starts the threads that read the "DMaaP" queues..
148      */
149     public void startThreads() {
150         if (running) {
151             throw new IllegalStateException("already running");
152         }
153
154         for (Thread thread : new Thread[] {toPapThread, toPdpsThread}) {
155             thread.setDaemon(true);
156             thread.start();
157         }
158
159         running = true;
160     }
161
162     /**
163      * Waits for the threads to shut down.
164      *
165      * @throws InterruptedException if interrupted while waiting
166      */
167     public void await() throws InterruptedException {
168         toPapThread.join(WAIT_MS);
169         assertFalse(toPapThread.isAlive());
170
171         PdpModifyRequestMap map = Registry.get(PapConstants.REG_PDP_MODIFY_MAP);
172         assertTrue(map.isEmpty());
173
174         // no more requests, thus we can tell the other thread to stop
175         toPdps.add(DONE);
176
177         toPdpsThread.join(WAIT_MS);
178         assertFalse(toPapThread.isAlive());
179
180         // nothing new should have been added to the PAP queue
181         assertTrue(toPap.isEmpty());
182
183         assertNull(exception);
184     }
185
186     /**
187      * Stops the threads and shuts down the PAP Activator, rest services, and topic end
188      * points.
189      */
190     public void stop() {
191         if (!running) {
192             throw new IllegalStateException("not running");
193         }
194
195         running = false;
196
197         // queue up a "done" message for each PDP
198         toPdps.clear();
199         pdps.forEach(pdp -> toPdps.add(DONE));
200
201         // queue up a "done" message for each PDP
202         toPap.clear();
203         pdps.forEach(pdp -> toPap.add(DONE));
204
205         TopicEndpointManager.getManager().getNoopTopicSink(topicPolicyPdpPap).unregister(topicListener);
206     }
207
208     /**
209      * Adds a simulated PDP. This must be called before {@link #startThreads()} is
210      * invoked.
211      *
212      * @param pdpName PDP name
213      * @param pdpType PDP type
214      * @return a new, simulated PDP
215      * @throws IllegalStateException if {@link #startThreads()} has already been invoked
216      */
217     public PseudoPdp addPdp(String pdpName, String pdpType) {
218         if (running) {
219             throw new IllegalStateException("not running");
220         }
221
222         PseudoPdp pdp = new PseudoPdp(pdpName);
223         pdps.add(pdp);
224
225         return pdp;
226     }
227
228     /**
229      * Thread that reads messages from the {@link End2EndContext#toPdps} queue and
230      * dispatches them to each PDP. This thread terminates as soon as it sees a
231      * {@link End2EndContext#DONE} message.
232      */
233     private class ToPdpsThread extends Thread {
234         @Override
235         public void run() {
236             for (;;) {
237                 String text;
238                 try {
239                     text = toPdps.take();
240                 } catch (InterruptedException e) {
241                     logger.warn("{} interrupted", ToPdpsThread.class.getName(), e);
242                     Thread.currentThread().interrupt();
243                     break;
244                 }
245
246                 if (DONE.equals(text)) {
247                     break;
248                 }
249
250                 dispatcher.onTopicEvent(CommInfrastructure.NOOP, topicPolicyPdpPap, text);
251             }
252         }
253     }
254
255     /**
256      * Thread that reads messages from the {@link End2EndContext#toPap} queue and passes
257      * them to the PAP's topic source. This thread terminates once it sees a
258      * {@link End2EndContext#DONE} message <i>for each PDP</i>.
259      */
260     private class ToPapThread extends Thread {
261
262         @Override
263         public void run() {
264             // pretend we received DONE from PDPs that are already finished
265             long ndone = pdps.stream().filter(pdp -> pdp.finished).count();
266
267             while (ndone < pdps.size()) {
268                 String text;
269                 try {
270                     text = toPap.take();
271                 } catch (InterruptedException e) {
272                     logger.warn("{} interrupted", ToPapThread.class.getName(), e);
273                     Thread.currentThread().interrupt();
274                     break;
275                 }
276
277                 if (DONE.equals(text)) {
278                     ++ndone;
279
280                 } else {
281                     toPapTopic.offer(text);
282                 }
283             }
284         }
285     }
286
287     /**
288      * Listener for PdpUpdate messages received from PAP. Invokes
289      * {@link PseudoPdp#handle(PdpUpdate)} for each PDP.
290      */
291     private class UpdateListener extends ScoListener<PdpUpdate> {
292         public UpdateListener() {
293             super(PdpUpdate.class);
294         }
295
296         @Override
297         public void onTopicEvent(CommInfrastructure infra, String topic, StandardCoderObject sco, PdpUpdate update) {
298             pdps.forEach(pdp -> pdp.handle(update));
299         }
300     }
301
302     /**
303      * Listener for PdpStateChange messages received from PAP. Invokes
304      * {@link PseudoPdp#handle(PdpStateChange)} for each PDP.
305      */
306     private class ChangeListener extends ScoListener<PdpStateChange> {
307         public ChangeListener() {
308             super(PdpStateChange.class);
309         }
310
311         @Override
312         public void onTopicEvent(CommInfrastructure infra, String topic, StandardCoderObject sco,
313                         PdpStateChange change) {
314             pdps.forEach(pdp -> pdp.handle(change));
315         }
316     }
317
318     /**
319      * Simulated PDP. Each PDP handles messages from the PAP and can return replies in
320      * response to those messages. The replies must be queued up before
321      * {@link End2EndContext#startThreads()} is invoked.
322      */
323     public class PseudoPdp {
324         private final String name;
325
326         private final Coder coder = new StandardCoder();
327         private final Queue<PdpStatus> replies = new LinkedList<>();
328
329         /**
330          * Messages that this PDP has handled.
331          */
332         @Getter
333         private final Queue<PdpMessage> handled = new ConcurrentLinkedQueue<>();
334
335         private volatile String group = null;
336         private volatile String subgroup = null;
337
338         private volatile boolean finished = true;
339
340         /**
341          * Constructs the object.
342          *
343          * @param name PDP name
344          */
345         private PseudoPdp(String name) {
346             this.name = name;
347         }
348
349         public PseudoPdp setGroup(String group) {
350             this.group = group;
351             return this;
352         }
353
354         public PseudoPdp setSubgroup(String subgroup) {
355             this.subgroup = subgroup;
356             return this;
357         }
358
359         /**
360          * Adds a reply to the list of replies that will be returned in response to
361          * messages from the PAP.
362          *
363          * @param reply reply to be added to the list
364          * @return this PDP
365          */
366         public PseudoPdp addReply(PdpStatus reply) {
367             replies.add(reply);
368             finished = false;
369             return this;
370         }
371
372         /**
373          * Handles an UPDATE message, recording the information extracted from the message
374          * and queuing up a reply, if any.
375          *
376          * @param message message that was received from PAP
377          */
378         private void handle(PdpUpdate message) {
379             if (message.appliesTo(name, group, subgroup)) {
380                 handled.add(message);
381                 group = message.getPdpGroup();
382                 subgroup = message.getPdpSubgroup();
383                 reply(message);
384             }
385         }
386
387         /**
388          * Handles a STAT-CHANGE message. Queues up a reply, if any.
389          *
390          * @param message message that was received from PAP
391          */
392         private void handle(PdpStateChange message) {
393             if (message.appliesTo(name, group, subgroup)) {
394                 handled.add(message);
395                 reply(message);
396             }
397         }
398
399         /**
400          * Queues up the next reply. If there are no more replies, then it queues up a
401          * {@link End2EndContext#DONE} message.
402          *
403          * @param message the message to which a reply should be sent
404          */
405         private void reply(PdpMessage message) {
406             PdpStatus status = replies.poll();
407             if (status == null) {
408                 return;
409             }
410
411             PdpResponseDetails response = new PdpResponseDetails();
412             response.setResponseTo(message.getRequestId());
413             status.setResponse(response);
414
415             toPap.add(toJson(status));
416
417             if (replies.isEmpty()) {
418                 finished = true;
419                 toPap.add(DONE);
420             }
421         }
422
423         /**
424          * Converts a message to JSON.
425          *
426          * @param status message to be converted
427          * @return JSON representation of the message
428          */
429         private String toJson(PdpStatus status) {
430             try {
431                 return coder.encode(status);
432
433             } catch (CoderException e) {
434                 exception = e;
435                 return DONE;
436             }
437         }
438
439         @Override
440         public String toString() {
441             return name;
442         }
443     }
444 }