Add a DMaaP simulator 29/26329/2
authorCharles Cole <cc847m@att.com>
Thu, 14 Dec 2017 14:59:26 +0000 (08:59 -0600)
committerCharles Cole <cc847m@att.com>
Thu, 14 Dec 2017 20:58:16 +0000 (14:58 -0600)
Added a DMaaP simulator for testing applications.  One current
limitation is that the simulator does not support multiple "subscribers"
on the same topic; if someone gets a message, that message cannot be
subsequently retrieved by anyone else.  The simulator has also not been
tested for concurrent getting and posting.

Also added a way to set the response code DMaaP would return for a get
to test that policy can gracefully handle errors.

It may need some work to become truely its own "feature"

Issue-ID: POLICY-489
Change-Id: I524981bdf5e4e825f13e6197dda11d9498e4f4bf
Signed-off-by: Charles Cole <cc847m@att.com>
feature-simulators/pom.xml [new file with mode: 0644]
feature-simulators/src/main/java/org/onap/policy/drools/simulators/DMaaPSimulatorJaxRs.java [new file with mode: 0644]
feature-simulators/src/test/java/org/onap/policy/drools/simulators/DMaaPSimulatorTest.java [new file with mode: 0644]
pom.xml

diff --git a/feature-simulators/pom.xml b/feature-simulators/pom.xml
new file mode 100644 (file)
index 0000000..767739b
--- /dev/null
@@ -0,0 +1,53 @@
+<!--
+  ============LICENSE_START=======================================================
+  ONAP Policy Engine - Drools PDP
+  ================================================================================
+  Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
+  ================================================================================
+  Licensed under the Apache License, Version 2.0 (the "License");
+  you may not use this file except in compliance with the License.
+  You may obtain a copy of the License at
+  
+       http://www.apache.org/licenses/LICENSE-2.0
+  
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+  ============LICENSE_END=========================================================
+  -->
+  
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+
+       <modelVersion>4.0.0</modelVersion>
+  
+       <parent>
+               <groupId>org.onap.policy.drools-pdp</groupId>
+               <artifactId>drools-pdp</artifactId>
+               <version>1.2.0-SNAPSHOT</version>
+       </parent>
+       
+       <artifactId>feature-simulators</artifactId>
+
+  <name>feature-simulators</name>
+
+  <properties>
+    <maven.compiler.source>1.8</maven.compiler.source>
+    <maven.compiler.target>1.8</maven.compiler.target>
+  </properties>
+  
+  <dependencies>
+    <dependency>
+      <groupId>junit</groupId>
+      <artifactId>junit</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.onap.policy.drools-pdp</groupId>
+      <artifactId>policy-endpoints</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+  </dependencies>
+</project>
diff --git a/feature-simulators/src/main/java/org/onap/policy/drools/simulators/DMaaPSimulatorJaxRs.java b/feature-simulators/src/main/java/org/onap/policy/drools/simulators/DMaaPSimulatorJaxRs.java
new file mode 100644 (file)
index 0000000..bdabc6e
--- /dev/null
@@ -0,0 +1,123 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * feature-simulators
+ * ================================================================================
+ * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.drools.simulators;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.BlockingQueue;
+
+import javax.servlet.http.HttpServletResponse;
+import javax.ws.rs.Consumes;
+import javax.ws.rs.DefaultValue;
+import javax.ws.rs.GET;
+import javax.ws.rs.POST;
+import javax.ws.rs.Path;
+import javax.ws.rs.PathParam;
+import javax.ws.rs.QueryParam;
+import javax.ws.rs.core.Context;
+import javax.ws.rs.core.MediaType;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@Path("/events")
+public class DMaaPSimulatorJaxRs {
+       
+       private static final Map<String, BlockingQueue<String>> queues = new ConcurrentHashMap<>();
+       private static final Logger logger = LoggerFactory.getLogger(DMaaPSimulatorJaxRs.class);
+       private static int responseCode = 200;
+       
+       @GET
+       @Path("/{topicName}/{consumeGroup}/{consumerId}")
+       public String subscribe(@DefaultValue("0") @QueryParam("timeout") int timeout, @PathParam("topicName") String topicName,
+               @Context final HttpServletResponse httpResponse) {
+           int currentRespCode = responseCode;
+        httpResponse.setStatus(currentRespCode);
+        try {
+            httpResponse.flushBuffer();
+        } catch (IOException e) {
+            final Logger logger = LoggerFactory.getLogger(DMaaPSimulatorJaxRs.class);
+            logger.error("flushBuffer threw: ", e);
+            return "Got an error";
+        }
+        
+           if (currentRespCode < 200 || currentRespCode >= 300)
+           {
+               return "You got response code: " + currentRespCode;
+           }
+               if (queues.containsKey(topicName)) {
+                       BlockingQueue<String> queue = queues.get(topicName);
+                       String response = "No Data";
+                       try {
+                               response = queue.poll(timeout, TimeUnit.MILLISECONDS);
+                       } catch (InterruptedException e) {
+                               logger.debug("error in DMaaP simulator", e);
+                       }
+                       if (response == null) {
+                           response = "No Data";
+                       }
+                       return response;
+               }
+               else if (timeout > 0) {
+                       try {
+                               Thread.sleep(timeout);
+                               if (queues.containsKey(topicName)) {
+                                       BlockingQueue<String> queue = queues.get(topicName);
+                                       String response = queue.poll();
+                                       if (response == null) {
+                                           response = "No Data";
+                                       }
+                                       return response;
+                               }
+                       } catch (InterruptedException e) {
+                               logger.debug("error in DMaaP simulator", e);
+                       }
+               }
+               return "No topic";
+       }
+       
+       @POST
+       @Path("/{topicName}")
+       @Consumes(MediaType.TEXT_PLAIN)
+       public String publish(@PathParam("topicName") String topicName, String body) { 
+               if (queues.containsKey(topicName)) {
+                       BlockingQueue<String> queue = queues.get(topicName);
+                       queue.offer(body);
+               }
+               else {
+                       BlockingQueue<String> queue = new LinkedBlockingQueue<>();
+                       queue.offer(body);
+                       queues.put(topicName, queue);
+               }
+               
+               return "";
+       }
+       
+       @POST
+       @Path("/setStatus")
+       public String setStatus(@QueryParam("statusCode") int statusCode) {
+           responseCode = statusCode;
+           return "Status code set";
+       }
+}
diff --git a/feature-simulators/src/test/java/org/onap/policy/drools/simulators/DMaaPSimulatorTest.java b/feature-simulators/src/test/java/org/onap/policy/drools/simulators/DMaaPSimulatorTest.java
new file mode 100644 (file)
index 0000000..415c520
--- /dev/null
@@ -0,0 +1,364 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * feature-simulators
+ * ================================================================================
+ * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.drools.simulators;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.fail;
+
+import java.io.BufferedReader;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.net.HttpURLConnection;
+import java.net.URL;
+import java.net.URLConnection;
+import java.nio.charset.StandardCharsets;
+
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.onap.policy.drools.http.server.HttpServletServer;
+import org.onap.policy.drools.utils.LoggerUtil;
+import org.onap.policy.drools.utils.NetworkUtil;
+
+public class DMaaPSimulatorTest {
+
+       private static final int DMAAPSIM_SERVER_PORT = 6670;
+    @BeforeClass
+    public static void setUpSimulator() {
+        LoggerUtil.setLevel("ROOT", "INFO");
+        LoggerUtil.setLevel("org.eclipse.jetty", "WARN");
+        try {
+               final HttpServletServer testServer = HttpServletServer.factory.build("dmaapSim",
+                               "localhost", DMAAPSIM_SERVER_PORT, "/", false, true);
+               testServer.addServletClass("/*", DMaaPSimulatorJaxRs.class.getName());
+               testServer.waitedStart(5000);
+               if (!NetworkUtil.isTcpPortOpen("localhost", testServer.getPort(), 5, 10000L))
+                       throw new IllegalStateException("cannot connect to port " + testServer.getPort());
+        } catch (final Exception e) {
+               fail(e.getMessage());
+        }
+    }
+
+    @AfterClass
+    public static void tearDownSimulator() {
+        HttpServletServer.factory.destroy();
+    }
+    
+    @Test
+    public void testGetNoData() {
+        int timeout = 1000;
+        Pair <Integer, String> response = dmaapGet("myTopicNoData", timeout);
+        assertNotNull(response);
+        assertNotNull(response.a);
+        assertEquals("No topic", response.b);
+    }
+    
+    @Test
+    public void testSinglePost() {
+        String myTopic = "myTopicSinglePost";
+        String testData = "This is some test data";
+        Pair<Integer, String> response = dmaapPost(myTopic, testData);
+        assertNotNull(response);
+        assertNotNull(response.a);
+        assertNotNull(response.b);
+        
+        response = dmaapGet(myTopic, 1000);
+        assertNotNull(response);
+        assertNotNull(response.a);
+        assertEquals(testData, response.b);
+    }
+    
+    @Test
+    public void testOneTopicMultiPost() {
+        String[] data = {"data point 1", "data point 2", "something random"};
+        String myTopic = "myTopicMultiPost";
+        Pair<Integer, String> response = dmaapPost(myTopic, data[0]);
+        assertNotNull(response);
+        assertNotNull(response.a);
+        assertNotNull(response.b);
+        
+        response = dmaapPost(myTopic, data[1]);
+        assertNotNull(response);
+        assertNotNull(response.a);
+        assertNotNull(response.b);
+        
+        response = dmaapPost(myTopic, data[2]);
+        assertNotNull(response);
+        assertNotNull(response.a);
+        assertNotNull(response.b);
+        
+        response = dmaapGet(myTopic, 1000);
+        assertNotNull(response);
+        assertNotNull(response.a);
+        assertEquals(data[0], response.b);
+        
+        response = dmaapGet(myTopic, 1000);
+        assertNotNull(response);
+        assertNotNull(response.a);
+        assertEquals(data[1], response.b);
+        
+        response = dmaapGet(myTopic, 1000);
+        assertNotNull(response);
+        assertNotNull(response.a);
+        assertEquals(data[2], response.b);
+    }
+    
+    @Test
+    public void testMultiTopic() {
+        String[][] data = {{"Topic one message one", "Topic one message two"}, {"Topic two message one", "Topic two message two"}};
+        String[] topics = {"topic1", "topic2"};
+        
+        Pair<Integer, String> response = dmaapPost(topics[0], data[0][0]);
+        assertNotNull(response);
+        assertNotNull(response.a);
+        assertNotNull(response.b);
+        
+        response = dmaapGet(topics[0], 1000);
+        assertNotNull(response);
+        assertNotNull(response.a);
+        assertEquals(data[0][0], response.b);
+        
+        response = dmaapGet(topics[1], 1000);
+        assertNotNull(response);
+        assertNotNull(response.a);
+        assertEquals("No topic", response.b);
+        
+        response = dmaapPost(topics[1], data[1][0]);
+        assertNotNull(response);
+        assertNotNull(response.a);
+        assertNotNull(response.b);
+        
+        response = dmaapPost(topics[1], data[1][1]);
+        assertNotNull(response);
+        assertNotNull(response.a);
+        assertNotNull(response.b);
+        
+        response = dmaapPost(topics[0], data[0][1]);
+        assertNotNull(response);
+        assertNotNull(response.a);
+        assertNotNull(response.b);
+        
+        response = dmaapGet(topics[1], 1000);
+        assertNotNull(response);
+        assertNotNull(response.a);
+        assertEquals(data[1][0], response.b);
+        
+        response = dmaapGet(topics[0], 1000);
+        assertNotNull(response);
+        assertNotNull(response.a);
+        assertEquals(data[0][1], response.b);
+        
+        response = dmaapGet(topics[1], 1000);
+        assertNotNull(response);
+        assertNotNull(response.a);
+        assertEquals(data[1][1], response.b);
+        
+        response = dmaapGet(topics[0], 1000);
+        assertNotNull(response);
+        assertNotNull(response.a);
+        assertEquals("No Data", response.b);
+    }
+    
+    @Test
+    public void testResponseCode() {
+        Pair<Integer, String> response = dmaapPost("myTopic", "myTopicData");
+        assertNotNull(response);
+        assertNotNull(response.a);
+        assertNotNull(response.b);
+        
+        response = setStatus(503);
+        assertNotNull(response);
+        assertNotNull(response.a);
+        assertNotNull(response.b);
+        
+        response = dmaapGet("myTopic", 500);
+        assertNotNull(response);
+        assertEquals(503, response.a.intValue());
+        assertEquals("You got response code: 503", response.b);
+        
+        response = setStatus(202);
+        assertNotNull(response);
+        assertNotNull(response.a);
+        assertNotNull(response.b);
+        
+        response = dmaapGet("myTopic", 500);
+        assertNotNull(response);
+        assertEquals(202, response.a.intValue());
+        assertEquals("myTopicData", response.b);
+    }
+    
+    private static Pair<Integer, String> dmaapGet (String topic, int timeout) {
+        return dmaapGet(topic, "1", "1", timeout);
+    }
+    
+    private static Pair<Integer, String> dmaapGet (String topic, String consumerGroup, String consumerId, int timeout) {
+        String url = "http://localhost:" + DMAAPSIM_SERVER_PORT + "/events/" + topic + "/" + consumerGroup + "/" + consumerId + "?timeout=" + timeout;
+        try {
+               URLConnection conn = new URL(url).openConnection();
+            HttpURLConnection httpConn = null;
+            if (conn instanceof HttpURLConnection) {
+               httpConn = (HttpURLConnection) conn;
+            }
+            else {
+               fail("connection not set up right");
+            }
+            httpConn.setRequestMethod("GET");
+            httpConn.connect();
+            String response = "";
+            try (BufferedReader connReader = new BufferedReader(new InputStreamReader(httpConn.getInputStream()))) {
+               String line;
+               while((line = connReader.readLine()) != null) {
+                       response += line;
+               }
+               httpConn.disconnect();
+               return new Pair<Integer, String>(httpConn.getResponseCode(), response);
+            }
+            catch (IOException e) {
+               if (e.getMessage().startsWith("Server returned HTTP response code")) {
+                       System.out.println("hi");
+                       BufferedReader connReader = new BufferedReader(new InputStreamReader(httpConn.getErrorStream()));
+                       String line;
+                       while((line = connReader.readLine()) != null) {
+                               response += line;
+                       }
+                       httpConn.disconnect();
+                       return new Pair<Integer, String>(httpConn.getResponseCode(), response);
+               }
+               else {
+                       fail("we got an exception: " + e);
+               }
+            }
+        }
+        catch (Exception e) {
+               fail("we got an exception" + e);
+        }
+        
+        return null;
+    }
+    
+    private static Pair<Integer, String> dmaapPost (String topic, String data) {
+        String url = "http://localhost:" + DMAAPSIM_SERVER_PORT + "/events/" + topic;
+        byte[] postData = data.getBytes(StandardCharsets.UTF_8);
+        try {
+               URLConnection conn = new URL(url).openConnection();
+               HttpURLConnection httpConn = null;
+            if (conn instanceof HttpURLConnection) {
+               httpConn = (HttpURLConnection) conn;
+            }
+            else {
+               fail("connection not set up right");
+            }
+            httpConn.setRequestMethod("POST");
+            httpConn.setDoOutput(true);
+            httpConn.setRequestProperty( "Content-Type", "text/plain");
+            httpConn.setRequestProperty("Content-Length", ""+postData.length);
+            httpConn.connect();
+            String response = "";
+            try (DataOutputStream connWriter = new DataOutputStream(httpConn.getOutputStream())) {
+               connWriter.write(postData);
+               connWriter.flush();
+            }
+            try (BufferedReader connReader = new BufferedReader(new InputStreamReader(httpConn.getInputStream()))) {
+               String line;
+               while((line = connReader.readLine()) != null) {
+                       response += line;
+               }
+               httpConn.disconnect();
+               return new Pair<Integer, String>(httpConn.getResponseCode(), response);
+            }
+            catch (IOException e) {
+               if (e.getMessage().startsWith("Server returned HTTP response code")) {
+                       System.out.println("hi");
+                       BufferedReader connReader = new BufferedReader(new InputStreamReader(httpConn.getErrorStream()));
+                       String line;
+                       while((line = connReader.readLine()) != null) {
+                               response += line;
+                       }
+                       httpConn.disconnect();
+                       return new Pair<Integer, String>(httpConn.getResponseCode(), response);
+               }
+               else {
+                       fail("we got an exception: " + e);
+               }
+            }
+        }
+        catch (Exception e) {
+               fail("we got an exception: " + e);
+        }
+        return null;
+    }
+    
+    private static Pair<Integer, String> setStatus (int status) {
+        String url = "http://localhost:" + DMAAPSIM_SERVER_PORT + "/events/setStatus?statusCode=" + status;
+        try {
+               URLConnection conn = new URL(url).openConnection();
+            HttpURLConnection httpConn = null;
+            if (conn instanceof HttpURLConnection) {
+               httpConn = (HttpURLConnection) conn;
+            }
+            else {
+               fail("connection not set up right");
+            }
+            httpConn.setRequestMethod("POST");
+            httpConn.connect();
+            String response = "";
+            try (BufferedReader connReader = new BufferedReader(new InputStreamReader(httpConn.getInputStream()))) {
+               String line;
+               while((line = connReader.readLine()) != null) {
+                       response += line;
+               }
+               httpConn.disconnect();
+               return new Pair<Integer, String>(httpConn.getResponseCode(), response);
+            }
+            catch (IOException e) {
+               if (e.getMessage().startsWith("Server returned HTTP response code")) {
+                       System.out.println("hi");
+                       BufferedReader connReader = new BufferedReader(new InputStreamReader(httpConn.getErrorStream()));
+                       String line;
+                       while((line = connReader.readLine()) != null) {
+                               response += line;
+                       }
+                       httpConn.disconnect();
+                       return new Pair<Integer, String>(httpConn.getResponseCode(), response);
+               }
+               else {
+                       fail("we got an exception: " + e);
+               }
+            }
+        }
+        catch (Exception e) {
+               fail("we got an exception" + e);
+        }
+        return null;
+    }
+    
+    private static class Pair<A, B> {
+               public final A a;
+               public final B b;
+               
+               public Pair(A a, B b) {
+                       this.a = a;
+                       this.b = b;
+               }
+       }
+}
\ No newline at end of file
diff --git a/pom.xml b/pom.xml
index 8e25b61..44ae9b8 100644 (file)
--- a/pom.xml
+++ b/pom.xml
@@ -77,6 +77,7 @@
                <module>feature-state-management</module>
                <module>api-active-standby-management</module>
                <module>feature-active-standby-management</module>
+               <module>feature-simulators</module>
                <module>packages</module>
        </modules>