X-Git-Url: https://gerrit.onap.org/r/gitweb?a=blobdiff_plain;f=models-sim%2Fmodels-sim-dmaap%2Fsrc%2Fmain%2Fjava%2Forg%2Fonap%2Fpolicy%2Fmodels%2Fsim%2Fdmaap%2Fprovider%2FDmaapSimProvider.java;h=d11d1b397d7fef67c199bd73697df67898dac595;hb=aa148d9b5bba6ad23736e939a6d0ec917e761e1e;hp=9de29cdac22049e51d4c85a419bcd59b11952d8d;hpb=5af913104ec412086deab4d599359751246e4ba3;p=policy%2Fmodels.git diff --git a/models-sim/models-sim-dmaap/src/main/java/org/onap/policy/models/sim/dmaap/provider/DmaapSimProvider.java b/models-sim/models-sim-dmaap/src/main/java/org/onap/policy/models/sim/dmaap/provider/DmaapSimProvider.java index 9de29cdac..d11d1b397 100644 --- a/models-sim/models-sim-dmaap/src/main/java/org/onap/policy/models/sim/dmaap/provider/DmaapSimProvider.java +++ b/models-sim/models-sim-dmaap/src/main/java/org/onap/policy/models/sim/dmaap/provider/DmaapSimProvider.java @@ -1,6 +1,7 @@ /*- * ============LICENSE_START======================================================= * Copyright (C) 2019 Nordix Foundation. + * Modifications Copyright (C) 2019 AT&T Intellectual Property. All rights reserved. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -20,18 +21,20 @@ package org.onap.policy.models.sim.dmaap.provider; +import java.util.Collections; import java.util.LinkedHashMap; +import java.util.List; import java.util.Map; -import java.util.SortedMap; -import java.util.TreeMap; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; - import javax.ws.rs.core.Response; - -import org.apache.commons.lang3.tuple.MutablePair; -import org.onap.policy.common.utils.coder.CoderException; -import org.onap.policy.common.utils.coder.StandardCoder; -import org.onap.policy.models.sim.dmaap.DmaapSimRuntimeException; +import javax.ws.rs.core.Response.Status; +import lombok.Getter; +import lombok.Setter; +import org.onap.policy.common.utils.services.ServiceManagerContainer; +import org.onap.policy.models.sim.dmaap.parameters.DmaapSimParameterGroup; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -40,50 +43,70 @@ import org.slf4j.LoggerFactory; * * @author Liam Fallon (liam.fallon@est.tech) */ -public class DmaapSimProvider { +public class DmaapSimProvider extends ServiceManagerContainer { private static final Logger LOGGER = LoggerFactory.getLogger(DmaapSimProvider.class); - // Recurring string constants - private static final String TOPIC_TAG = "Topic:"; + @Getter + @Setter + private static DmaapSimProvider instance; - // Time for a get to wait before checking of a message has come - private static final long DMAAP_SIM_WAIT_TIME = 50; + /** + * Maps a topic name to its data. + */ + private final Map topic2data = new ConcurrentHashMap<>(); - // recurring constants - private static final String WITH_TIMEOUT = " with timeout "; + /** + * Thread used to remove idle consumers from the topics. + */ + private ScheduledExecutorService timerPool; - // The map of topic messages - private static final Map> topicMessageMap = new LinkedHashMap<>(); - // The map of topic messages - private static final Map>> consumerGroupsMap = - new LinkedHashMap<>(); + /** + * Constructs the object. + * + * @param params parameters + */ + public DmaapSimProvider(DmaapSimParameterGroup params) { + addAction("Topic Sweeper", () -> { + timerPool = makeTimerPool(); + timerPool.scheduleWithFixedDelay(new SweeperTask(), params.getTopicSweepSec(), params.getTopicSweepSec(), + TimeUnit.SECONDS); + }, () -> timerPool.shutdown()); + } /** * Process a DMaaP message. * - * @param topicName The topic name + * @param topicName the topic name * @param dmaapMessage the message to process * @return a response to the message */ + @SuppressWarnings("unchecked") public Response processDmaapMessagePut(final String topicName, final Object dmaapMessage) { - LOGGER.debug(TOPIC_TAG + topicName + ", Received DMaaP message: " + dmaapMessage); - - synchronized (topicMessageMap) { - SortedMap messageMap = topicMessageMap.get(topicName); - if (messageMap == null) { - messageMap = new TreeMap<>(); - topicMessageMap.put(topicName, messageMap); - LOGGER.debug(TOPIC_TAG + topicName + ", created topic message map"); - } + LOGGER.debug("Topic: {}, Received DMaaP message(s): {}", topicName, dmaapMessage); - int nextKey = (messageMap.isEmpty() ? 0 : messageMap.lastKey() + 1); + List lst; - messageMap.put(nextKey, dmaapMessage); - LOGGER.debug(TOPIC_TAG + topicName + ", cached DMaaP message " + nextKey + ": " + dmaapMessage); + if (dmaapMessage instanceof List) { + lst = (List) dmaapMessage; + } else { + lst = Collections.singletonList(dmaapMessage); } - return Response.status(Response.Status.OK).entity("{\n \"serverTimeMs\": 0,\n \"count\": 1\n}").build(); + TopicData topic = topic2data.get(topicName); + + /* + * Write all messages and return the count. If the topic doesn't exist yet, then + * there are no subscribers to receive the messages, thus treat it as if all + * messages were published. + */ + int nmessages = (topic != null ? topic.write(lst) : lst.size()); + + Map map = new LinkedHashMap<>(); + map.put("serverTimeMs", 0); + map.put("count", nmessages); + + return Response.status(Response.Status.OK).entity(map).build(); } /** @@ -92,102 +115,66 @@ public class DmaapSimProvider { * @param topicName The topic to wait on * @param consumerGroup the consumer group that is waiting * @param consumerId the consumer ID that is waiting - * @param timeout the length of time to wait for + * @param limit the maximum number of messages to get + * @param timeoutMs the length of time to wait for * @return the DMaaP message or */ public Response processDmaapMessageGet(final String topicName, final String consumerGroup, final String consumerId, - final int timeout) { + final int limit, final long timeoutMs) { - LOGGER.debug(TOPIC_TAG + topicName + ", Request for DMaaP message: " + consumerGroup + ":" + consumerId - + WITH_TIMEOUT + timeout); + LOGGER.debug("Topic: {}, Request for DMaaP message: {}: {} with limit={} timeout={}", topicName, consumerGroup, + consumerId, limit, timeoutMs); - MutablePair consumerGroupPair = null; - - synchronized (consumerGroupsMap) { - Map> consumerGroupMap = consumerGroupsMap.get(topicName); - if (consumerGroupMap == null) { - consumerGroupMap = new LinkedHashMap<>(); - consumerGroupsMap.put(topicName, consumerGroupMap); - LOGGER.trace( - TOPIC_TAG + topicName + ", Created consumer map entry for consumer group " + consumerGroup); - } - - consumerGroupPair = consumerGroupMap.get(consumerGroup); - if (consumerGroupPair == null) { - consumerGroupPair = new MutablePair<>(-1, consumerId); - consumerGroupMap.put(consumerGroup, consumerGroupPair); - LOGGER.trace(TOPIC_TAG + topicName + ", Created consumer group entry for consumer group " - + consumerGroup + ":" + consumerId); - } - } + try { + List lst = topic2data.computeIfAbsent(topicName, this::makeTopicData).read(consumerGroup, limit, + timeoutMs); - long timeOfTimeout = System.currentTimeMillis() + timeout; + if (lst.isEmpty() && timeoutMs > 0) { + LOGGER.debug("Topic: {}, Timed out waiting for messages: {}: {}", topicName, consumerGroup, consumerId); + return Response.status(Status.REQUEST_TIMEOUT).entity(lst).build(); - do { - Object waitingMessages = getWaitingMessages(topicName, consumerGroupPair); - if (waitingMessages != null) { - LOGGER.debug(TOPIC_TAG + topicName + ", Request for DMaaP message: " + consumerGroup + ":" + consumerId - + WITH_TIMEOUT + timeout + ", returning messages " + waitingMessages); - return Response.status(Response.Status.OK).entity(waitingMessages).build(); + } else { + LOGGER.debug("Topic: {}, Retrieved {} messages for: {}: {}", topicName, consumerGroup, lst.size(), + consumerId); + return Response.status(Status.OK).entity(lst).build(); } - try { - TimeUnit.MILLISECONDS.sleep(DMAAP_SIM_WAIT_TIME); - } catch (InterruptedException ie) { - String errorMessage = "Interrupt on wait on simulation of DMaaP topic " + topicName + " for request ID " - + consumerGroup + ":" + consumerId + WITH_TIMEOUT + timeout; - LOGGER.warn(errorMessage, ie); - Thread.currentThread().interrupt(); - throw new DmaapSimRuntimeException(errorMessage, ie); - } + } catch (InterruptedException e) { + LOGGER.warn("Topic: {}, Request for DMaaP message interrupted: {}: {}", topicName, consumerGroup, + consumerId, e); + Thread.currentThread().interrupt(); + return Response.status(Status.GONE).entity(Collections.emptyList()).build(); } - while (timeOfTimeout > System.currentTimeMillis()); - - LOGGER.trace(TOPIC_TAG + topicName + ", timed out waiting for messages : " + consumerGroup + ":" + consumerId - + WITH_TIMEOUT + timeout); - return Response.status(Response.Status.REQUEST_TIMEOUT).build(); } /** - * Return any messages on this topic with a message number greater than the supplied message number. - * - * @param topicName the topic name to check - * @param consumerGroupPair the pair with the information on the last message retrieved - * @return the messages or null if there are none + * Task to remove idle consumers from each topic. */ - private Object getWaitingMessages(final String topicName, final MutablePair consumerGroupPair) { - String foundMessageList = "["; - - synchronized (topicMessageMap) { - SortedMap messageMap = topicMessageMap.get(topicName); - if (messageMap == null || messageMap.lastKey() <= consumerGroupPair.getLeft()) { - return null; - } + private class SweeperTask implements Runnable { + @Override + public void run() { + topic2data.values().forEach(TopicData::removeIdleConsumers); + } + } - boolean first = true; - for (Object dmaapMessage : messageMap.tailMap(consumerGroupPair.getLeft() + 1).values()) { - if (first) { - first = false; - } else { - foundMessageList += ","; - } - try { - foundMessageList += new StandardCoder().encode(dmaapMessage); - } catch (CoderException ce) { - String errorMessage = "Encoding error on message on DMaaP topic " + topicName; - LOGGER.warn(errorMessage, ce); - return null; - } - } - foundMessageList += ']'; + // the following methods may be overridden by junit tests - LOGGER.debug(TOPIC_TAG + topicName + ", returning DMaaP messages from " + consumerGroupPair.getLeft() - + " to " + messageMap.lastKey()); - synchronized (consumerGroupsMap) { - consumerGroupPair.setLeft(messageMap.lastKey()); - } - } + /** + * Makes a new timer pool. + * + * @return a new timer pool + */ + protected ScheduledExecutorService makeTimerPool() { + return Executors.newScheduledThreadPool(1); + } - return (foundMessageList.length() < 3 ? null : foundMessageList); + /** + * Makes a new topic. + * + * @param topicName topic name + * @return a new topic + */ + protected TopicData makeTopicData(String topicName) { + return new TopicData(topicName); } }