2 * ============LICENSE_START=======================================================
3 * Copyright (C) 2019, 2023 Nordix Foundation.
4 * Modifications Copyright (C) 2019 AT&T Intellectual Property. All rights reserved.
5 * Modifications Copyright (C) 2021 Bell Canada. All rights reserved.
6 * ================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
19 * SPDX-License-Identifier: Apache-2.0
20 * ============LICENSE_END=========================================================
23 package org.onap.policy.models.sim.dmaap.provider;
25 import jakarta.ws.rs.core.Response;
26 import jakarta.ws.rs.core.Response.Status;
27 import java.util.Collections;
28 import java.util.LinkedHashMap;
29 import java.util.List;
31 import java.util.concurrent.ConcurrentHashMap;
32 import java.util.concurrent.Executors;
33 import java.util.concurrent.ScheduledExecutorService;
34 import java.util.concurrent.TimeUnit;
37 import org.onap.policy.common.utils.services.ServiceManagerContainer;
38 import org.onap.policy.models.sim.dmaap.parameters.DmaapSimParameterGroup;
39 import org.slf4j.Logger;
40 import org.slf4j.LoggerFactory;
43 * Provider to simulate DMaaP.
45 * @author Liam Fallon (liam.fallon@est.tech)
47 public class DmaapSimProvider extends ServiceManagerContainer {
    // Class-wide SLF4J logger used by every method of this provider.
    private static final Logger LOGGER = LoggerFactory.getLogger(DmaapSimProvider.class);

    // Statically held reference to a provider. How and when it is assigned is not
    // visible in this section -- NOTE(review): confirm against the accessor/caller.
    private static DmaapSimProvider instance;

    /**
     * Maps a topic name to its data. Entries are added on demand the first time a
     * topic is read (see processDmaapMessageGet); publishing to an unknown topic
     * does not create an entry.
     */
    private final Map<String, TopicData> topic2data = new ConcurrentHashMap<>();

    /**
     * Thread used to remove idle consumers from the topics. It is assigned by the
     * "Topic Sweeper" action registered in the constructor, so it is null until
     * that action's first callback has run.
     */
    private ScheduledExecutorService timerPool;
66 * Constructs the object.
68 * @param params parameters
70 public DmaapSimProvider(DmaapSimParameterGroup params) {
71 addAction("Topic Sweeper", () -> {
72 timerPool = makeTimerPool();
73 timerPool.scheduleWithFixedDelay(new SweeperTask(), params.getTopicSweepSec(), params.getTopicSweepSec(),
75 }, () -> timerPool.shutdown());
79 * Process a DMaaP message.
81 * @param topicName the topic name
82 * @param dmaapMessage the message to process
83 * @return a response to the message
85 @SuppressWarnings("unchecked")
86 public Response processDmaapMessagePut(final String topicName, final Object dmaapMessage) {
87 LOGGER.debug("Topic: {}, Received DMaaP message(s): {}", topicName, dmaapMessage);
91 if (dmaapMessage instanceof List) {
92 lst = (List<Object>) dmaapMessage;
94 lst = Collections.singletonList(dmaapMessage);
97 TopicData topic = topic2data.get(topicName);
100 * Write all messages and return the count. If the topic doesn't exist yet, then
101 * there are no subscribers to receive the messages, thus treat it as if all
102 * messages were published.
104 int nmessages = (topic != null ? topic.write(lst) : lst.size());
106 Map<String, Object> map = new LinkedHashMap<>();
107 map.put("serverTimeMs", 0);
108 map.put("count", nmessages);
110 return Response.status(Response.Status.OK).entity(map).build();
114 * Wait for and return a DMaaP message.
116 * @param topicName The topic to wait on
117 * @param consumerGroup the consumer group that is waiting
118 * @param consumerId the consumer ID that is waiting
119 * @param limit the maximum number of messages to get
120 * @param timeoutMs the length of time to wait for
121 * @return the DMaaP message or
123 public Response processDmaapMessageGet(final String topicName, final String consumerGroup, final String consumerId,
124 final int limit, final long timeoutMs) {
126 LOGGER.debug("Topic: {}, Request for DMaaP message: {}: {} with limit={} timeout={}", topicName, consumerGroup,
127 consumerId, limit, timeoutMs);
130 List<String> lst = topic2data.computeIfAbsent(topicName, this::makeTopicData).read(consumerGroup, limit,
133 LOGGER.debug("Topic: {}, Retrieved {} messages for: {}: {}", topicName, lst.size(), consumerGroup,
135 return Response.status(Status.OK).entity(lst).build();
137 } catch (InterruptedException e) {
138 LOGGER.warn("Topic: {}, Request for DMaaP message interrupted: {}: {}", topicName, consumerGroup,
140 Thread.currentThread().interrupt();
141 return Response.status(Status.GONE).entity(Collections.emptyList()).build();
146 * Returns the list of default topics.
148 * @return the topic list
150 public Response processDmaapTopicsGet() {
152 LOGGER.debug("Request for listing DMaaP topics");
153 var response = new DmaapGetTopicResponse();
154 response.setTopics(List.of("POLICY-PDP-PAP", "POLICY-NOTIFICATION", "unauthenticated.DCAE_CL_OUTPUT",
156 return Response.status(Status.OK).entity(response).build();
160 * Task to remove idle consumers from each topic.
162 private class SweeperTask implements Runnable {
165 topic2data.values().forEach(TopicData::removeIdleConsumers);
169 // the following methods may be overridden by junit tests
172 * Makes a new timer pool.
174 * @return a new timer pool
176 protected ScheduledExecutorService makeTimerPool() {
177 return Executors.newScheduledThreadPool(1);
183 * @param topicName topic name
184 * @return a new topic
186 protected TopicData makeTopicData(String topicName) {
187 return new TopicData(topicName);