policy/drools-apps jdk11 upgrades
[policy/drools-applications.git] / controlloop / m2 / test / src / test / java / org / onap / policy / m2 / test / SimDmaap.java
1 /*-
2  * ============LICENSE_START=======================================================
3  * m2/test
4  * ================================================================================
5  * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved.
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=========================================================
19  */
20
21 package org.onap.policy.m2.test;
22
23 import java.util.Map;
24 import java.util.concurrent.BlockingQueue;
25 import java.util.concurrent.ConcurrentHashMap;
26 import java.util.concurrent.LinkedBlockingQueue;
27 import java.util.concurrent.TimeUnit;
28
29 import javax.ws.rs.Consumes;
30 import javax.ws.rs.GET;
31 import javax.ws.rs.POST;
32 import javax.ws.rs.Path;
33 import javax.ws.rs.PathParam;
34 import javax.ws.rs.Produces;
35 import javax.ws.rs.QueryParam;
36 import javax.ws.rs.core.MediaType;
37
38 import org.slf4j.Logger;
39 import org.slf4j.LoggerFactory;
40
41 /**
42  * This class simulates a UEB/DMAAP server.
43  */
44
45 @Path("/")
46 public class SimDmaap {
47     private static Logger logger = LoggerFactory.getLogger(SimDmaap.class);
48
49     // maps topic name to 'Topic' instance
50     static Map<String,Topic> topicTable = new ConcurrentHashMap<>();
51
52     /**
53      * Each instance of this class corresponds to a DMAAP or UEB topic.
54      */
55     static class Topic {
56         // topic name
57         String topic;
58
59         // maps group name into group instance
60         Map<String,Group> groupTable = new ConcurrentHashMap<>();
61
62         /**
63          * Create or get a Topic.
64          *
65          * @param name the topic name
66          * @return the associated Topic instance
67          */
68         static Topic createOrGet(String name) {
69             // look up the topic name
70             Topic topicObj = topicTable.get(name);
71             if (topicObj == null) {
72                 // no entry found -- the following will create one, without
73                 // the need for explicit synchronization
74                 topicTable.putIfAbsent(name, new Topic(name));
75                 topicObj = topicTable.get(name);
76             }
77             return topicObj;
78         }
79
80         /**
81          * Constructor - initialize the 'topic' field.
82          *
83          * @param topic the topic name
84          */
85         private Topic(String topic) {
86             this.topic = topic;
87         }
88
89         /**
90          * Handle an incoming '/events/{topic}' POST REST message.
91          *
92          * @param the body of the REST message
93          * @return the appropriate JSON response
94          */
95         String post(String data) {
96             // start of message processing
97             long startTime = System.currentTimeMillis();
98
99             // current and ending indices to the 'data' field
100             int cur = 0;
101             int end = data.length();
102
103             // the number of messages retrieved so far
104             int messageCount = 0;
105
106             while (cur < end) {
107                 // The body of the message may consist of multiple JSON messages,
108                 // each preceded by 3 integers separated by '.'. The second one
109                 // is the length, in bytes (the third seems to be some kind of
110                 // channel identifier).
111
112                 int leftBrace = data.indexOf('{', cur);
113                 if (leftBrace < 0) {
114                     // no more messages
115                     break;
116                 }
117                 String[] prefix = data.substring(cur,leftBrace).split("\\.");
118                 if (prefix.length == 3) {
119                     try {
120                         // determine length of message, and advance current position
121                         int length = Integer.parseInt(prefix[1]);
122                         cur = leftBrace + length;
123
124                         // extract message, and update count -- each '\' is converted
125                         // to '\\', and each double quote has a '\' character placed
126                         // before it, so the overall message can be placed in double
127                         // quotes, and parsed as a literal string
128                         String message = data.substring(leftBrace, cur)
129                                          .replace("\\", "\\\\").replace("\"", "\\\"")
130                                          .replace("\n", "\\n");
131                         messageCount += 1;
132
133                         // send to all listening groups
134                         for (Group group : groupTable.values()) {
135                             group.messages.add(message);
136                         }
137                     } catch (Exception e) {
138                         logger.info("{}: {}", prefix[1], e);
139                         break;
140                     }
141                 } else if (cur == 0) {
142                     // there is only a single message -- extract it, and update count
143                     String message = data.substring(leftBrace, end)
144                                      .replace("\\", "\\\\").replace("\"", "\\\"")
145                                      .replace("\n", "\\n");
146                     messageCount += 1;
147
148                     // send to all listening grops
149                     for (Group group : groupTable.values()) {
150                         group.messages.add(message);
151                     }
152                     break;
153                 } else {
154                     // don't know what this is -- toss it
155                     break;
156                 }
157             }
158
159             // generate response message
160             long elapsedTime = System.currentTimeMillis() - startTime;
161             return "{\n"
162                    + "    \"count\": " + messageCount + ",\n"
163                    + "    \"serverTimeMs\": " + elapsedTime + "\n"
164                    + "}";
165         }
166
167         /**
168          * read one or more incoming messages.
169          *
170          * @param group the 'consumerGroup' value
171          * @param timeout how long to wait for a message, in milliseconds
172          * @param limit the maximum number of messages to receive
173          * @return a JSON array, containing 0-limit messages
174          */
175         String get(String group, long timeout, int limit)
176             throws InterruptedException {
177             // look up the group -- create one if it doesn't exist
178             Group groupObj = groupTable.get(group);
179             if (groupObj == null) {
180                 // no entry found -- the following will create one, without
181                 // the need for explicit synchronization
182                 groupTable.putIfAbsent(group, new Group());
183                 groupObj = groupTable.get(group);
184             }
185
186             // pass it on to the 'Group' instance
187             return groupObj.get(timeout, limit);
188         }
189     }
190
191     /* ============================================================ */
192
193     /**
194      * Each instance of this class corresponds to a Consumer Group.
195      */
196     static class Group {
197         // messages queued for this group
198         private BlockingQueue<String> messages = new LinkedBlockingQueue<>();
199
200         /**
201          * Retrieve messages sent to this group.
202          *
203          * @param timeout how long to wait for a message, in milliseconds
204          * @param limit the maximum number of messages to receive
205          * @return a JSON array, containing 0-limit messages
206          */
207         String get(long timeout, int limit) throws InterruptedException {
208             String message = messages.poll(timeout, TimeUnit.MILLISECONDS);
209             if (message == null) {
210                 // timed out without messages
211                 return "[]";
212             }
213
214             // use 'StringBuilder' to assemble the response -- add the first message
215             StringBuilder builder = new StringBuilder();
216             builder.append("[\"").append(message);
217
218             // add up to '<limit>-1' more messages
219             for (int i = 1 ; i < limit ; i += 1) {
220                 // fetch the next message -- don't wait if it isn't currently there
221                 message = messages.poll();
222                 if (message == null) {
223                     // no more currently available
224                     break;
225                 }
226                 builder.append("\",\"").append(message);
227             }
228             builder.append("\"]");
229             return builder.toString();
230         }
231     }
232
233     /* ============================================================ */
234
235     /**
236      * Process an HTTP POST to /events/{topic}.
237      */
238     @POST
239     @Path("/events/{topic}")
240     @Consumes("application/cambria")
241     @Produces(MediaType.APPLICATION_JSON)
242     public String send(@PathParam("topic") String topic,
243                        String data) {
244         logger.info("Send: topic={}", topic);
245         return Topic.createOrGet(topic).post(data);
246     }
247
248     /**
249      * Process an HTTP GET to /events/{topic}/{group}/{id}.
250      */
251     @GET
252     @Path("/events/{topic}/{group}/{id}")
253     @Consumes(MediaType.TEXT_PLAIN)
254     @Produces(MediaType.APPLICATION_JSON)
255     public String receive(@PathParam("topic") String topic,
256                           @PathParam("group") String group,
257                           @PathParam("id") String id,
258                           @QueryParam("timeout") long timeout,
259                           @QueryParam("limit") int limit)
260         throws InterruptedException {
261
262         logger.info("Receive: topic={}, group={}, id={}, timeout={}, limit={}",
263                     topic, group, id, timeout, limit);
264         return Topic.createOrGet(topic).get(group, timeout, limit);
265     }
266 }