2 * ============LICENSE_START=======================================================
3 * Copyright (C) 2019 Nordix Foundation.
4 * ================================================================================
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
9 * http://www.apache.org/licenses/LICENSE-2.0
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
17 * SPDX-License-Identifier: Apache-2.0
18 * ============LICENSE_END=========================================================
21 package org.onap.policy.models.sim.dmaap.provider;
23 import java.util.LinkedHashMap;
25 import java.util.SortedMap;
26 import java.util.TreeMap;
28 import javax.ws.rs.core.Response;
30 import org.apache.commons.lang3.tuple.MutablePair;
31 import org.onap.policy.common.utils.coder.CoderException;
32 import org.onap.policy.common.utils.coder.StandardCoder;
33 import org.onap.policy.models.sim.dmaap.DmaapSimRuntimeException;
34 import org.slf4j.Logger;
35 import org.slf4j.LoggerFactory;
38 * Provider to simulate DMaaP.
40 * @author Liam Fallon (liam.fallon@est.tech)
42 public class DmaapSimProvider {
43 private static final Logger LOGGER = LoggerFactory.getLogger(DmaapSimProvider.class);
45 // Time for a get to wait before checking of a message has come
46 private static final long DMAAP_SIM_WAIT_TIME = 50;
48 // recurring constants
49 private static final String WITH_TIMEOUT = " with timeout ";
51 // The map of topic messages
52 private static final Map<String, SortedMap<Integer, Object>> topicMessageMap = new LinkedHashMap<>();
54 // The map of topic messages
55 private static final Map<String, Map<String, MutablePair<Integer, String>>> consumerGroupsMap =
56 new LinkedHashMap<>();
59 * Process a DMaaP message.
61 * @param topicName The topic name
62 * @param dmaapMessage the message to process
63 * @return a response to the message
65 public Response processDmaapMessagePut(final String topicName, final Object dmaapMessage) {
66 LOGGER.debug("Topic:" + topicName + ", Received DMaaP message: " + dmaapMessage);
68 synchronized (topicMessageMap) {
69 SortedMap<Integer, Object> messageMap = topicMessageMap.get(topicName);
70 if (messageMap == null) {
71 messageMap = new TreeMap<>();
72 topicMessageMap.put(topicName, messageMap);
73 LOGGER.debug("Topic:" + topicName + ", created topic message map");
76 int nextKey = (messageMap.isEmpty() ? 0 : messageMap.lastKey() + 1);
78 messageMap.put(nextKey, dmaapMessage);
79 LOGGER.debug("Topic:" + topicName + ", cached DMaaP message " + nextKey + ": " + dmaapMessage);
82 return Response.status(Response.Status.OK).entity("{\n \"serverTimeMs\": 0,\n \"count\": 1\n}").build();
86 * Wait for and return a DMaaP message.
88 * @param topicName The topic to wait on
89 * @param consumerGroup the consumer group that is waiting
90 * @param consumerId the consumer ID that is waiting
91 * @param timeout the length of time to wait for
92 * @return the DMaaP message or
94 public Response processDmaapMessageGet(final String topicName, final String consumerGroup, final String consumerId,
97 LOGGER.debug("Topic:" + topicName + ", Request for DMaaP message: " + consumerGroup + ":" + consumerId
98 + WITH_TIMEOUT + timeout);
100 MutablePair<Integer, String> consumerGroupPair = null;
102 synchronized (consumerGroupsMap) {
103 Map<String, MutablePair<Integer, String>> consumerGroupMap = consumerGroupsMap.get(topicName);
104 if (consumerGroupMap == null) {
105 consumerGroupMap = new LinkedHashMap<>();
106 consumerGroupsMap.put(topicName, consumerGroupMap);
107 LOGGER.trace("Topic:" + topicName + ", Created consumer map entry for consumer group " + consumerGroup);
110 consumerGroupPair = consumerGroupMap.get(consumerGroup);
111 if (consumerGroupPair == null) {
112 consumerGroupPair = new MutablePair<>(-1, consumerId);
113 consumerGroupMap.put(consumerGroup, consumerGroupPair);
114 LOGGER.trace("Topic:" + topicName + ", Created consumer group entry for consumer group " + consumerGroup
119 long timeOfTimeout = System.currentTimeMillis() + timeout;
123 Object waitingMessages = getWaitingMessages(topicName, consumerGroupPair);
124 if (waitingMessages != null) {
125 LOGGER.debug("Topic:" + topicName + ", Request for DMaaP message: " + consumerGroup + ":" + consumerId
126 + WITH_TIMEOUT + timeout + ", returning messages " + waitingMessages);
127 return Response.status(Response.Status.OK).entity(waitingMessages).build();
131 Thread.sleep(DMAAP_SIM_WAIT_TIME);
132 } catch (InterruptedException ie) {
133 String errorMessage = "Interrupt on wait on simulation of DMaaP topic " + topicName + " for request ID "
134 + consumerGroup + ":" + consumerId + WITH_TIMEOUT + timeout;
135 LOGGER.warn(errorMessage, ie);
136 throw new DmaapSimRuntimeException(errorMessage, ie);
139 while (timeOfTimeout > System.currentTimeMillis());
141 LOGGER.trace("Topic:" + topicName + ", timed out waiting for messages : " + consumerGroup + ":" + consumerId
142 + WITH_TIMEOUT + timeout);
143 return Response.status(Response.Status.REQUEST_TIMEOUT).build();
147 * Return any messages on this topic with a message number greater than the supplied message number.
149 * @param topicName the topic name to check
150 * @param consumerGroupPair the pair with the information on the last message retrieved
151 * @return the messages or null if there are none
153 private Object getWaitingMessages(final String topicName, final MutablePair<Integer, String> consumerGroupPair) {
154 String foundMessageList = "[";
156 synchronized (topicMessageMap) {
157 SortedMap<Integer, Object> messageMap = topicMessageMap.get(topicName);
158 if (messageMap == null || messageMap.lastKey() <= consumerGroupPair.getLeft()) {
162 boolean first = true;
163 for (Object dmaapMessage : messageMap.tailMap(consumerGroupPair.getLeft() + 1).values()) {
167 foundMessageList += ",";
170 foundMessageList += new StandardCoder().encode(dmaapMessage);
171 } catch (CoderException e) {
175 foundMessageList += ']';
177 LOGGER.debug("Topic:" + topicName + ", returning DMaaP messages from " + consumerGroupPair.getLeft()
178 + " to " + messageMap.lastKey());
179 synchronized (consumerGroupsMap) {
180 consumerGroupPair.setLeft(messageMap.lastKey());
184 return (foundMessageList.length() < 3 ? null : foundMessageList);