2 * ============LICENSE_START=======================================================
3 * Copyright (C) 2019 Nordix Foundation.
4 * ================================================================================
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
9 * http://www.apache.org/licenses/LICENSE-2.0
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
17 * SPDX-License-Identifier: Apache-2.0
18 * ============LICENSE_END=========================================================
21 package org.onap.policy.models.sim.dmaap.provider;
23 import java.util.LinkedHashMap;
25 import java.util.SortedMap;
26 import java.util.TreeMap;
27 import java.util.concurrent.TimeUnit;
29 import javax.ws.rs.core.Response;
31 import org.apache.commons.lang3.tuple.MutablePair;
32 import org.onap.policy.common.utils.coder.CoderException;
33 import org.onap.policy.common.utils.coder.StandardCoder;
34 import org.onap.policy.models.sim.dmaap.DmaapSimRuntimeException;
35 import org.slf4j.Logger;
36 import org.slf4j.LoggerFactory;
/**
 * Provider to simulate DMaaP.
 *
 * @author Liam Fallon (liam.fallon@est.tech)
 */
43 public class DmaapSimProvider {
44 private static final Logger LOGGER = LoggerFactory.getLogger(DmaapSimProvider.class);
46 // Recurring string constants
47 private static final String TOPIC_TAG = "Topic:";
49 // Time for a get to wait before checking of a message has come
50 private static final long DMAAP_SIM_WAIT_TIME = 50;
52 // recurring constants
53 private static final String WITH_TIMEOUT = " with timeout ";
55 // The map of topic messages
56 private static final Map<String, SortedMap<Integer, Object>> topicMessageMap = new LinkedHashMap<>();
58 // The map of topic messages
59 private static final Map<String, Map<String, MutablePair<Integer, String>>> consumerGroupsMap =
60 new LinkedHashMap<>();
63 * Process a DMaaP message.
65 * @param topicName The topic name
66 * @param dmaapMessage the message to process
67 * @return a response to the message
69 public Response processDmaapMessagePut(final String topicName, final Object dmaapMessage) {
70 LOGGER.debug(TOPIC_TAG + topicName + ", Received DMaaP message: " + dmaapMessage);
72 synchronized (topicMessageMap) {
73 SortedMap<Integer, Object> messageMap = topicMessageMap.get(topicName);
74 if (messageMap == null) {
75 messageMap = new TreeMap<>();
76 topicMessageMap.put(topicName, messageMap);
77 LOGGER.debug(TOPIC_TAG + topicName + ", created topic message map");
80 int nextKey = (messageMap.isEmpty() ? 0 : messageMap.lastKey() + 1);
82 messageMap.put(nextKey, dmaapMessage);
83 LOGGER.debug(TOPIC_TAG + topicName + ", cached DMaaP message " + nextKey + ": " + dmaapMessage);
86 return Response.status(Response.Status.OK).entity("{\n \"serverTimeMs\": 0,\n \"count\": 1\n}").build();
90 * Wait for and return a DMaaP message.
92 * @param topicName The topic to wait on
93 * @param consumerGroup the consumer group that is waiting
94 * @param consumerId the consumer ID that is waiting
95 * @param timeout the length of time to wait for
96 * @return the DMaaP message or
98 public Response processDmaapMessageGet(final String topicName, final String consumerGroup, final String consumerId,
101 LOGGER.debug(TOPIC_TAG + topicName + ", Request for DMaaP message: " + consumerGroup + ":" + consumerId
102 + WITH_TIMEOUT + timeout);
104 MutablePair<Integer, String> consumerGroupPair = null;
106 synchronized (consumerGroupsMap) {
107 Map<String, MutablePair<Integer, String>> consumerGroupMap = consumerGroupsMap.get(topicName);
108 if (consumerGroupMap == null) {
109 consumerGroupMap = new LinkedHashMap<>();
110 consumerGroupsMap.put(topicName, consumerGroupMap);
112 TOPIC_TAG + topicName + ", Created consumer map entry for consumer group " + consumerGroup);
115 consumerGroupPair = consumerGroupMap.get(consumerGroup);
116 if (consumerGroupPair == null) {
117 consumerGroupPair = new MutablePair<>(-1, consumerId);
118 consumerGroupMap.put(consumerGroup, consumerGroupPair);
119 LOGGER.trace(TOPIC_TAG + topicName + ", Created consumer group entry for consumer group "
120 + consumerGroup + ":" + consumerId);
124 long timeOfTimeout = System.currentTimeMillis() + timeout;
127 Object waitingMessages = getWaitingMessages(topicName, consumerGroupPair);
128 if (waitingMessages != null) {
129 LOGGER.debug(TOPIC_TAG + topicName + ", Request for DMaaP message: " + consumerGroup + ":" + consumerId
130 + WITH_TIMEOUT + timeout + ", returning messages " + waitingMessages);
131 return Response.status(Response.Status.OK).entity(waitingMessages).build();
135 TimeUnit.MILLISECONDS.sleep(DMAAP_SIM_WAIT_TIME);
136 } catch (InterruptedException ie) {
137 String errorMessage = "Interrupt on wait on simulation of DMaaP topic " + topicName + " for request ID "
138 + consumerGroup + ":" + consumerId + WITH_TIMEOUT + timeout;
139 LOGGER.warn(errorMessage, ie);
140 Thread.currentThread().interrupt();
141 throw new DmaapSimRuntimeException(errorMessage, ie);
144 while (timeOfTimeout > System.currentTimeMillis());
146 LOGGER.trace(TOPIC_TAG + topicName + ", timed out waiting for messages : " + consumerGroup + ":" + consumerId
147 + WITH_TIMEOUT + timeout);
148 return Response.status(Response.Status.REQUEST_TIMEOUT).build();
152 * Return any messages on this topic with a message number greater than the supplied message number.
154 * @param topicName the topic name to check
155 * @param consumerGroupPair the pair with the information on the last message retrieved
156 * @return the messages or null if there are none
158 private Object getWaitingMessages(final String topicName, final MutablePair<Integer, String> consumerGroupPair) {
159 String foundMessageList = "[";
161 synchronized (topicMessageMap) {
162 SortedMap<Integer, Object> messageMap = topicMessageMap.get(topicName);
163 if (messageMap == null || messageMap.lastKey() <= consumerGroupPair.getLeft()) {
167 boolean first = true;
168 for (Object dmaapMessage : messageMap.tailMap(consumerGroupPair.getLeft() + 1).values()) {
172 foundMessageList += ",";
175 foundMessageList += new StandardCoder().encode(dmaapMessage);
176 } catch (CoderException ce) {
177 String errorMessage = "Encoding error on message on DMaaP topic " + topicName;
178 LOGGER.warn(errorMessage, ce);
182 foundMessageList += ']';
184 LOGGER.debug(TOPIC_TAG + topicName + ", returning DMaaP messages from " + consumerGroupPair.getLeft()
185 + " to " + messageMap.lastKey());
186 synchronized (consumerGroupsMap) {
187 consumerGroupPair.setLeft(messageMap.lastKey());
191 return (foundMessageList.length() < 3 ? null : foundMessageList);