Add DMaaP simulator for CSIT testing
[policy/models.git] / models-sim / models-sim-dmaap / src / main / java / org / onap / policy / models / sim / dmaap / provider / DmaapSimProvider.java
1 /*-
2  * ============LICENSE_START=======================================================
3  *  Copyright (C) 2019 Nordix Foundation.
4  * ================================================================================
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *      http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  * SPDX-License-Identifier: Apache-2.0
18  * ============LICENSE_END=========================================================
19  */
20
21 package org.onap.policy.models.sim.dmaap.provider;
22
23 import java.util.LinkedHashMap;
24 import java.util.Map;
25 import java.util.SortedMap;
26 import java.util.TreeMap;
27
28 import javax.ws.rs.core.Response;
29
30 import org.apache.commons.lang3.tuple.MutablePair;
31 import org.onap.policy.common.utils.coder.CoderException;
32 import org.onap.policy.common.utils.coder.StandardCoder;
33 import org.onap.policy.models.sim.dmaap.DmaapSimRuntimeException;
34 import org.slf4j.Logger;
35 import org.slf4j.LoggerFactory;
36
37 /**
38  * Provider to simulate DMaaP.
39  *
40  * @author Liam Fallon (liam.fallon@est.tech)
41  */
42 public class DmaapSimProvider {
43     private static final Logger LOGGER = LoggerFactory.getLogger(DmaapSimProvider.class);
44
45     // Time for a get to wait before checking of a message has come
46     private static final long DMAAP_SIM_WAIT_TIME = 50;
47
48     // recurring constants
49     private static final String WITH_TIMEOUT = " with timeout ";
50
51     // The map of topic messages
52     private static final Map<String, SortedMap<Integer, Object>> topicMessageMap = new LinkedHashMap<>();
53
54     // The map of topic messages
55     private static final Map<String, Map<String, MutablePair<Integer, String>>> consumerGroupsMap =
56             new LinkedHashMap<>();
57
58     /**
59      * Process a DMaaP message.
60      *
61      * @param topicName The topic name
62      * @param dmaapMessage the message to process
63      * @return a response to the message
64      */
65     public Response processDmaapMessagePut(final String topicName, final Object dmaapMessage) {
66         LOGGER.debug("Topic:" + topicName + ", Received DMaaP message: " + dmaapMessage);
67
68         synchronized (topicMessageMap) {
69             SortedMap<Integer, Object> messageMap = topicMessageMap.get(topicName);
70             if (messageMap == null) {
71                 messageMap = new TreeMap<>();
72                 topicMessageMap.put(topicName, messageMap);
73                 LOGGER.debug("Topic:" + topicName + ", created topic message map");
74             }
75
76             int nextKey = (messageMap.isEmpty() ? 0 : messageMap.lastKey() + 1);
77
78             messageMap.put(nextKey, dmaapMessage);
79             LOGGER.debug("Topic:" + topicName + ", cached DMaaP message " + nextKey + ": " + dmaapMessage);
80         }
81
82         return Response.status(Response.Status.OK).entity("{\n    \"serverTimeMs\": 0,\n    \"count\": 1\n}").build();
83     }
84
85     /**
86      * Wait for and return a DMaaP message.
87      *
88      * @param topicName The topic to wait on
89      * @param consumerGroup the consumer group that is waiting
90      * @param consumerId the consumer ID that is waiting
91      * @param timeout the length of time to wait for
92      * @return the DMaaP message or
93      */
94     public Response processDmaapMessageGet(final String topicName, final String consumerGroup, final String consumerId,
95             final int timeout) {
96
97         LOGGER.debug("Topic:" + topicName + ", Request for DMaaP message: " + consumerGroup + ":" + consumerId
98                 + WITH_TIMEOUT + timeout);
99
100         MutablePair<Integer, String> consumerGroupPair = null;
101
102         synchronized (consumerGroupsMap) {
103             Map<String, MutablePair<Integer, String>> consumerGroupMap = consumerGroupsMap.get(topicName);
104             if (consumerGroupMap == null) {
105                 consumerGroupMap = new LinkedHashMap<>();
106                 consumerGroupsMap.put(topicName, consumerGroupMap);
107                 LOGGER.trace("Topic:" + topicName + ", Created consumer map entry for consumer group " + consumerGroup);
108             }
109
110             consumerGroupPair = consumerGroupMap.get(consumerGroup);
111             if (consumerGroupPair == null) {
112                 consumerGroupPair = new MutablePair<>(-1, consumerId);
113                 consumerGroupMap.put(consumerGroup, consumerGroupPair);
114                 LOGGER.trace("Topic:" + topicName + ", Created consumer group entry for consumer group " + consumerGroup
115                         + ":" + consumerId);
116             }
117         }
118
119         long timeOfTimeout = System.currentTimeMillis() + timeout;
120
121         do {
122
123             Object waitingMessages = getWaitingMessages(topicName, consumerGroupPair);
124             if (waitingMessages != null) {
125                 LOGGER.debug("Topic:" + topicName + ", Request for DMaaP message: " + consumerGroup + ":" + consumerId
126                         + WITH_TIMEOUT + timeout + ", returning messages " + waitingMessages);
127                 return Response.status(Response.Status.OK).entity(waitingMessages).build();
128             }
129
130             try {
131                 Thread.sleep(DMAAP_SIM_WAIT_TIME);
132             } catch (InterruptedException ie) {
133                 String errorMessage = "Interrupt on wait on simulation of DMaaP topic " + topicName + " for request ID "
134                         + consumerGroup + ":" + consumerId + WITH_TIMEOUT + timeout;
135                 LOGGER.warn(errorMessage, ie);
136                 throw new DmaapSimRuntimeException(errorMessage, ie);
137             }
138         }
139         while (timeOfTimeout > System.currentTimeMillis());
140
141         LOGGER.trace("Topic:" + topicName + ", timed out waiting for messages : " + consumerGroup + ":" + consumerId
142                 + WITH_TIMEOUT + timeout);
143         return Response.status(Response.Status.REQUEST_TIMEOUT).build();
144     }
145
146     /**
147      * Return any messages on this topic with a message number greater than the supplied message number.
148      *
149      * @param topicName the topic name to check
150      * @param consumerGroupPair the pair with the information on the last message retrieved
151      * @return the messages or null if there are none
152      */
153     private Object getWaitingMessages(final String topicName, final MutablePair<Integer, String> consumerGroupPair) {
154         String foundMessageList = "[";
155
156         synchronized (topicMessageMap) {
157             SortedMap<Integer, Object> messageMap = topicMessageMap.get(topicName);
158             if (messageMap == null || messageMap.lastKey() <= consumerGroupPair.getLeft()) {
159                 return null;
160             }
161
162             boolean first = true;
163             for (Object dmaapMessage : messageMap.tailMap(consumerGroupPair.getLeft() + 1).values()) {
164                 if (first) {
165                     first = false;
166                 } else {
167                     foundMessageList += ",";
168                 }
169                 try {
170                     foundMessageList += new StandardCoder().encode(dmaapMessage);
171                 } catch (CoderException e) {
172                     e.printStackTrace();
173                 }
174             }
175             foundMessageList += ']';
176
177             LOGGER.debug("Topic:" + topicName + ", returning DMaaP messages from  " + consumerGroupPair.getLeft()
178                     + " to " + messageMap.lastKey());
179             synchronized (consumerGroupsMap) {
180                 consumerGroupPair.setLeft(messageMap.lastKey());
181             }
182         }
183
184         return (foundMessageList.length() < 3 ? null : foundMessageList);
185     }
186 }