X-Git-Url: https://gerrit.onap.org/r/gitweb?a=blobdiff_plain;f=models-sim%2Fmodels-sim-dmaap%2Fsrc%2Fmain%2Fjava%2Forg%2Fonap%2Fpolicy%2Fmodels%2Fsim%2Fdmaap%2Fprovider%2FDmaapSimProvider.java;h=afbe10f5145efcfe4aa2e36e9acc920cd9de6ac9;hb=71be21fd5b9b52c613bb855f00a79a51e81906dd;hp=42a653d6f21831cf9af0f269263139a18a657cba;hpb=900920306a0be309f389880325558bb96ff76356;p=policy%2Fmodels.git diff --git a/models-sim/models-sim-dmaap/src/main/java/org/onap/policy/models/sim/dmaap/provider/DmaapSimProvider.java b/models-sim/models-sim-dmaap/src/main/java/org/onap/policy/models/sim/dmaap/provider/DmaapSimProvider.java index 42a653d6f..afbe10f51 100644 --- a/models-sim/models-sim-dmaap/src/main/java/org/onap/policy/models/sim/dmaap/provider/DmaapSimProvider.java +++ b/models-sim/models-sim-dmaap/src/main/java/org/onap/policy/models/sim/dmaap/provider/DmaapSimProvider.java @@ -1,6 +1,8 @@ /*- * ============LICENSE_START======================================================= - * Copyright (C) 2019 Nordix Foundation. + * Copyright (C) 2019, 2023 Nordix Foundation. + * Modifications Copyright (C) 2019 AT&T Intellectual Property. All rights reserved. + * Modifications Copyright (C) 2021 Bell Canada. All rights reserved. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -20,17 +22,20 @@ package org.onap.policy.models.sim.dmaap.provider; +import jakarta.ws.rs.core.Response; +import jakarta.ws.rs.core.Response.Status; +import java.util.Collections; import java.util.LinkedHashMap; +import java.util.List; import java.util.Map; -import java.util.SortedMap; -import java.util.TreeMap; - -import javax.ws.rs.core.Response; - -import org.apache.commons.lang3.tuple.MutablePair; -import org.onap.policy.common.utils.coder.CoderException; -import org.onap.policy.common.utils.coder.StandardCoder; -import org.onap.policy.models.sim.dmaap.DmaapSimRuntimeException; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import lombok.Getter; +import lombok.Setter; +import org.onap.policy.common.utils.services.ServiceManagerContainer; +import org.onap.policy.models.sim.dmaap.parameters.DmaapSimParameterGroup; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -39,47 +44,70 @@ import org.slf4j.LoggerFactory; * * @author Liam Fallon (liam.fallon@est.tech) */ -public class DmaapSimProvider { +public class DmaapSimProvider extends ServiceManagerContainer { private static final Logger LOGGER = LoggerFactory.getLogger(DmaapSimProvider.class); - // Time for a get to wait before checking of a message has come - private static final long DMAAP_SIM_WAIT_TIME = 50; + @Getter + @Setter + private static DmaapSimProvider instance; - // recurring constants - private static final String WITH_TIMEOUT = " with timeout "; + /** + * Maps a topic name to its data. + */ + private final Map topic2data = new ConcurrentHashMap<>(); + + /** + * Thread used to remove idle consumers from the topics. + */ + private ScheduledExecutorService timerPool; - // The map of topic messages - private static final Map> topicMessageMap = new LinkedHashMap<>(); - // The map of topic messages - private static final Map>> consumerGroupsMap = - new LinkedHashMap<>(); + /** + * Constructs the object. + * + * @param params parameters + */ + public DmaapSimProvider(DmaapSimParameterGroup params) { + addAction("Topic Sweeper", () -> { + timerPool = makeTimerPool(); + timerPool.scheduleWithFixedDelay(new SweeperTask(), params.getTopicSweepSec(), params.getTopicSweepSec(), + TimeUnit.SECONDS); + }, () -> timerPool.shutdown()); + } /** * Process a DMaaP message. * - * @param topicName The topic name + * @param topicName the topic name * @param dmaapMessage the message to process * @return a response to the message */ + @SuppressWarnings("unchecked") public Response processDmaapMessagePut(final String topicName, final Object dmaapMessage) { - LOGGER.debug("Topic:" + topicName + ", Received DMaaP message: " + dmaapMessage); - - synchronized (topicMessageMap) { - SortedMap messageMap = topicMessageMap.get(topicName); - if (messageMap == null) { - messageMap = new TreeMap<>(); - topicMessageMap.put(topicName, messageMap); - LOGGER.debug("Topic:" + topicName + ", created topic message map"); - } + LOGGER.debug("Topic: {}, Received DMaaP message(s): {}", topicName, dmaapMessage); - int nextKey = (messageMap.isEmpty() ? 0 : messageMap.lastKey() + 1); + List lst; - messageMap.put(nextKey, dmaapMessage); - LOGGER.debug("Topic:" + topicName + ", cached DMaaP message " + nextKey + ": " + dmaapMessage); + if (dmaapMessage instanceof List) { + lst = (List) dmaapMessage; + } else { + lst = Collections.singletonList(dmaapMessage); } - return Response.status(Response.Status.OK).entity("{\n \"serverTimeMs\": 0,\n \"count\": 1\n}").build(); + TopicData topic = topic2data.get(topicName); + + /* + * Write all messages and return the count. If the topic doesn't exist yet, then + * there are no subscribers to receive the messages, thus treat it as if all + * messages were published. + */ + int nmessages = (topic != null ? topic.write(lst) : lst.size()); + + Map map = new LinkedHashMap<>(); + map.put("serverTimeMs", 0); + map.put("count", nmessages); + + return Response.status(Response.Status.OK).entity(map).build(); } /** @@ -88,99 +116,74 @@ public class DmaapSimProvider { * @param topicName The topic to wait on * @param consumerGroup the consumer group that is waiting * @param consumerId the consumer ID that is waiting - * @param timeout the length of time to wait for + * @param limit the maximum number of messages to get + * @param timeoutMs the length of time to wait for * @return the DMaaP message or */ public Response processDmaapMessageGet(final String topicName, final String consumerGroup, final String consumerId, - final int timeout) { - - LOGGER.debug("Topic:" + topicName + ", Request for DMaaP message: " + consumerGroup + ":" + consumerId - + WITH_TIMEOUT + timeout); - - MutablePair consumerGroupPair = null; - - synchronized (consumerGroupsMap) { - Map> consumerGroupMap = consumerGroupsMap.get(topicName); - if (consumerGroupMap == null) { - consumerGroupMap = new LinkedHashMap<>(); - consumerGroupsMap.put(topicName, consumerGroupMap); - LOGGER.trace("Topic:" + topicName + ", Created consumer map entry for consumer group " + consumerGroup); - } - - consumerGroupPair = consumerGroupMap.get(consumerGroup); - if (consumerGroupPair == null) { - consumerGroupPair = new MutablePair<>(-1, consumerId); - consumerGroupMap.put(consumerGroup, consumerGroupPair); - LOGGER.trace("Topic:" + topicName + ", Created consumer group entry for consumer group " + consumerGroup - + ":" + consumerId); - } - } + final int limit, final long timeoutMs) { - long timeOfTimeout = System.currentTimeMillis() + timeout; - - do { - - Object waitingMessages = getWaitingMessages(topicName, consumerGroupPair); - if (waitingMessages != null) { - LOGGER.debug("Topic:" + topicName + ", Request for DMaaP message: " + consumerGroup + ":" + consumerId - + WITH_TIMEOUT + timeout + ", returning messages " + waitingMessages); - return Response.status(Response.Status.OK).entity(waitingMessages).build(); - } - - try { - Thread.sleep(DMAAP_SIM_WAIT_TIME); - } catch (InterruptedException ie) { - String errorMessage = "Interrupt on wait on simulation of DMaaP topic " + topicName + " for request ID " - + consumerGroup + ":" + consumerId + WITH_TIMEOUT + timeout; - LOGGER.warn(errorMessage, ie); - throw new DmaapSimRuntimeException(errorMessage, ie); - } - } - while (timeOfTimeout > System.currentTimeMillis()); + LOGGER.debug("Topic: {}, Request for DMaaP message: {}: {} with limit={} timeout={}", topicName, consumerGroup, + consumerId, limit, timeoutMs); + + try { + List lst = topic2data.computeIfAbsent(topicName, this::makeTopicData).read(consumerGroup, limit, + timeoutMs); - LOGGER.trace("Topic:" + topicName + ", timed out waiting for messages : " + consumerGroup + ":" + consumerId - + WITH_TIMEOUT + timeout); - return Response.status(Response.Status.REQUEST_TIMEOUT).build(); + LOGGER.debug("Topic: {}, Retrieved {} messages for: {}: {}", topicName, lst.size(), consumerGroup, + consumerId); + return Response.status(Status.OK).entity(lst).build(); + + } catch (InterruptedException e) { + LOGGER.warn("Topic: {}, Request for DMaaP message interrupted: {}: {}", topicName, consumerGroup, + consumerId, e); + Thread.currentThread().interrupt(); + return Response.status(Status.GONE).entity(Collections.emptyList()).build(); + } } /** - * Return any messages on this topic with a message number greater than the supplied message number. + * Returns the list of default topics. * - * @param topicName the topic name to check - * @param consumerGroupPair the pair with the information on the last message retrieved - * @return the messages or null if there are none + * @return the topic list */ - private Object getWaitingMessages(final String topicName, final MutablePair consumerGroupPair) { - String foundMessageList = "["; - - synchronized (topicMessageMap) { - SortedMap messageMap = topicMessageMap.get(topicName); - if (messageMap == null || messageMap.lastKey() <= consumerGroupPair.getLeft()) { - return null; - } - - boolean first = true; - for (Object dmaapMessage : messageMap.tailMap(consumerGroupPair.getLeft() + 1).values()) { - if (first) { - first = false; - } else { - foundMessageList += ","; - } - try { - foundMessageList += new StandardCoder().encode(dmaapMessage); - } catch (CoderException e) { - e.printStackTrace(); - } - } - foundMessageList += ']'; - - LOGGER.debug("Topic:" + topicName + ", returning DMaaP messages from " + consumerGroupPair.getLeft() - + " to " + messageMap.lastKey()); - synchronized (consumerGroupsMap) { - consumerGroupPair.setLeft(messageMap.lastKey()); - } + public Response processDmaapTopicsGet() { + + LOGGER.debug("Request for listing DMaaP topics"); + var response = new DmaapGetTopicResponse(); + response.setTopics(List.of("POLICY-PDP-PAP", "POLICY-NOTIFICATION", "unauthenticated.DCAE_CL_OUTPUT", + "POLICY-CL-MGT")); + return Response.status(Status.OK).entity(response).build(); + } + + /** + * Task to remove idle consumers from each topic. + */ + private class SweeperTask implements Runnable { + @Override + public void run() { + topic2data.values().forEach(TopicData::removeIdleConsumers); } + } + + // the following methods may be overridden by junit tests - return (foundMessageList.length() < 3 ? null : foundMessageList); + /** + * Makes a new timer pool. + * + * @return a new timer pool + */ + protected ScheduledExecutorService makeTimerPool() { + return Executors.newScheduledThreadPool(1); + } + + /** + * Makes a new topic. + * + * @param topicName topic name + * @return a new topic + */ + protected TopicData makeTopicData(String topicName) { + return new TopicData(topicName); } }