2119a6521bddc876f4ebf77a6226d3c1e5e79e90
[policy/models.git] / models-sim / models-sim-dmaap / src / main / java / org / onap / policy / models / sim / dmaap / provider / DmaapSimProvider.java
1 /*-
2  * ============LICENSE_START=======================================================
3  *  Copyright (C) 2019 Nordix Foundation.
4  *  Modifications Copyright (C) 2019 AT&T Intellectual Property. All rights reserved.
5  *  Modifications Copyright (C) 2021 Bell Canada. All rights reserved.
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  *
19  * SPDX-License-Identifier: Apache-2.0
20  * ============LICENSE_END=========================================================
21  */
22
23 package org.onap.policy.models.sim.dmaap.provider;
24
25 import java.util.Collections;
26 import java.util.LinkedHashMap;
27 import java.util.List;
28 import java.util.Map;
29 import java.util.concurrent.ConcurrentHashMap;
30 import java.util.concurrent.Executors;
31 import java.util.concurrent.ScheduledExecutorService;
32 import java.util.concurrent.TimeUnit;
33 import javax.ws.rs.core.Response;
34 import javax.ws.rs.core.Response.Status;
35 import lombok.Getter;
36 import lombok.Setter;
37 import org.onap.policy.common.utils.services.ServiceManagerContainer;
38 import org.onap.policy.models.sim.dmaap.parameters.DmaapSimParameterGroup;
39 import org.slf4j.Logger;
40 import org.slf4j.LoggerFactory;
41
42 /**
43  * Provider to simulate DMaaP.
44  *
45  * @author Liam Fallon (liam.fallon@est.tech)
46  */
47 public class DmaapSimProvider extends ServiceManagerContainer {
48     private static final Logger LOGGER = LoggerFactory.getLogger(DmaapSimProvider.class);
49
50     @Getter
51     @Setter
52     private static DmaapSimProvider instance;
53
54     /**
55      * Maps a topic name to its data.
56      */
57     private final Map<String, TopicData> topic2data = new ConcurrentHashMap<>();
58
59     /**
60      * Thread used to remove idle consumers from the topics.
61      */
62     private ScheduledExecutorService timerPool;
63
64
65     /**
66      * Constructs the object.
67      *
68      * @param params parameters
69      */
70     public DmaapSimProvider(DmaapSimParameterGroup params) {
71         addAction("Topic Sweeper", () -> {
72             timerPool = makeTimerPool();
73             timerPool.scheduleWithFixedDelay(new SweeperTask(), params.getTopicSweepSec(), params.getTopicSweepSec(),
74                             TimeUnit.SECONDS);
75         }, () -> timerPool.shutdown());
76     }
77
78     /**
79      * Process a DMaaP message.
80      *
81      * @param topicName the topic name
82      * @param dmaapMessage the message to process
83      * @return a response to the message
84      */
85     @SuppressWarnings("unchecked")
86     public Response processDmaapMessagePut(final String topicName, final Object dmaapMessage) {
87         LOGGER.debug("Topic: {}, Received DMaaP message(s): {}", topicName, dmaapMessage);
88
89         List<Object> lst;
90
91         if (dmaapMessage instanceof List) {
92             lst = (List<Object>) dmaapMessage;
93         } else {
94             lst = Collections.singletonList(dmaapMessage);
95         }
96
97         TopicData topic = topic2data.get(topicName);
98
99         /*
100          * Write all messages and return the count. If the topic doesn't exist yet, then
101          * there are no subscribers to receive the messages, thus treat it as if all
102          * messages were published.
103          */
104         int nmessages = (topic != null ? topic.write(lst) : lst.size());
105
106         Map<String, Object> map = new LinkedHashMap<>();
107         map.put("serverTimeMs", 0);
108         map.put("count", nmessages);
109
110         return Response.status(Response.Status.OK).entity(map).build();
111     }
112
113     /**
114      * Wait for and return a DMaaP message.
115      *
116      * @param topicName The topic to wait on
117      * @param consumerGroup the consumer group that is waiting
118      * @param consumerId the consumer ID that is waiting
119      * @param limit the maximum number of messages to get
120      * @param timeoutMs the length of time to wait for
121      * @return the DMaaP message or
122      */
123     public Response processDmaapMessageGet(final String topicName, final String consumerGroup, final String consumerId,
124                     final int limit, final long timeoutMs) {
125
126         LOGGER.debug("Topic: {}, Request for DMaaP message: {}: {} with limit={} timeout={}", topicName, consumerGroup,
127                         consumerId, limit, timeoutMs);
128
129         try {
130             List<String> lst = topic2data.computeIfAbsent(topicName, this::makeTopicData).read(consumerGroup, limit,
131                             timeoutMs);
132
133             LOGGER.debug("Topic: {}, Retrieved {} messages for: {}: {}", topicName, lst.size(), consumerGroup,
134                             consumerId);
135             return Response.status(Status.OK).entity(lst).build();
136
137         } catch (InterruptedException e) {
138             LOGGER.warn("Topic: {}, Request for DMaaP message interrupted: {}: {}", topicName, consumerGroup,
139                             consumerId, e);
140             Thread.currentThread().interrupt();
141             return Response.status(Status.GONE).entity(Collections.emptyList()).build();
142         }
143     }
144
145     /**
146      * Returns the list of default topics.
147      *
148      * @return the topic list
149      */
150     public Response processDmaapTopicsGet() {
151
152         LOGGER.debug("Request for listing DMaaP topics");
153         var response = new DmaapGetTopicResponse();
154         response.setTopics(List.of("POLICY-PDP-PAP", "POLICY-NOTIFICATION", "unauthenticated.DCAE_CL_OUTPUT",
155                         "POLICY-CL-MGT"));
156         return Response.status(Status.OK).entity(response).build();
157     }
158
159     /**
160      * Task to remove idle consumers from each topic.
161      */
162     private class SweeperTask implements Runnable {
163         @Override
164         public void run() {
165             topic2data.values().forEach(TopicData::removeIdleConsumers);
166         }
167     }
168
169     // the following methods may be overridden by junit tests
170
171     /**
172      * Makes a new timer pool.
173      *
174      * @return a new timer pool
175      */
176     protected ScheduledExecutorService makeTimerPool() {
177         return Executors.newScheduledThreadPool(1);
178     }
179
180     /**
181      * Makes a new topic.
182      *
183      * @param topicName topic name
184      * @return a new topic
185      */
186     protected TopicData makeTopicData(String topicName) {
187         return new TopicData(topicName);
188     }
189 }