2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved.
6 * ================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 * ============LICENSE_END=========================================================
21 package org.onap.policy.m2.test;
24 import java.util.concurrent.BlockingQueue;
25 import java.util.concurrent.ConcurrentHashMap;
26 import java.util.concurrent.LinkedBlockingQueue;
27 import java.util.concurrent.TimeUnit;
29 import javax.ws.rs.Consumes;
30 import javax.ws.rs.GET;
31 import javax.ws.rs.POST;
32 import javax.ws.rs.Path;
33 import javax.ws.rs.PathParam;
34 import javax.ws.rs.Produces;
35 import javax.ws.rs.QueryParam;
36 import javax.ws.rs.core.MediaType;
38 import org.slf4j.Logger;
39 import org.slf4j.LoggerFactory;
42 * This class simulates a UEB/DMAAP server.
46 public class SimDmaap {
// logger used by post(), send() and receive() below
47 private static Logger logger = LoggerFactory.getLogger(SimDmaap.class);
49 // maps topic name to 'Topic' instance
// (static, so one table is shared by every request; entries are added on
// demand by Topic.createOrGet() and no removal is visible in this class)
50 static Map<String,Topic> topicTable = new ConcurrentHashMap<>();
53 * Each instance of this class corresponds to a DMAAP or UEB topic.
59 // maps group name into group instance
// (entries are created on demand by get(String, long, int), and post()
// iterates over the values to fan each message out to every group)
60 Map<String,Group> groupTable = new ConcurrentHashMap<>();
63 * Create or get a Topic.
65 * @param name the topic name
66 * @return the associated Topic instance
68 static Topic createOrGet(String name) {
69 // look up the topic name
70 Topic topicObj = topicTable.get(name);
71 if (topicObj == null) {
72 // no entry found -- the following will create one, without
73 // the need for explicit synchronization
// putIfAbsent() only stores the new Topic when the key is still absent,
// so if two threads race here the re-read below hands both of them the
// instance that was stored first; the other freshly built Topic is
// simply dropped
74 topicTable.putIfAbsent(name, new Topic(name));
75 topicObj = topicTable.get(name);
81 * Constructor - initialize the 'topic' field.
83 * @param topic the topic name
85 private Topic(String topic) {
// private: the only construction site in this class is createOrGet(),
// which registers the new instance in 'topicTable'
90 * Handle an incoming '/events/{topic}' POST REST message.
92 * @param data the body of the REST message
93 * @return the appropriate JSON response
95 String post(String data) {
// Splits the request body into individual messages, queues an escaped
// copy of each one on every consumer group currently known to this topic,
// and answers with a small JSON object holding the message count and the
// time spent processing.
96 // start of message processing
97 long startTime = System.currentTimeMillis();
99 // current and ending indices to the 'data' field
101 int end = data.length();
103 // the number of messages retrieved so far
104 int messageCount = 0;
107 // The body of the message may consist of multiple JSON messages,
108 // each preceded by 3 integers separated by '.'. The second one
109 // is the length, in bytes (the third seems to be some kind of
110 // channel identifier).
// find the start of the next JSON object; the text between 'cur' and that
// brace is taken to be the dot-separated prefix
112 int leftBrace = data.indexOf('{', cur);
117 String[] prefix = data.substring(cur,leftBrace).split("\\.");
118 if (prefix.length == 3) {
120 // determine length of message, and advance current position
// NOTE(review): the comment above describes this length as a byte count,
// but it is applied here as a String (char) offset -- the two differ when
// the payload contains non-ASCII characters; confirm against real clients
121 int length = Integer.parseInt(prefix[1]);
122 cur = leftBrace + length;
124 // extract message, and update count -- each '\' is converted
125 // to '\\', and each double quote has a '\' character placed
126 // before it, so the overall message can be placed in double
127 // quotes, and parsed as a literal string
// (a newline character is likewise replaced by the two characters
// backslash and n; the backslash replacement has to run first so that the
// backslashes added by the later steps are not doubled again)
128 String message = data.substring(leftBrace, cur)
129 .replace("\\", "\\\\").replace("\"", "\\\"")
130 .replace("\n", "\\n");
133 // send to all listening groups
// only groups already present in 'groupTable' receive the message; groups
// are added by get(), so a group that has never polled misses it
134 for (Group group : groupTable.values()) {
135 group.messages.add(message);
// presumably reached when the length field is not numeric or points past
// the end of 'data' -- the raw length field is logged with the exception
137 } catch (Exception e) {
138 logger.info("{}: {}", prefix[1], e);
// no 3-part prefix and nothing consumed yet: treat everything from the
// first brace to the end of the body as one message
141 } else if (cur == 0) {
142 // there is only a single message -- extract it, and update count
143 String message = data.substring(leftBrace, end)
144 .replace("\\", "\\\\").replace("\"", "\\\"")
145 .replace("\n", "\\n");
148 // send to all listening groups
149 for (Group group : groupTable.values()) {
150 group.messages.add(message);
154 // don't know what this is -- toss it
159 // generate response message
// (elapsed wall-clock time of this call, in milliseconds)
160 long elapsedTime = System.currentTimeMillis() - startTime;
162 + " \"count\": " + messageCount + ",\n"
163 + " \"serverTimeMs\": " + elapsedTime + "\n"
168 * Read one or more incoming messages.
170 * @param group the 'consumerGroup' value
171 * @param timeout how long to wait for a message, in milliseconds
172 * @param limit the maximum number of messages to receive
173 * @return a JSON array, containing 0-limit messages
175 String get(String group, long timeout, int limit)
176 throws InterruptedException {
// Resolves the consumer group by name (registering it on first use) and
// delegates the actual wait-and-drain to Group.get(timeout, limit).
177 // look up the group -- create one if it doesn't exist
178 Group groupObj = groupTable.get(group);
179 if (groupObj == null) {
180 // no entry found -- the following will create one, without
181 // the need for explicit synchronization
// putIfAbsent() keeps the first Group stored for this name, so concurrent
// first-time readers all end up sharing one queue after the re-read
182 groupTable.putIfAbsent(group, new Group());
183 groupObj = groupTable.get(group);
186 // pass it on to the 'Group' instance
187 return groupObj.get(timeout, limit);
191 /* ============================================================ */
194 * Each instance of this class corresponds to a Consumer Group.
197 // messages queued for this group
// (post() appends already-escaped message strings; get() below removes
// them with poll(), so each queued message is handed out once per group)
198 private BlockingQueue<String> messages = new LinkedBlockingQueue<>();
201 * Retrieve messages sent to this group.
203 * @param timeout how long to wait for a message, in milliseconds
204 * @param limit the maximum number of messages to receive
205 * @return a JSON array, containing 0-limit messages
207 String get(long timeout, int limit) throws InterruptedException {
// wait up to 'timeout' milliseconds for the first message to arrive
208 String message = messages.poll(timeout, TimeUnit.MILLISECONDS);
209 if (message == null) {
210 // timed out without messages
214 // use 'StringBuilder' to assemble the response -- add the first message
215 StringBuilder builder = new StringBuilder();
// messages were escaped by post() before being queued, so each one can be
// written between double quotes as a JSON string element as-is
216 builder.append("[\"").append(message);
218 // add up to '<limit>-1' more messages
// (the loop starts at 1 because the first message is already in the
// builder; that first message is appended regardless of 'limit', so a
// limit of 0 or less still produces a one-element array)
219 for (int i = 1 ; i < limit ; i += 1) {
220 // fetch the next message -- don't wait if it isn't currently there
221 message = messages.poll();
222 if (message == null) {
223 // no more currently available
226 builder.append("\",\"").append(message);
// close the last string element and the array
228 builder.append("\"]");
229 return builder.toString();
233 /* ============================================================ */
236 * Process an HTTP POST to /events/{topic}.
239 @Path("/events/{topic}")
240 @Consumes("application/cambria")
241 @Produces(MediaType.APPLICATION_JSON)
242 public String send(@PathParam("topic") String topic,
// only the topic name is logged, not the payload; the Topic is looked up
// (or registered on first use) and 'data' is handed to Topic.post(), whose
// JSON result string becomes the response
244 logger.info("Send: topic={}", topic);
245 return Topic.createOrGet(topic).post(data);
249 * Process an HTTP GET to /events/{topic}/{group}/{id}.
252 @Path("/events/{topic}/{group}/{id}")
253 @Consumes(MediaType.TEXT_PLAIN)
254 @Produces(MediaType.APPLICATION_JSON)
255 public String receive(@PathParam("topic") String topic,
256 @PathParam("group") String group,
257 @PathParam("id") String id,
258 @QueryParam("timeout") long timeout,
259 @QueryParam("limit") int limit)
260 throws InterruptedException {
// NOTE(review): 'timeout' and 'limit' are primitives without @DefaultValue,
// so per JAX-RS an absent query parameter should arrive as 0 -- i.e. no
// waiting, and at most the single first message; confirm with the runtime
// in use
262 logger.info("Receive: topic={}, group={}, id={}, timeout={}, limit={}",
263 topic, group, id, timeout, limit);
// 'id' is only logged above; the lookup uses the topic and group alone, so
// all consumer ids within a group read from the same queue
264 return Topic.createOrGet(topic).get(group, timeout, limit);