2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved.
6 * Modifications Copyright (C) 2022-2023 Nordix Foundation.
7 * ================================================================================
8 * Licensed under the Apache License, Version 2.0 (the "License");
9 * you may not use this file except in compliance with the License.
10 * You may obtain a copy of the License at
12 * http://www.apache.org/licenses/LICENSE-2.0
14 * Unless required by applicable law or agreed to in writing, software
15 * distributed under the License is distributed on an "AS IS" BASIS,
16 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17 * See the License for the specific language governing permissions and
18 * limitations under the License.
19 * ============LICENSE_END=========================================================
22 package org.onap.policy.pap.main.rest.e2e;
24 import static org.junit.jupiter.api.Assertions.assertFalse;
25 import static org.junit.jupiter.api.Assertions.assertNull;
26 import static org.junit.jupiter.api.Assertions.assertTrue;
28 import java.util.ArrayList;
29 import java.util.LinkedList;
30 import java.util.List;
31 import java.util.Queue;
32 import java.util.concurrent.BlockingQueue;
33 import java.util.concurrent.ConcurrentLinkedQueue;
34 import java.util.concurrent.LinkedBlockingQueue;
36 import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure;
37 import org.onap.policy.common.endpoints.event.comm.TopicEndpointManager;
38 import org.onap.policy.common.endpoints.event.comm.TopicListener;
39 import org.onap.policy.common.endpoints.event.comm.bus.NoopTopicSource;
40 import org.onap.policy.common.endpoints.listeners.MessageTypeDispatcher;
41 import org.onap.policy.common.endpoints.listeners.ScoListener;
42 import org.onap.policy.common.utils.coder.Coder;
43 import org.onap.policy.common.utils.coder.CoderException;
44 import org.onap.policy.common.utils.coder.StandardCoder;
45 import org.onap.policy.common.utils.coder.StandardCoderObject;
46 import org.onap.policy.common.utils.services.Registry;
47 import org.onap.policy.models.pdp.concepts.PdpMessage;
48 import org.onap.policy.models.pdp.concepts.PdpResponseDetails;
49 import org.onap.policy.models.pdp.concepts.PdpStateChange;
50 import org.onap.policy.models.pdp.concepts.PdpStatus;
51 import org.onap.policy.models.pdp.concepts.PdpUpdate;
52 import org.onap.policy.models.pdp.enums.PdpMessageType;
53 import org.onap.policy.pap.main.PapConstants;
54 import org.onap.policy.pap.main.comm.PdpModifyRequestMap;
55 import org.slf4j.Logger;
56 import org.slf4j.LoggerFactory;
59 * Context for end-to-end tests.
61 public class End2EndContext {
62 private static final Logger logger = LoggerFactory.getLogger(End2EndContext.class);
65 * Message placed onto a queue to indicate that a PDP has nothing more to do.
67 private static final String DONE = "";
70 * Time, in milliseconds, to wait for everything to complete.
72 private static final long WAIT_MS = 10000;
75 * Messages to be sent to PAP. Messages are removed from the queue by the ToPapThread
76 * and directly handed off to the NOOP source.
78 private final BlockingQueue<String> toPap = new LinkedBlockingQueue<>();
81 * Messages to be sent to the PDPs. Messages are removed from the queue by the
82 * ToPdpThread and are given to each PDP to handle.
84 private final BlockingQueue<String> toPdps = new LinkedBlockingQueue<>();
87 * List of simulated PDPs.
90 private final List<PseudoPdp> pdps = new ArrayList<>();
95 private final NoopTopicSource toPapTopic;
98 * Decodes messages read from the {@link #toPdps} queue and dispatches them to the
99 * appropriate handler.
101 private final MessageTypeDispatcher dispatcher;
104 * Thread that passes messages to PAP.
106 private final ToPapThread toPapThread;
109 * Thread that passes messages to PDPs.
111 private final ToPdpsThread toPdpsThread;
114 * {@code True} if started, {@code false} if stopped.
116 private boolean running = false;
119 * Exception thrown by a coder. Should be {@code null} if all is OK.
121 private volatile CoderException exception = null;
124 * Listener for messages written to the PDP-PAP topic.
126 private TopicListener topicListener = (infra, topic, text) -> toPdps.add(text);
128 private String topicPolicyPdpPap = "pdp-pap-topic";
131 * Constructs the object.
133 public End2EndContext() {
134 toPapTopic = TopicEndpointManager.getManager().getNoopTopicSource(topicPolicyPdpPap);
136 TopicEndpointManager.getManager().getNoopTopicSink(topicPolicyPdpPap).register(topicListener);
138 dispatcher = new MessageTypeDispatcher("messageName");
139 dispatcher.register(PdpMessageType.PDP_UPDATE.name(), new UpdateListener());
140 dispatcher.register(PdpMessageType.PDP_STATE_CHANGE.name(), new ChangeListener());
142 toPapThread = new ToPapThread();
143 toPdpsThread = new ToPdpsThread();
147 * Starts the threads that read the "DMaaP" queues..
149 public void startThreads() {
151 throw new IllegalStateException("already running");
154 for (Thread thread : new Thread[] {toPapThread, toPdpsThread}) {
155 thread.setDaemon(true);
163 * Waits for the threads to shut down.
165 * @throws InterruptedException if interrupted while waiting
167 public void await() throws InterruptedException {
168 toPapThread.join(WAIT_MS);
169 assertFalse(toPapThread.isAlive());
171 PdpModifyRequestMap map = Registry.get(PapConstants.REG_PDP_MODIFY_MAP);
172 assertTrue(map.isEmpty());
174 // no more requests, thus we can tell the other thread to stop
177 toPdpsThread.join(WAIT_MS);
178 assertFalse(toPapThread.isAlive());
180 // nothing new should have been added to the PAP queue
181 assertTrue(toPap.isEmpty());
183 assertNull(exception);
187 * Stops the threads and shuts down the PAP Activator, rest services, and topic end
192 throw new IllegalStateException("not running");
197 // queue up a "done" message for each PDP
199 pdps.forEach(pdp -> toPdps.add(DONE));
201 // queue up a "done" message for each PDP
203 pdps.forEach(pdp -> toPap.add(DONE));
205 TopicEndpointManager.getManager().getNoopTopicSink(topicPolicyPdpPap).unregister(topicListener);
209 * Adds a simulated PDP. This must be called before {@link #startThreads()} is
212 * @param pdpName PDP name
213 * @param pdpType PDP type
214 * @return a new, simulated PDP
215 * @throws IllegalStateException if {@link #startThreads()} has already been invoked
217 public PseudoPdp addPdp(String pdpName, String pdpType) {
219 throw new IllegalStateException("not running");
222 PseudoPdp pdp = new PseudoPdp(pdpName);
229 * Thread that reads messages from the {@link End2EndContext#toPdps} queue and
230 * dispatches them to each PDP. This thread terminates as soon as it sees a
231 * {@link End2EndContext#DONE} message.
233 private class ToPdpsThread extends Thread {
239 text = toPdps.take();
240 } catch (InterruptedException e) {
241 logger.warn("{} interrupted", ToPdpsThread.class.getName(), e);
242 Thread.currentThread().interrupt();
246 if (DONE.equals(text)) {
250 dispatcher.onTopicEvent(CommInfrastructure.NOOP, topicPolicyPdpPap, text);
256 * Thread that reads messages from the {@link End2EndContext#toPap} queue and passes
257 * them to the PAP's topic source. This thread terminates once it sees a
258 * {@link End2EndContext#DONE} message <i>for each PDP</i>.
260 private class ToPapThread extends Thread {
264 // pretend we received DONE from PDPs that are already finished
265 long ndone = pdps.stream().filter(pdp -> pdp.finished).count();
267 while (ndone < pdps.size()) {
271 } catch (InterruptedException e) {
272 logger.warn("{} interrupted", ToPapThread.class.getName(), e);
273 Thread.currentThread().interrupt();
277 if (DONE.equals(text)) {
281 toPapTopic.offer(text);
288 * Listener for PdpUpdate messages received from PAP. Invokes
289 * {@link PseudoPdp#handle(PdpUpdate)} for each PDP.
291 private class UpdateListener extends ScoListener<PdpUpdate> {
292 public UpdateListener() {
293 super(PdpUpdate.class);
297 public void onTopicEvent(CommInfrastructure infra, String topic, StandardCoderObject sco, PdpUpdate update) {
298 pdps.forEach(pdp -> pdp.handle(update));
303 * Listener for PdpStateChange messages received from PAP. Invokes
304 * {@link PseudoPdp#handle(PdpStateChange)} for each PDP.
306 private class ChangeListener extends ScoListener<PdpStateChange> {
307 public ChangeListener() {
308 super(PdpStateChange.class);
312 public void onTopicEvent(CommInfrastructure infra, String topic, StandardCoderObject sco,
313 PdpStateChange change) {
314 pdps.forEach(pdp -> pdp.handle(change));
319 * Simulated PDP. Each PDP handles messages from the PAP and can return replies in
320 * response to those messages. The replies must be queued up before
321 * {@link End2EndContext#startThreads()} is invoked.
323 public class PseudoPdp {
324 private final String name;
326 private final Coder coder = new StandardCoder();
327 private final Queue<PdpStatus> replies = new LinkedList<>();
330 * Messages that this PDP has handled.
333 private final Queue<PdpMessage> handled = new ConcurrentLinkedQueue<>();
335 private volatile String group = null;
336 private volatile String subgroup = null;
338 private volatile boolean finished = true;
341 * Constructs the object.
343 * @param name PDP name
345 private PseudoPdp(String name) {
349 public PseudoPdp setGroup(String group) {
354 public PseudoPdp setSubgroup(String subgroup) {
355 this.subgroup = subgroup;
360 * Adds a reply to the list of replies that will be returned in response to
361 * messages from the PAP.
363 * @param reply reply to be added to the list
366 public PseudoPdp addReply(PdpStatus reply) {
373 * Handles an UPDATE message, recording the information extracted from the message
374 * and queuing up a reply, if any.
376 * @param message message that was received from PAP
378 private void handle(PdpUpdate message) {
379 if (message.appliesTo(name, group, subgroup)) {
380 handled.add(message);
381 group = message.getPdpGroup();
382 subgroup = message.getPdpSubgroup();
388 * Handles a STAT-CHANGE message. Queues up a reply, if any.
390 * @param message message that was received from PAP
392 private void handle(PdpStateChange message) {
393 if (message.appliesTo(name, group, subgroup)) {
394 handled.add(message);
400 * Queues up the next reply. If there are no more replies, then it queues up a
401 * {@link End2EndContext#DONE} message.
403 * @param message the message to which a reply should be sent
405 private void reply(PdpMessage message) {
406 PdpStatus status = replies.poll();
407 if (status == null) {
411 PdpResponseDetails response = new PdpResponseDetails();
412 response.setResponseTo(message.getRequestId());
413 status.setResponse(response);
415 toPap.add(toJson(status));
417 if (replies.isEmpty()) {
424 * Converts a message to JSON.
426 * @param status message to be converted
427 * @return JSON representation of the message
429 private String toJson(PdpStatus status) {
431 return coder.encode(status);
433 } catch (CoderException e) {
440 public String toString() {