Flesh out DMaaP simulator
[policy/models.git] / models-sim / models-sim-dmaap / src / main / java / org / onap / policy / models / sim / dmaap / provider / DmaapSimProvider.java
index 9de29cd..d11d1b3 100644 (file)
@@ -1,6 +1,7 @@
 /*-
  * ============LICENSE_START=======================================================
  *  Copyright (C) 2019 Nordix Foundation.
+ *  Modifications Copyright (C) 2019 AT&T Intellectual Property. All rights reserved.
  * ================================================================================
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
 
 package org.onap.policy.models.sim.dmaap.provider;
 
+import java.util.Collections;
 import java.util.LinkedHashMap;
+import java.util.List;
 import java.util.Map;
-import java.util.SortedMap;
-import java.util.TreeMap;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
-
 import javax.ws.rs.core.Response;
-
-import org.apache.commons.lang3.tuple.MutablePair;
-import org.onap.policy.common.utils.coder.CoderException;
-import org.onap.policy.common.utils.coder.StandardCoder;
-import org.onap.policy.models.sim.dmaap.DmaapSimRuntimeException;
+import javax.ws.rs.core.Response.Status;
+import lombok.Getter;
+import lombok.Setter;
+import org.onap.policy.common.utils.services.ServiceManagerContainer;
+import org.onap.policy.models.sim.dmaap.parameters.DmaapSimParameterGroup;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -40,50 +43,70 @@ import org.slf4j.LoggerFactory;
  *
  * @author Liam Fallon (liam.fallon@est.tech)
  */
-public class DmaapSimProvider {
+public class DmaapSimProvider extends ServiceManagerContainer {
     private static final Logger LOGGER = LoggerFactory.getLogger(DmaapSimProvider.class);
 
-    // Recurring string constants
-    private static final String TOPIC_TAG = "Topic:";
+    @Getter
+    @Setter
+    private static DmaapSimProvider instance;
 
-    // Time for a get to wait before checking of a message has come
-    private static final long DMAAP_SIM_WAIT_TIME = 50;
+    /**
+     * Maps a topic name to its data.
+     */
+    private final Map<String, TopicData> topic2data = new ConcurrentHashMap<>();
 
-    // recurring constants
-    private static final String WITH_TIMEOUT = " with timeout ";
+    /**
+     * Thread used to remove idle consumers from the topics.
+     */
+    private ScheduledExecutorService timerPool;
 
-    // The map of topic messages
-    private static final Map<String, SortedMap<Integer, Object>> topicMessageMap = new LinkedHashMap<>();
 
-    // The map of topic messages
-    private static final Map<String, Map<String, MutablePair<Integer, String>>> consumerGroupsMap =
-            new LinkedHashMap<>();
+    /**
+     * Constructs the object.
+     *
+     * @param params parameters
+     */
+    public DmaapSimProvider(DmaapSimParameterGroup params) {
+        addAction("Topic Sweeper", () -> {
+            timerPool = makeTimerPool();
+            timerPool.scheduleWithFixedDelay(new SweeperTask(), params.getTopicSweepSec(), params.getTopicSweepSec(),
+                            TimeUnit.SECONDS);
+        }, () -> timerPool.shutdown());
+    }
 
     /**
      * Process a DMaaP message.
      *
-     * @param topicName The topic name
+     * @param topicName the topic name
      * @param dmaapMessage the message to process
      * @return a response to the message
      */
+    @SuppressWarnings("unchecked")
     public Response processDmaapMessagePut(final String topicName, final Object dmaapMessage) {
-        LOGGER.debug(TOPIC_TAG + topicName + ", Received DMaaP message: " + dmaapMessage);
-
-        synchronized (topicMessageMap) {
-            SortedMap<Integer, Object> messageMap = topicMessageMap.get(topicName);
-            if (messageMap == null) {
-                messageMap = new TreeMap<>();
-                topicMessageMap.put(topicName, messageMap);
-                LOGGER.debug(TOPIC_TAG + topicName + ", created topic message map");
-            }
+        LOGGER.debug("Topic: {}, Received DMaaP message(s): {}", topicName, dmaapMessage);
 
-            int nextKey = (messageMap.isEmpty() ? 0 : messageMap.lastKey() + 1);
+        List<Object> lst;
 
-            messageMap.put(nextKey, dmaapMessage);
-            LOGGER.debug(TOPIC_TAG + topicName + ", cached DMaaP message " + nextKey + ": " + dmaapMessage);
+        if (dmaapMessage instanceof List) {
+            lst = (List<Object>) dmaapMessage;
+        } else {
+            lst = Collections.singletonList(dmaapMessage);
         }
 
-        return Response.status(Response.Status.OK).entity("{\n    \"serverTimeMs\": 0,\n    \"count\": 1\n}").build();
+        TopicData topic = topic2data.get(topicName);
+
+        /*
+         * Write all messages and return the count. If the topic doesn't exist yet, then
+         * there are no subscribers to receive the messages, thus treat it as if all
+         * messages were published.
+         */
+        int nmessages = (topic != null ? topic.write(lst) : lst.size());
+
+        Map<String, Object> map = new LinkedHashMap<>();
+        map.put("serverTimeMs", 0);
+        map.put("count", nmessages);
+
+        return Response.status(Response.Status.OK).entity(map).build();
     }
 
     /**
@@ -92,102 +115,66 @@ public class DmaapSimProvider {
      * @param topicName The topic to wait on
      * @param consumerGroup the consumer group that is waiting
      * @param consumerId the consumer ID that is waiting
-     * @param timeout the length of time to wait for
+     * @param limit the maximum number of messages to get
+     * @param timeoutMs the length of time to wait for
      * @return the DMaaP message or
      */
     public Response processDmaapMessageGet(final String topicName, final String consumerGroup, final String consumerId,
-            final int timeout) {
+                    final int limit, final long timeoutMs) {
 
-        LOGGER.debug(TOPIC_TAG + topicName + ", Request for DMaaP message: " + consumerGroup + ":" + consumerId
-                + WITH_TIMEOUT + timeout);
+        LOGGER.debug("Topic: {}, Request for DMaaP message: {}: {} with limit={} timeout={}", topicName, consumerGroup,
+                        consumerId, limit, timeoutMs);
 
-        MutablePair<Integer, String> consumerGroupPair = null;
-
-        synchronized (consumerGroupsMap) {
-            Map<String, MutablePair<Integer, String>> consumerGroupMap = consumerGroupsMap.get(topicName);
-            if (consumerGroupMap == null) {
-                consumerGroupMap = new LinkedHashMap<>();
-                consumerGroupsMap.put(topicName, consumerGroupMap);
-                LOGGER.trace(
-                        TOPIC_TAG + topicName + ", Created consumer map entry for consumer group " + consumerGroup);
-            }
-
-            consumerGroupPair = consumerGroupMap.get(consumerGroup);
-            if (consumerGroupPair == null) {
-                consumerGroupPair = new MutablePair<>(-1, consumerId);
-                consumerGroupMap.put(consumerGroup, consumerGroupPair);
-                LOGGER.trace(TOPIC_TAG + topicName + ", Created consumer group entry for consumer group "
-                        + consumerGroup + ":" + consumerId);
-            }
-        }
+        try {
+            List<String> lst = topic2data.computeIfAbsent(topicName, this::makeTopicData).read(consumerGroup, limit,
+                            timeoutMs);
 
-        long timeOfTimeout = System.currentTimeMillis() + timeout;
+            if (lst.isEmpty() && timeoutMs > 0) {
+                LOGGER.debug("Topic: {}, Timed out waiting for messages: {}: {}", topicName, consumerGroup, consumerId);
+                return Response.status(Status.REQUEST_TIMEOUT).entity(lst).build();
 
-        do {
-            Object waitingMessages = getWaitingMessages(topicName, consumerGroupPair);
-            if (waitingMessages != null) {
-                LOGGER.debug(TOPIC_TAG + topicName + ", Request for DMaaP message: " + consumerGroup + ":" + consumerId
-                        + WITH_TIMEOUT + timeout + ", returning messages " + waitingMessages);
-                return Response.status(Response.Status.OK).entity(waitingMessages).build();
+            } else {
+                LOGGER.debug("Topic: {}, Retrieved {} messages for: {}: {}", topicName, consumerGroup, lst.size(),
+                                consumerId);
+                return Response.status(Status.OK).entity(lst).build();
             }
 
-            try {
-                TimeUnit.MILLISECONDS.sleep(DMAAP_SIM_WAIT_TIME);
-            } catch (InterruptedException ie) {
-                String errorMessage = "Interrupt on wait on simulation of DMaaP topic " + topicName + " for request ID "
-                        + consumerGroup + ":" + consumerId + WITH_TIMEOUT + timeout;
-                LOGGER.warn(errorMessage, ie);
-                Thread.currentThread().interrupt();
-                throw new DmaapSimRuntimeException(errorMessage, ie);
-            }
+        } catch (InterruptedException e) {
+            LOGGER.warn("Topic: {}, Request for DMaaP message interrupted: {}: {}", topicName, consumerGroup,
+                            consumerId, e);
+            Thread.currentThread().interrupt();
+            return Response.status(Status.GONE).entity(Collections.emptyList()).build();
         }
-        while (timeOfTimeout > System.currentTimeMillis());
-
-        LOGGER.trace(TOPIC_TAG + topicName + ", timed out waiting for messages : " + consumerGroup + ":" + consumerId
-                + WITH_TIMEOUT + timeout);
-        return Response.status(Response.Status.REQUEST_TIMEOUT).build();
     }
 
     /**
-     * Return any messages on this topic with a message number greater than the supplied message number.
-     *
-     * @param topicName the topic name to check
-     * @param consumerGroupPair the pair with the information on the last message retrieved
-     * @return the messages or null if there are none
+     * Task to remove idle consumers from each topic.
      */
-    private Object getWaitingMessages(final String topicName, final MutablePair<Integer, String> consumerGroupPair) {
-        String foundMessageList = "[";
-
-        synchronized (topicMessageMap) {
-            SortedMap<Integer, Object> messageMap = topicMessageMap.get(topicName);
-            if (messageMap == null || messageMap.lastKey() <= consumerGroupPair.getLeft()) {
-                return null;
-            }
+    private class SweeperTask implements Runnable {
+        @Override
+        public void run() {
+            topic2data.values().forEach(TopicData::removeIdleConsumers);
+        }
+    }
 
-            boolean first = true;
-            for (Object dmaapMessage : messageMap.tailMap(consumerGroupPair.getLeft() + 1).values()) {
-                if (first) {
-                    first = false;
-                } else {
-                    foundMessageList += ",";
-                }
-                try {
-                    foundMessageList += new StandardCoder().encode(dmaapMessage);
-                } catch (CoderException ce) {
-                    String errorMessage = "Encoding error on message on DMaaP topic " + topicName;
-                    LOGGER.warn(errorMessage, ce);
-                    return null;
-                }
-            }
-            foundMessageList += ']';
+    // the following methods may be overridden by junit tests
 
-            LOGGER.debug(TOPIC_TAG + topicName + ", returning DMaaP messages from  " + consumerGroupPair.getLeft()
-                    + " to " + messageMap.lastKey());
-            synchronized (consumerGroupsMap) {
-                consumerGroupPair.setLeft(messageMap.lastKey());
-            }
-        }
+    /**
+     * Makes a new timer pool.
+     *
+     * @return a new timer pool
+     */
+    protected ScheduledExecutorService makeTimerPool() {
+        return Executors.newScheduledThreadPool(1);
+    }
 
-        return (foundMessageList.length() < 3 ? null : foundMessageList);
+    /**
+     * Makes a new topic.
+     *
+     * @param topicName topic name
+     * @return a new topic
+     */
+    protected TopicData makeTopicData(String topicName) {
+        return new TopicData(topicName);
     }
 }