Merge "Supports new aai changes."
[policy/models.git] / models-sim / models-sim-dmaap / src / main / java / org / onap / policy / models / sim / dmaap / provider / DmaapSimProvider.java
1 /*-
2  * ============LICENSE_START=======================================================
3  *  Copyright (C) 2019 Nordix Foundation.
4  * ================================================================================
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *      http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  * SPDX-License-Identifier: Apache-2.0
18  * ============LICENSE_END=========================================================
19  */
20
21 package org.onap.policy.models.sim.dmaap.provider;
22
23 import java.util.LinkedHashMap;
24 import java.util.Map;
25 import java.util.SortedMap;
26 import java.util.TreeMap;
27 import java.util.concurrent.TimeUnit;
28
29 import javax.ws.rs.core.Response;
30
31 import org.apache.commons.lang3.tuple.MutablePair;
32 import org.onap.policy.common.utils.coder.CoderException;
33 import org.onap.policy.common.utils.coder.StandardCoder;
34 import org.onap.policy.models.sim.dmaap.DmaapSimRuntimeException;
35 import org.slf4j.Logger;
36 import org.slf4j.LoggerFactory;
37
38 /**
39  * Provider to simulate DMaaP.
40  *
41  * @author Liam Fallon (liam.fallon@est.tech)
42  */
43 public class DmaapSimProvider {
44     private static final Logger LOGGER = LoggerFactory.getLogger(DmaapSimProvider.class);
45
46     // Recurring string constants
47     private static final String TOPIC_TAG = "Topic:";
48
49     // Time for a get to wait before checking of a message has come
50     private static final long DMAAP_SIM_WAIT_TIME = 50;
51
52     // recurring constants
53     private static final String WITH_TIMEOUT = " with timeout ";
54
55     // The map of topic messages
56     private static final Map<String, SortedMap<Integer, Object>> topicMessageMap = new LinkedHashMap<>();
57
58     // The map of topic messages
59     private static final Map<String, Map<String, MutablePair<Integer, String>>> consumerGroupsMap =
60             new LinkedHashMap<>();
61
62     /**
63      * Process a DMaaP message.
64      *
65      * @param topicName The topic name
66      * @param dmaapMessage the message to process
67      * @return a response to the message
68      */
69     public Response processDmaapMessagePut(final String topicName, final Object dmaapMessage) {
70         LOGGER.debug(TOPIC_TAG + topicName + ", Received DMaaP message: " + dmaapMessage);
71
72         synchronized (topicMessageMap) {
73             SortedMap<Integer, Object> messageMap = topicMessageMap.get(topicName);
74             if (messageMap == null) {
75                 messageMap = new TreeMap<>();
76                 topicMessageMap.put(topicName, messageMap);
77                 LOGGER.debug(TOPIC_TAG + topicName + ", created topic message map");
78             }
79
80             int nextKey = (messageMap.isEmpty() ? 0 : messageMap.lastKey() + 1);
81
82             messageMap.put(nextKey, dmaapMessage);
83             LOGGER.debug(TOPIC_TAG + topicName + ", cached DMaaP message " + nextKey + ": " + dmaapMessage);
84         }
85
86         return Response.status(Response.Status.OK).entity("{\n    \"serverTimeMs\": 0,\n    \"count\": 1\n}").build();
87     }
88
89     /**
90      * Wait for and return a DMaaP message.
91      *
92      * @param topicName The topic to wait on
93      * @param consumerGroup the consumer group that is waiting
94      * @param consumerId the consumer ID that is waiting
95      * @param timeout the length of time to wait for
96      * @return the DMaaP message or
97      */
98     public Response processDmaapMessageGet(final String topicName, final String consumerGroup, final String consumerId,
99             final int timeout) {
100
101         LOGGER.debug(TOPIC_TAG + topicName + ", Request for DMaaP message: " + consumerGroup + ":" + consumerId
102                 + WITH_TIMEOUT + timeout);
103
104         MutablePair<Integer, String> consumerGroupPair = null;
105
106         synchronized (consumerGroupsMap) {
107             Map<String, MutablePair<Integer, String>> consumerGroupMap = consumerGroupsMap.get(topicName);
108             if (consumerGroupMap == null) {
109                 consumerGroupMap = new LinkedHashMap<>();
110                 consumerGroupsMap.put(topicName, consumerGroupMap);
111                 LOGGER.trace(
112                         TOPIC_TAG + topicName + ", Created consumer map entry for consumer group " + consumerGroup);
113             }
114
115             consumerGroupPair = consumerGroupMap.get(consumerGroup);
116             if (consumerGroupPair == null) {
117                 consumerGroupPair = new MutablePair<>(-1, consumerId);
118                 consumerGroupMap.put(consumerGroup, consumerGroupPair);
119                 LOGGER.trace(TOPIC_TAG + topicName + ", Created consumer group entry for consumer group "
120                         + consumerGroup + ":" + consumerId);
121             }
122         }
123
124         long timeOfTimeout = System.currentTimeMillis() + timeout;
125
126         do {
127             Object waitingMessages = getWaitingMessages(topicName, consumerGroupPair);
128             if (waitingMessages != null) {
129                 LOGGER.debug(TOPIC_TAG + topicName + ", Request for DMaaP message: " + consumerGroup + ":" + consumerId
130                         + WITH_TIMEOUT + timeout + ", returning messages " + waitingMessages);
131                 return Response.status(Response.Status.OK).entity(waitingMessages).build();
132             }
133
134             try {
135                 TimeUnit.MILLISECONDS.sleep(DMAAP_SIM_WAIT_TIME);
136             } catch (InterruptedException ie) {
137                 String errorMessage = "Interrupt on wait on simulation of DMaaP topic " + topicName + " for request ID "
138                         + consumerGroup + ":" + consumerId + WITH_TIMEOUT + timeout;
139                 LOGGER.warn(errorMessage, ie);
140                 Thread.currentThread().interrupt();
141                 throw new DmaapSimRuntimeException(errorMessage, ie);
142             }
143         }
144         while (timeOfTimeout > System.currentTimeMillis());
145
146         LOGGER.trace(TOPIC_TAG + topicName + ", timed out waiting for messages : " + consumerGroup + ":" + consumerId
147                 + WITH_TIMEOUT + timeout);
148         return Response.status(Response.Status.REQUEST_TIMEOUT).build();
149     }
150
151     /**
152      * Return any messages on this topic with a message number greater than the supplied message number.
153      *
154      * @param topicName the topic name to check
155      * @param consumerGroupPair the pair with the information on the last message retrieved
156      * @return the messages or null if there are none
157      */
158     private Object getWaitingMessages(final String topicName, final MutablePair<Integer, String> consumerGroupPair) {
159         String foundMessageList = "[";
160
161         synchronized (topicMessageMap) {
162             SortedMap<Integer, Object> messageMap = topicMessageMap.get(topicName);
163             if (messageMap == null || messageMap.lastKey() <= consumerGroupPair.getLeft()) {
164                 return null;
165             }
166
167             boolean first = true;
168             for (Object dmaapMessage : messageMap.tailMap(consumerGroupPair.getLeft() + 1).values()) {
169                 if (first) {
170                     first = false;
171                 } else {
172                     foundMessageList += ",";
173                 }
174                 try {
175                     foundMessageList += new StandardCoder().encode(dmaapMessage);
176                 } catch (CoderException ce) {
177                     String errorMessage = "Encoding error on message on DMaaP topic " + topicName;
178                     LOGGER.warn(errorMessage, ce);
179                     return null;
180                 }
181             }
182             foundMessageList += ']';
183
184             LOGGER.debug(TOPIC_TAG + topicName + ", returning DMaaP messages from  " + consumerGroupPair.getLeft()
185                     + " to " + messageMap.lastKey());
186             synchronized (consumerGroupsMap) {
187                 consumerGroupPair.setLeft(messageMap.lastKey());
188             }
189         }
190
191         return (foundMessageList.length() < 3 ? null : foundMessageList);
192     }
193 }