2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved.
6 * ================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 * ============LICENSE_END=========================================================
21 package org.onap.policy.m2.test;
23 import static org.junit.Assert.assertTrue;
25 import com.att.nsa.cambria.client.CambriaBatchingPublisher;
26 import com.att.nsa.cambria.client.CambriaClientBuilders;
27 import com.att.nsa.cambria.client.CambriaClientBuilders.ConsumerBuilder;
28 import com.att.nsa.cambria.client.CambriaClientBuilders.PublisherBuilder;
29 import com.att.nsa.cambria.client.CambriaConsumer;
31 import com.google.gson.Gson;
32 import com.google.gson.GsonBuilder;
33 import com.google.gson.JsonElement;
34 import com.google.gson.JsonObject;
35 import com.google.gson.JsonPrimitive;
38 import java.io.FileInputStream;
39 import java.io.FileNotFoundException;
40 import java.io.FileOutputStream;
41 import java.io.IOException;
42 import java.io.InputStream;
43 import java.util.LinkedList;
44 import java.util.List;
45 import java.util.Properties;
46 import java.util.UUID;
47 import java.util.concurrent.LinkedBlockingQueue;
48 import java.util.concurrent.TimeUnit;
50 import org.eclipse.jetty.server.Server;
51 import org.eclipse.jetty.server.ServerConnector;
52 import org.eclipse.jetty.servlet.ServletContextHandler;
53 import org.eclipse.jetty.servlet.ServletHolder;
54 import org.onap.policy.common.utils.coder.StandardCoder;
55 import org.onap.policy.drools.system.PolicyEngineConstants;
56 import org.slf4j.Logger;
57 import org.slf4j.LoggerFactory;
60 private static Logger logger = LoggerFactory.getLogger(Util.class);
62 // used for JSON <-> String conversion
63 private static StandardCoder coder = new StandardCoder();
65 // used for pretty-printing: gson.toJson(JsonObject obj)
66 private static Gson gson =
67 new GsonBuilder().setPrettyPrinting().serializeNulls().create();
69 // contains the currently running set of servers
70 private static List<Server> runningServers = new LinkedList<>();
73 * Read from an 'InputStream' until EOF or until it is closed. This method
74 * may block, depending on the type of 'InputStream'.
76 * @param input This is the input stream
77 * @return A 'String' containing the contents of the input stream
79 public static String inputStreamToString(InputStream input) {
80 StringBuilder sb = new StringBuilder();
81 byte[] buffer = new byte[8192];
85 while ((length = input.read(buffer)) > 0) {
86 sb.append(new String(buffer, 0, length));
88 } catch (IOException e) {
89 // return what we have so far
95 * Read in a file, converting the contents to a string.
97 * @param file the input file
98 * @return a String containing the contents of the file
100 public static String fileToString(File file)
101 throws IOException, FileNotFoundException {
102 try (FileInputStream fis = new FileInputStream(file)) {
103 String string = inputStreamToString(fis);
109 * Create a file containing the contents of the specified string.
111 * @param string the input string
112 * @param suffix the suffix to pass to 'createTempFile
113 * @return a File, whose contents contain the string
115 public static File stringToFile(String string, String suffix)
117 File file = File.createTempFile("templates-util", suffix);
120 try (FileOutputStream fos = new FileOutputStream(file)) {
121 fos.write(string.getBytes());
127 * Create a file containing the contents of the specified string.
129 * @param string the input string
130 * @return a File, whose contents contain the string
132 public static File stringToFile(String string)
134 return stringToFile(string, "");
138 * This method converts a YAML string into one that can be embedded into
141 * @param yaml the input string, which is typically read from a file
142 * @return the converted string
144 public static String convertYaml(String yaml) {
145 yaml = yaml.replace("\n", "%0A");
146 yaml = yaml.replace("\r", "");
147 yaml = yaml.replace(":", "%3A");
148 yaml = yaml.replace(' ', '+');
153 * This is a convenience method which reads a file into a string, and
154 * then does a set of string replacements on it. The real purpose is to
155 * make it easy to do '${parameter}' replacements in files, as part of
156 * building a Drools artifact.
158 * @param fileName this is the input file name
159 * @param args these parameters come in pairs:
160 * 'input-string' and 'output-string'.
161 * @return a String containing the contents of the file, with the parameters
164 public static String openAndReplace(String fileName, String... args)
165 throws IOException, FileNotFoundException {
166 String text = fileToString(new File(fileName));
167 for (int i = 0 ; i < args.length ; i += 2) {
168 text = text.replace(args[i], args[i + 1]);
174 * Convert an Object to a JsonElement.
176 * @param object the object to convert
177 * @return a JsonElement that corresponds to 'object'
179 public static JsonElement toJsonElement(Object object) {
180 if (object == null || object instanceof JsonElement) {
181 return (JsonElement) object;
183 if (object instanceof Number) {
184 return new JsonPrimitive((Number) object);
186 if (object instanceof Boolean) {
187 return new JsonPrimitive((Boolean) object);
189 if (object instanceof Character) {
190 return new JsonPrimitive((Character) object);
192 return new JsonPrimitive(object.toString());
196 * This is a convenience method to build a 'JsonObject', and populate
197 * it with a set of keyword/value pairs.
199 * @param data this parameter comes in pairs: 'keyword', and 'value'
200 * @return the populated JsonObject
202 public static JsonObject json(Object... data) {
203 JsonObject obj = new JsonObject();
204 for (int i = 0 ; i < data.length ; i += 2) {
205 obj.add(data[i].toString(), toJsonElement(data[i + 1]));
211 * Convert a JsonElement to a String (pretty-printing).
213 * @param jsonElement the object to convert
214 * @return a pretty-printed string
216 public static String prettyPrint(JsonElement jsonElement) {
217 return gson.toJson(jsonElement);
221 * This method is used to check whether a JSON message has a set of fields
222 * populated with the values expected.
224 * @param subset this is a 'JsonObject', which contains field names and
225 * values (the values are interpreted as regular expressions). The values
226 * may also be 'JsonObject' instances, in which case they are compared
228 * @param whole ordinarily, this will be a 'JsonObject', and will contain
229 * a superset of the fields in 'subset'. If not, the 'assert' fails.
231 public static void assertSubset(JsonObject subset, Object whole) {
232 StringBuilder sb = new StringBuilder();
233 assertSubsetAssist(sb, "", subset, toJsonElement(whole));
234 String sbString = sb.toString();
235 assertTrue(sbString, sbString.isEmpty());
239 * This is similar to 'assertSubset', but just returns 'true' if the
242 * @param subset this is a 'JsonObject', which contains field names and
243 * values (the values are interpreted as regular expressions). The values
244 * may also be 'JsonObject' instances, in which case they are compared
246 * @param whole ordinarily, this will be a 'JsonObject', and will contain
247 * a superset of the fields in 'subset'. If not, the 'assert' fails.
248 * @return 'true' if 'whole' is a superset of 'subset'
250 public static boolean testSubset(JsonObject subset, Object whole) {
251 StringBuilder sb = new StringBuilder();
252 assertSubsetAssist(sb, "", subset, toJsonElement(whole));
253 return sb.length() == 0;
257 * This is an internal support method for 'assertSubset' and 'testSubset',
258 * and handles the recursive comparison.
260 * @param sb a 'StringBuilder', which is appended to when there are
262 * @param prefix the field name being compared (the empty string indicates
263 * the top-level field).
264 * @param subset the 'JsonObject' being compared at this level
265 * @param argWhole the value being tested -- if it is not a 'JsonObject',
266 * the comparison fails
268 private static void assertSubsetAssist(StringBuilder sb, String prefix, JsonObject subset, JsonElement argWhole) {
269 if (!(argWhole.isJsonObject())) {
270 sb.append(prefix).append(" is not a JsonObject\n");
273 JsonObject whole = argWhole.getAsJsonObject();
274 for (String key : subset.keySet()) {
275 String fullKey = (prefix.isEmpty() ? key : prefix + "." + key);
276 JsonElement value = subset.get(key);
277 JsonElement value2 = whole.get(key);
278 if (value.isJsonObject()) {
279 assertSubsetAssist(sb, fullKey, value.getAsJsonObject(), value2);
280 } else if (!value.equals(value2)
281 && (value2 == null || !value2.toString().matches(value.toString()))) {
284 .append(String.valueOf(value2))
285 .append(", expected ")
286 .append(String.valueOf(value))
293 * Do whatever needs to be done to start the server. I don't know exactly
294 * what abstractions the various pieces provide, but the following code
295 * ties the pieces together, and starts up the server.
297 * @param name used as the 'ServerConnector' name, and also used to generate
298 * a name for the server thread
299 * @param host the host IP address to bind to
300 * @param port the port to bind to
301 * @param clazz the class containing the provider methods
303 public static void startRestServer(String name, String host, int port, Class<?> clazz) {
304 ServletContextHandler context =
305 new ServletContextHandler(ServletContextHandler.SESSIONS);
306 context.setContextPath("/");
308 final Server jettyServer = new Server();
310 ServerConnector connector = new ServerConnector(jettyServer);
311 connector.setName(name);
312 connector.setReuseAddress(true);
313 connector.setPort(port);
314 connector.setHost(host);
316 jettyServer.addConnector(connector);
317 jettyServer.setHandler(context);
319 ServletHolder holder =
320 context.addServlet(org.glassfish.jersey.servlet.ServletContainer.class.getName(), "/*");
321 holder.setInitParameter(
322 "jersey.config.server.provider.classnames",
323 "org.onap.policy.common.gson.GsonMessageBodyHandler"
324 + "," + clazz.getName());
326 synchronized (runningServers) {
327 runningServers.add(jettyServer);
334 logger.info("{}: back from jettyServer.join()", name);
335 } catch (Exception e) {
336 logger.info(name + ": Exception starting jettyServer", e);
338 }, "REST Server: " + name).start();
341 private static boolean initNeeded = true;
344 * This method starts services shared by all of the tests. The services are
345 * started the first time it is invoked -- subsequent invocations have no
348 public static void commonInit() {
352 // start DMAAP Simulator
353 startRestServer("simdmaap", "127.0.71.250", 3904, SimDmaap.class);
355 // start Guard Simulator
356 startRestServer("simguard", "127.0.71.201", 8443, SimGuard.class);
358 // start PolicyEngine
359 PolicyEngineConstants.getManager().configure(new Properties());
360 PolicyEngineConstants.getManager().start();
365 * This method shuts down all of the servers that were started.
367 public static void commonShutdown() {
368 synchronized (runningServers) {
369 for (Server server : runningServers) {
372 } catch (Exception e) {
373 logger.info("Exception shutting down server: {}", e);
376 runningServers.clear();
381 /* ============================================================ */
384 * This class is used to create an outgoing (publisher) topic message
385 * channel. 'topic' is the only parameter -- everything else is hard-wired.
387 public static class Output {
388 CambriaBatchingPublisher publisher;
392 * Constructor - create the outgoing topic message channel.
394 * @param topic a DMAAP or UEB topic name
396 public Output(String topic) throws Exception {
398 PublisherBuilder builder =
399 new CambriaClientBuilders.PublisherBuilder();
401 .usingHosts("127.0.71.250")
403 .withSocketTimeout(5000);
404 publisher = builder.build();
408 * Send a JSON message out this channel.
410 * @param msg a 'JsonObject' containing the message to be sent
412 public void send(JsonObject msg) throws Exception {
413 logger.info("Sending message, topic = {}\n{}",
414 topic, gson.toJson(msg));
415 publisher.send("123", msg.toString());
421 public void close() {
426 /* ============================================================ */
429 * This class is used to create an incoming (consumer) topic message channel,
430 * as well as a Thread that reads from it. Incoming messages are placed in
431 * a 'LinkedBlockingQueue', which may be polled for messages.
433 public static class Input extends Thread {
434 CambriaConsumer consumer;
436 LinkedBlockingQueue<JsonObject> queue = new LinkedBlockingQueue<>();
437 volatile boolean running = true;
440 * Constructor - create the incoming topic message channel.
442 * @param topic a DMAAP or UEB topic name
444 public Input(String topic) throws Exception {
448 ConsumerBuilder builder = new CambriaClientBuilders.ConsumerBuilder();
450 .knownAs(UUID.randomUUID().toString(), "1")
451 .usingHosts("127.0.71.250")
454 .receivingAtMost(100)
455 .withSocketTimeout(20000);
456 consumer = builder.build();
461 * This is the Thread main loop. It fetches messages, and queues them.
467 for (String message : consumer.fetch()) {
468 // a message was received -- parse it as JSON
469 JsonObject msg = coder.decode(message, JsonObject.class);
471 // construct a message to print, and print it
472 logger.info("Received message, topic = {}\n{}",
473 topic, gson.toJson(msg));
478 } catch (Exception e) {
485 * Return the first message in the queue. If none are available, wait up
486 * to 30 seconds for one to appear.
488 * @return a 'JsonObject' if a message has been received, and 'null' if not
490 public JsonObject poll() throws InterruptedException {
491 return queue.poll(30, TimeUnit.SECONDS);
495 * Stop the thread, and close the channel.
497 public void close() {