/*-
* ============LICENSE_START=======================================================
* Copyright (C) 2019 Nordix Foundation.
+ * Modifications Copyright (C) 2019 AT&T Intellectual Property. All rights reserved.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
package org.onap.policy.models.sim.dmaap.provider;
+import java.util.Collections;
import java.util.LinkedHashMap;
+import java.util.List;
import java.util.Map;
-import java.util.SortedMap;
-import java.util.TreeMap;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
-
import javax.ws.rs.core.Response;
-
-import org.apache.commons.lang3.tuple.MutablePair;
-import org.onap.policy.common.utils.coder.CoderException;
-import org.onap.policy.common.utils.coder.StandardCoder;
-import org.onap.policy.models.sim.dmaap.DmaapSimRuntimeException;
+import javax.ws.rs.core.Response.Status;
+import lombok.Getter;
+import lombok.Setter;
+import org.onap.policy.common.utils.services.ServiceManagerContainer;
+import org.onap.policy.models.sim.dmaap.parameters.DmaapSimParameterGroup;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
*
* @author Liam Fallon (liam.fallon@est.tech)
*/
-public class DmaapSimProvider {
+public class DmaapSimProvider extends ServiceManagerContainer {
private static final Logger LOGGER = LoggerFactory.getLogger(DmaapSimProvider.class);
- // Recurring string constants
- private static final String TOPIC_TAG = "Topic:";
+ @Getter
+ @Setter
+ private static DmaapSimProvider instance;
- // Time for a get to wait before checking of a message has come
- private static final long DMAAP_SIM_WAIT_TIME = 50;
+ /**
+ * Maps a topic name to its data.
+ */
+ private final Map<String, TopicData> topic2data = new ConcurrentHashMap<>();
- // recurring constants
- private static final String WITH_TIMEOUT = " with timeout ";
+ /**
+ * Thread used to remove idle consumers from the topics.
+ */
+ private ScheduledExecutorService timerPool;
- // The map of topic messages
- private static final Map<String, SortedMap<Integer, Object>> topicMessageMap = new LinkedHashMap<>();
- // The map of topic messages
- private static final Map<String, Map<String, MutablePair<Integer, String>>> consumerGroupsMap =
- new LinkedHashMap<>();
+ /**
+ * Constructs the object.
+ *
+ * @param params parameters
+ */
+ public DmaapSimProvider(DmaapSimParameterGroup params) {
+ addAction("Topic Sweeper", () -> {
+ timerPool = makeTimerPool();
+ timerPool.scheduleWithFixedDelay(new SweeperTask(), params.getTopicSweepSec(), params.getTopicSweepSec(),
+ TimeUnit.SECONDS);
+ }, () -> timerPool.shutdown());
+ }
/**
* Process a DMaaP message.
*
- * @param topicName The topic name
+ * @param topicName the topic name
* @param dmaapMessage the message to process
* @return a response to the message
*/
+ @SuppressWarnings("unchecked")
public Response processDmaapMessagePut(final String topicName, final Object dmaapMessage) {
- LOGGER.debug(TOPIC_TAG + topicName + ", Received DMaaP message: " + dmaapMessage);
-
- synchronized (topicMessageMap) {
- SortedMap<Integer, Object> messageMap = topicMessageMap.get(topicName);
- if (messageMap == null) {
- messageMap = new TreeMap<>();
- topicMessageMap.put(topicName, messageMap);
- LOGGER.debug(TOPIC_TAG + topicName + ", created topic message map");
- }
+ LOGGER.debug("Topic: {}, Received DMaaP message(s): {}", topicName, dmaapMessage);
- int nextKey = (messageMap.isEmpty() ? 0 : messageMap.lastKey() + 1);
+ List<Object> lst;
- messageMap.put(nextKey, dmaapMessage);
- LOGGER.debug(TOPIC_TAG + topicName + ", cached DMaaP message " + nextKey + ": " + dmaapMessage);
+ if (dmaapMessage instanceof List) {
+ lst = (List<Object>) dmaapMessage;
+ } else {
+ lst = Collections.singletonList(dmaapMessage);
}
- return Response.status(Response.Status.OK).entity("{\n \"serverTimeMs\": 0,\n \"count\": 1\n}").build();
+ TopicData topic = topic2data.get(topicName);
+
+ /*
+ * Write all messages and return the count. If the topic doesn't exist yet, then
+ * there are no subscribers to receive the messages, thus treat it as if all
+ * messages were published.
+ */
+ int nmessages = (topic != null ? topic.write(lst) : lst.size());
+
+ Map<String, Object> map = new LinkedHashMap<>();
+ map.put("serverTimeMs", 0);
+ map.put("count", nmessages);
+
+ return Response.status(Response.Status.OK).entity(map).build();
}
/**
* @param topicName The topic to wait on
* @param consumerGroup the consumer group that is waiting
* @param consumerId the consumer ID that is waiting
- * @param timeout the length of time to wait for
+ * @param limit the maximum number of messages to get
+ * @param timeoutMs the length of time to wait for
* @return the DMaaP message or
*/
public Response processDmaapMessageGet(final String topicName, final String consumerGroup, final String consumerId,
- final int timeout) {
+ final int limit, final long timeoutMs) {
- LOGGER.debug(TOPIC_TAG + topicName + ", Request for DMaaP message: " + consumerGroup + ":" + consumerId
- + WITH_TIMEOUT + timeout);
+ LOGGER.debug("Topic: {}, Request for DMaaP message: {}: {} with limit={} timeout={}", topicName, consumerGroup,
+ consumerId, limit, timeoutMs);
- MutablePair<Integer, String> consumerGroupPair = null;
-
- synchronized (consumerGroupsMap) {
- Map<String, MutablePair<Integer, String>> consumerGroupMap = consumerGroupsMap.get(topicName);
- if (consumerGroupMap == null) {
- consumerGroupMap = new LinkedHashMap<>();
- consumerGroupsMap.put(topicName, consumerGroupMap);
- LOGGER.trace(
- TOPIC_TAG + topicName + ", Created consumer map entry for consumer group " + consumerGroup);
- }
-
- consumerGroupPair = consumerGroupMap.get(consumerGroup);
- if (consumerGroupPair == null) {
- consumerGroupPair = new MutablePair<>(-1, consumerId);
- consumerGroupMap.put(consumerGroup, consumerGroupPair);
- LOGGER.trace(TOPIC_TAG + topicName + ", Created consumer group entry for consumer group "
- + consumerGroup + ":" + consumerId);
- }
- }
+ try {
+ List<String> lst = topic2data.computeIfAbsent(topicName, this::makeTopicData).read(consumerGroup, limit,
+ timeoutMs);
- long timeOfTimeout = System.currentTimeMillis() + timeout;
+ if (lst.isEmpty() && timeoutMs > 0) {
+ LOGGER.debug("Topic: {}, Timed out waiting for messages: {}: {}", topicName, consumerGroup, consumerId);
+ return Response.status(Status.REQUEST_TIMEOUT).entity(lst).build();
- do {
- Object waitingMessages = getWaitingMessages(topicName, consumerGroupPair);
- if (waitingMessages != null) {
- LOGGER.debug(TOPIC_TAG + topicName + ", Request for DMaaP message: " + consumerGroup + ":" + consumerId
- + WITH_TIMEOUT + timeout + ", returning messages " + waitingMessages);
- return Response.status(Response.Status.OK).entity(waitingMessages).build();
+ } else {
+ LOGGER.debug("Topic: {}, Retrieved {} messages for: {}: {}", topicName, consumerGroup, lst.size(),
+ consumerId);
+ return Response.status(Status.OK).entity(lst).build();
}
- try {
- TimeUnit.MILLISECONDS.sleep(DMAAP_SIM_WAIT_TIME);
- } catch (InterruptedException ie) {
- String errorMessage = "Interrupt on wait on simulation of DMaaP topic " + topicName + " for request ID "
- + consumerGroup + ":" + consumerId + WITH_TIMEOUT + timeout;
- LOGGER.warn(errorMessage, ie);
- Thread.currentThread().interrupt();
- throw new DmaapSimRuntimeException(errorMessage, ie);
- }
+ } catch (InterruptedException e) {
+ LOGGER.warn("Topic: {}, Request for DMaaP message interrupted: {}: {}", topicName, consumerGroup,
+ consumerId, e);
+ Thread.currentThread().interrupt();
+ return Response.status(Status.GONE).entity(Collections.emptyList()).build();
}
- while (timeOfTimeout > System.currentTimeMillis());
-
- LOGGER.trace(TOPIC_TAG + topicName + ", timed out waiting for messages : " + consumerGroup + ":" + consumerId
- + WITH_TIMEOUT + timeout);
- return Response.status(Response.Status.REQUEST_TIMEOUT).build();
}
/**
- * Return any messages on this topic with a message number greater than the supplied message number.
- *
- * @param topicName the topic name to check
- * @param consumerGroupPair the pair with the information on the last message retrieved
- * @return the messages or null if there are none
+ * Task to remove idle consumers from each topic.
*/
- private Object getWaitingMessages(final String topicName, final MutablePair<Integer, String> consumerGroupPair) {
- String foundMessageList = "[";
-
- synchronized (topicMessageMap) {
- SortedMap<Integer, Object> messageMap = topicMessageMap.get(topicName);
- if (messageMap == null || messageMap.lastKey() <= consumerGroupPair.getLeft()) {
- return null;
- }
+ private class SweeperTask implements Runnable {
+ @Override
+ public void run() {
+ topic2data.values().forEach(TopicData::removeIdleConsumers);
+ }
+ }
- boolean first = true;
- for (Object dmaapMessage : messageMap.tailMap(consumerGroupPair.getLeft() + 1).values()) {
- if (first) {
- first = false;
- } else {
- foundMessageList += ",";
- }
- try {
- foundMessageList += new StandardCoder().encode(dmaapMessage);
- } catch (CoderException ce) {
- String errorMessage = "Encoding error on message on DMaaP topic " + topicName;
- LOGGER.warn(errorMessage, ce);
- return null;
- }
- }
- foundMessageList += ']';
+ // the following methods may be overridden by junit tests
- LOGGER.debug(TOPIC_TAG + topicName + ", returning DMaaP messages from " + consumerGroupPair.getLeft()
- + " to " + messageMap.lastKey());
- synchronized (consumerGroupsMap) {
- consumerGroupPair.setLeft(messageMap.lastKey());
- }
- }
+ /**
+ * Makes a new timer pool.
+ *
+ * @return a new timer pool
+ */
+ protected ScheduledExecutorService makeTimerPool() {
+ return Executors.newScheduledThreadPool(1);
+ }
- return (foundMessageList.length() < 3 ? null : foundMessageList);
+ /**
+ * Makes a new topic.
+ *
+ * @param topicName topic name
+ * @return a new topic
+ */
+ protected TopicData makeTopicData(String topicName) {
+ return new TopicData(topicName);
}
}