Merge "Flesh out DMaaP simulator"
[policy/models.git] / models-sim / models-sim-dmaap / src / main / java / org / onap / policy / models / sim / dmaap / provider / DmaapSimProvider.java
1 /*-
2  * ============LICENSE_START=======================================================
3  *  Copyright (C) 2019 Nordix Foundation.
4  *  Modifications Copyright (C) 2019 AT&T Intellectual Property. All rights reserved.
5  * ================================================================================
6  * Licensed under the Apache License, Version 2.0 (the "License");
7  * you may not use this file except in compliance with the License.
8  * You may obtain a copy of the License at
9  *
10  *      http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing, software
13  * distributed under the License is distributed on an "AS IS" BASIS,
14  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  * See the License for the specific language governing permissions and
16  * limitations under the License.
17  *
18  * SPDX-License-Identifier: Apache-2.0
19  * ============LICENSE_END=========================================================
20  */
21
22 package org.onap.policy.models.sim.dmaap.provider;
23
24 import java.util.Collections;
25 import java.util.LinkedHashMap;
26 import java.util.List;
27 import java.util.Map;
28 import java.util.concurrent.ConcurrentHashMap;
29 import java.util.concurrent.Executors;
30 import java.util.concurrent.ScheduledExecutorService;
31 import java.util.concurrent.TimeUnit;
32 import javax.ws.rs.core.Response;
33 import javax.ws.rs.core.Response.Status;
34 import lombok.Getter;
35 import lombok.Setter;
36 import org.onap.policy.common.utils.services.ServiceManagerContainer;
37 import org.onap.policy.models.sim.dmaap.parameters.DmaapSimParameterGroup;
38 import org.slf4j.Logger;
39 import org.slf4j.LoggerFactory;
40
41 /**
42  * Provider to simulate DMaaP.
43  *
44  * @author Liam Fallon (liam.fallon@est.tech)
45  */
46 public class DmaapSimProvider extends ServiceManagerContainer {
47     private static final Logger LOGGER = LoggerFactory.getLogger(DmaapSimProvider.class);
48
49     @Getter
50     @Setter
51     private static DmaapSimProvider instance;
52
53     /**
54      * Maps a topic name to its data.
55      */
56     private final Map<String, TopicData> topic2data = new ConcurrentHashMap<>();
57
58     /**
59      * Thread used to remove idle consumers from the topics.
60      */
61     private ScheduledExecutorService timerPool;
62
63
64     /**
65      * Constructs the object.
66      *
67      * @param params parameters
68      */
69     public DmaapSimProvider(DmaapSimParameterGroup params) {
70         addAction("Topic Sweeper", () -> {
71             timerPool = makeTimerPool();
72             timerPool.scheduleWithFixedDelay(new SweeperTask(), params.getTopicSweepSec(), params.getTopicSweepSec(),
73                             TimeUnit.SECONDS);
74         }, () -> timerPool.shutdown());
75     }
76
77     /**
78      * Process a DMaaP message.
79      *
80      * @param topicName the topic name
81      * @param dmaapMessage the message to process
82      * @return a response to the message
83      */
84     @SuppressWarnings("unchecked")
85     public Response processDmaapMessagePut(final String topicName, final Object dmaapMessage) {
86         LOGGER.debug("Topic: {}, Received DMaaP message(s): {}", topicName, dmaapMessage);
87
88         List<Object> lst;
89
90         if (dmaapMessage instanceof List) {
91             lst = (List<Object>) dmaapMessage;
92         } else {
93             lst = Collections.singletonList(dmaapMessage);
94         }
95
96         TopicData topic = topic2data.get(topicName);
97
98         /*
99          * Write all messages and return the count. If the topic doesn't exist yet, then
100          * there are no subscribers to receive the messages, thus treat it as if all
101          * messages were published.
102          */
103         int nmessages = (topic != null ? topic.write(lst) : lst.size());
104
105         Map<String, Object> map = new LinkedHashMap<>();
106         map.put("serverTimeMs", 0);
107         map.put("count", nmessages);
108
109         return Response.status(Response.Status.OK).entity(map).build();
110     }
111
112     /**
113      * Wait for and return a DMaaP message.
114      *
115      * @param topicName The topic to wait on
116      * @param consumerGroup the consumer group that is waiting
117      * @param consumerId the consumer ID that is waiting
118      * @param limit the maximum number of messages to get
119      * @param timeoutMs the length of time to wait for
120      * @return the DMaaP message or
121      */
122     public Response processDmaapMessageGet(final String topicName, final String consumerGroup, final String consumerId,
123                     final int limit, final long timeoutMs) {
124
125         LOGGER.debug("Topic: {}, Request for DMaaP message: {}: {} with limit={} timeout={}", topicName, consumerGroup,
126                         consumerId, limit, timeoutMs);
127
128         try {
129             List<String> lst = topic2data.computeIfAbsent(topicName, this::makeTopicData).read(consumerGroup, limit,
130                             timeoutMs);
131
132             if (lst.isEmpty() && timeoutMs > 0) {
133                 LOGGER.debug("Topic: {}, Timed out waiting for messages: {}: {}", topicName, consumerGroup, consumerId);
134                 return Response.status(Status.REQUEST_TIMEOUT).entity(lst).build();
135
136             } else {
137                 LOGGER.debug("Topic: {}, Retrieved {} messages for: {}: {}", topicName, consumerGroup, lst.size(),
138                                 consumerId);
139                 return Response.status(Status.OK).entity(lst).build();
140             }
141
142         } catch (InterruptedException e) {
143             LOGGER.warn("Topic: {}, Request for DMaaP message interrupted: {}: {}", topicName, consumerGroup,
144                             consumerId, e);
145             Thread.currentThread().interrupt();
146             return Response.status(Status.GONE).entity(Collections.emptyList()).build();
147         }
148     }
149
150     /**
151      * Task to remove idle consumers from each topic.
152      */
153     private class SweeperTask implements Runnable {
154         @Override
155         public void run() {
156             topic2data.values().forEach(TopicData::removeIdleConsumers);
157         }
158     }
159
160     // the following methods may be overridden by junit tests
161
162     /**
163      * Makes a new timer pool.
164      *
165      * @return a new timer pool
166      */
167     protected ScheduledExecutorService makeTimerPool() {
168         return Executors.newScheduledThreadPool(1);
169     }
170
171     /**
172      * Makes a new topic.
173      *
174      * @param topicName topic name
175      * @return a new topic
176      */
177     protected TopicData makeTopicData(String topicName) {
178         return new TopicData(topicName);
179     }
180 }