Flesh out DMaaP simulator 38/96638/7
authorJim Hahn <jrh3@att.com>
Wed, 31 Jul 2019 13:45:27 +0000 (09:45 -0400)
committerJim Hahn <jrh3@att.com>
Mon, 7 Oct 2019 16:04:25 +0000 (12:04 -0400)
Fleshed out the dmaap simulator with additional media types.
Added more junit tests.
Added buildDmaapSim() to simulators Util.

Change-Id: I39acd3df8e8d0ded21228e56fa1ef919cafc3772
Issue-ID: POLICY-2144
Signed-off-by: Jim Hahn <jrh3@att.com>
41 files changed:
models-interactions/model-simulators/pom.xml
models-interactions/model-simulators/src/main/java/org/onap/policy/simulators/Util.java
models-interactions/model-simulators/src/main/resources/org/onap/policy/simulators/dmaap/DmaapParameters.json [new file with mode: 0644]
models-interactions/model-simulators/src/test/java/org/onap/policy/simulators/DmaapSimulatorTest.java [new file with mode: 0644]
models-interactions/model-simulators/src/test/resources/org/onap/policy/simulators/dmaap/TopicParameters.json [new file with mode: 0644]
models-sim/models-sim-dmaap/pom.xml
models-sim/models-sim-dmaap/src/main/java/org/onap/policy/models/sim/dmaap/parameters/DmaapSimParameterGroup.java
models-sim/models-sim-dmaap/src/main/java/org/onap/policy/models/sim/dmaap/parameters/DmaapSimParameterHandler.java
models-sim/models-sim-dmaap/src/main/java/org/onap/policy/models/sim/dmaap/parameters/RestServerParameters.java
models-sim/models-sim-dmaap/src/main/java/org/onap/policy/models/sim/dmaap/provider/ConsumerGroupData.java [new file with mode: 0644]
models-sim/models-sim-dmaap/src/main/java/org/onap/policy/models/sim/dmaap/provider/DmaapSimProvider.java
models-sim/models-sim-dmaap/src/main/java/org/onap/policy/models/sim/dmaap/provider/TopicData.java [new file with mode: 0644]
models-sim/models-sim-dmaap/src/main/java/org/onap/policy/models/sim/dmaap/rest/CambriaMessageBodyHandler.java
models-sim/models-sim-dmaap/src/main/java/org/onap/policy/models/sim/dmaap/rest/DmaapSimRestControllerV1.java
models-sim/models-sim-dmaap/src/main/java/org/onap/policy/models/sim/dmaap/rest/DmaapSimRestServer.java
models-sim/models-sim-dmaap/src/main/java/org/onap/policy/models/sim/dmaap/rest/JsonMessageBodyHandler.java [deleted file]
models-sim/models-sim-dmaap/src/main/java/org/onap/policy/models/sim/dmaap/rest/TextMessageBodyHandler.java [new file with mode: 0644]
models-sim/models-sim-dmaap/src/main/java/org/onap/policy/models/sim/dmaap/startstop/DmaapSimActivator.java
models-sim/models-sim-dmaap/src/main/java/org/onap/policy/models/sim/dmaap/startstop/DmaapSimCommandLineArguments.java
models-sim/models-sim-dmaap/src/main/java/org/onap/policy/models/sim/dmaap/startstop/Main.java
models-sim/models-sim-dmaap/src/main/resources/etc/DefaultConfig.json
models-sim/models-sim-dmaap/src/test/java/org/onap/policy/models/sim/dmaap/DmaapSimXxxExceptionTest.java [new file with mode: 0644]
models-sim/models-sim-dmaap/src/test/java/org/onap/policy/models/sim/dmaap/provider/ConsumerGroupDataTest.java [new file with mode: 0644]
models-sim/models-sim-dmaap/src/test/java/org/onap/policy/models/sim/dmaap/provider/DmaapSimProviderTest.java [new file with mode: 0644]
models-sim/models-sim-dmaap/src/test/java/org/onap/policy/models/sim/dmaap/provider/TopicDataTest.java [new file with mode: 0644]
models-sim/models-sim-dmaap/src/test/java/org/onap/policy/models/sim/dmaap/rest/BaseRestControllerV1Test.java [new file with mode: 0644]
models-sim/models-sim-dmaap/src/test/java/org/onap/policy/models/sim/dmaap/rest/CambriaMessageBodyHandlerTest.java [new file with mode: 0644]
models-sim/models-sim-dmaap/src/test/java/org/onap/policy/models/sim/dmaap/rest/CommonRestServer.java [new file with mode: 0644]
models-sim/models-sim-dmaap/src/test/java/org/onap/policy/models/sim/dmaap/rest/DmaapSimRestControllerV1Test.java [new file with mode: 0644]
models-sim/models-sim-dmaap/src/test/java/org/onap/policy/models/sim/dmaap/rest/TextMessageBodyHandlerTest.java [new file with mode: 0644]
models-sim/models-sim-dmaap/src/test/java/org/onap/policy/sim/dmaap/e2e/EndToEndTest.java [new file with mode: 0644]
models-sim/models-sim-dmaap/src/test/java/org/onap/policy/sim/dmaap/parameters/CommonTestData.java [new file with mode: 0644]
models-sim/models-sim-dmaap/src/test/java/org/onap/policy/sim/dmaap/parameters/DmaapSimParameterGroupTest.java [moved from models-sim/models-sim-dmaap/src/main/java/org/onap/policy/models/sim/dmaap/DmaapSimConstants.java with 60% similarity]
models-sim/models-sim-dmaap/src/test/java/org/onap/policy/sim/dmaap/parameters/DmaapSimParameterHandlerTest.java [new file with mode: 0644]
models-sim/models-sim-dmaap/src/test/java/org/onap/policy/sim/dmaap/startstop/DmaapSimActivatorTest.java [new file with mode: 0644]
models-sim/models-sim-dmaap/src/test/java/org/onap/policy/sim/dmaap/startstop/MainTest.java [new file with mode: 0644]
models-sim/models-sim-dmaap/src/test/resources/parameters/EmptyParameterFile.json [new file with mode: 0644]
models-sim/models-sim-dmaap/src/test/resources/parameters/MinimumParameters.json
models-sim/models-sim-dmaap/src/test/resources/parameters/NormalParameters.json
models-sim/models-sim-dmaap/src/test/resources/parameters/Parameters_InvalidName.json
models-sim/models-sim-dmaap/src/test/resources/parameters/TopicParameters.json [new file with mode: 0644]

index b0c48eb..7d64200 100644 (file)
       <artifactId>policy-models-decisions</artifactId>
       <version>${project.version}</version>
     </dependency>
+    <dependency>
+      <groupId>org.onap.policy.models.sim</groupId>
+      <artifactId>policy-models-sim-dmaap</artifactId>
+      <version>${project.version}</version>
+    </dependency>
   </dependencies>
 </project>
index a1d28ba..6c1a057 100644 (file)
 package org.onap.policy.simulators;
 
 import java.io.IOException;
-
+import java.util.Properties;
 import org.onap.policy.common.endpoints.http.server.HttpServletServer;
 import org.onap.policy.common.endpoints.http.server.HttpServletServerFactoryInstance;
+import org.onap.policy.common.endpoints.properties.PolicyEndPointProperties;
 import org.onap.policy.common.gson.GsonMessageBodyHandler;
+import org.onap.policy.common.utils.coder.CoderException;
+import org.onap.policy.common.utils.coder.StandardCoder;
 import org.onap.policy.common.utils.network.NetworkUtil;
+import org.onap.policy.common.utils.resources.ResourceUtils;
+import org.onap.policy.models.sim.dmaap.parameters.DmaapSimParameterGroup;
+import org.onap.policy.models.sim.dmaap.provider.DmaapSimProvider;
+import org.onap.policy.models.sim.dmaap.rest.DmaapSimRestServer;
 
 public class Util {
     public static final String AAISIM_SERVER_NAME = "aaiSim";
@@ -40,6 +47,7 @@ public class Util {
     public static final int VFCSIM_SERVER_PORT = 6668;
     public static final int GUARDSIM_SERVER_PORT = 6669;
     public static final int SDNCSIM_SERVER_PORT = 6670;
+    public static final int DMAAPSIM_SERVER_PORT = 3904;
 
     private static final String CANNOT_CONNECT = "cannot connect to port ";
     private static final String LOCALHOST = "localhost";
@@ -139,4 +147,34 @@ public class Util {
         }
         return testServer;
     }
+
+    /**
+     * Build a DMaaP simulator.
+     *
+     * @return the simulator
+     * @throws InterruptedException if a thread is interrupted
+     * @throws IOException if an IO errror occurs
+     * @throws CoderException if the server parameters cannot be loaded
+     */
+    public static HttpServletServer buildDmaapSim() throws InterruptedException, IOException, CoderException {
+        String json = ResourceUtils.getResourceAsString("org/onap/policy/simulators/dmaap/DmaapParameters.json");
+        DmaapSimParameterGroup params = new StandardCoder().decode(json, DmaapSimParameterGroup.class);
+
+        DmaapSimProvider.setInstance(new DmaapSimProvider(params));
+
+        Properties props = DmaapSimRestServer.getServerProperties(params.getRestServerParameters());
+
+        final String svcpfx = PolicyEndPointProperties.PROPERTY_HTTP_SERVER_SERVICES + "."
+                        + params.getRestServerParameters().getName();
+        props.setProperty(svcpfx + PolicyEndPointProperties.PROPERTY_HTTP_PORT_SUFFIX,
+                        Integer.toString(DMAAPSIM_SERVER_PORT));
+        props.setProperty(svcpfx + PolicyEndPointProperties.PROPERTY_MANAGED_SUFFIX, "true");
+
+        HttpServletServer testServer = HttpServletServerFactoryInstance.getServerFactory().build(props).get(0);
+        testServer.waitedStart(5000);
+        if (!NetworkUtil.isTcpPortOpen(LOCALHOST, testServer.getPort(), 50, 1000L)) {
+            throw new IllegalStateException(CANNOT_CONNECT + testServer.getPort());
+        }
+        return testServer;
+    }
 }
diff --git a/models-interactions/model-simulators/src/main/resources/org/onap/policy/simulators/dmaap/DmaapParameters.json b/models-interactions/model-simulators/src/main/resources/org/onap/policy/simulators/dmaap/DmaapParameters.json
new file mode 100644 (file)
index 0000000..b704f6f
--- /dev/null
@@ -0,0 +1,8 @@
+{
+    "name": "DMaapSim",
+    "topicSweepSec": 300,
+    "restServerParameters": {
+        "host": "0.0.0.0",
+        "port": 3904
+    }
+}
diff --git a/models-interactions/model-simulators/src/test/java/org/onap/policy/simulators/DmaapSimulatorTest.java b/models-interactions/model-simulators/src/test/java/org/onap/policy/simulators/DmaapSimulatorTest.java
new file mode 100644 (file)
index 0000000..50e9bad
--- /dev/null
@@ -0,0 +1,94 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.simulators;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+
+import java.io.File;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.onap.policy.common.endpoints.event.comm.TopicEndpointManager;
+import org.onap.policy.common.endpoints.event.comm.bus.DmaapTopicSink;
+import org.onap.policy.common.endpoints.http.server.HttpServletServerFactoryInstance;
+import org.onap.policy.common.endpoints.parameters.TopicParameterGroup;
+import org.onap.policy.common.utils.coder.StandardCoder;
+
+public class DmaapSimulatorTest {
+    private static final int MAX_WAIT_SEC = 2;
+    private static final String TOPIC = "MY-TOPIC";
+
+    /**
+     * Messages from the topic are placed here by the endpoint.
+     */
+    private BlockingQueue<String> queue;
+
+    @BeforeClass
+    public static void setUpBeforeClass() throws Exception {
+        TopicEndpointManager.getManager().shutdown();
+    }
+
+    /**
+     * Starts the simulator and the topic.
+     *
+     * @throws Exception if an error occurs
+     */
+    @Before
+    public void setUp() throws Exception {
+        assertNotNull(Util.buildDmaapSim());
+
+        String topicJson = new String(Files.readAllBytes(
+                        new File("src/test/resources/org/onap/policy/simulators/dmaap/TopicParameters.json").toPath()),
+                        StandardCharsets.UTF_8);
+        topicJson = topicJson.replace("${port}", String.valueOf(Util.DMAAPSIM_SERVER_PORT));
+
+        TopicParameterGroup topicConfig = new StandardCoder().decode(topicJson, TopicParameterGroup.class);
+
+        TopicEndpointManager.getManager().addTopics(topicConfig);
+        TopicEndpointManager.getManager().start();
+
+        queue = new LinkedBlockingQueue<>();
+    }
+
+    @After
+    public void tearDown() {
+        TopicEndpointManager.getManager().shutdown();
+        HttpServletServerFactoryInstance.getServerFactory().destroy();
+    }
+
+    @Test
+    public void test() throws InterruptedException {
+        TopicEndpointManager.getManager().getDmaapTopicSource(TOPIC)
+                        .register((infra, topic, event) -> queue.add(event));
+
+        DmaapTopicSink sink = TopicEndpointManager.getManager().getDmaapTopicSink(TOPIC);
+        sink.send("hello");
+        sink.send("world");
+
+        assertEquals("hello", queue.poll(MAX_WAIT_SEC, TimeUnit.SECONDS));
+        assertEquals("world", queue.poll(MAX_WAIT_SEC, TimeUnit.SECONDS));
+    }
+}
diff --git a/models-interactions/model-simulators/src/test/resources/org/onap/policy/simulators/dmaap/TopicParameters.json b/models-interactions/model-simulators/src/test/resources/org/onap/policy/simulators/dmaap/TopicParameters.json
new file mode 100644 (file)
index 0000000..ba1f480
--- /dev/null
@@ -0,0 +1,21 @@
+{
+    "topicSources": [
+        {
+            "topic": "MY-TOPIC",
+            "servers": [
+                "localhost:${port}"
+            ],
+            "topicCommInfrastructure": "dmaap",
+            "fetchTimeout": 100
+        }
+    ],
+    "topicSinks": [
+        {
+            "topic": "MY-TOPIC",
+            "servers": [
+                "localhost:${port}"
+            ],
+            "topicCommInfrastructure": "dmaap"
+        }
+    ]
+}
\ No newline at end of file
index c1981e6..e3f0f0b 100644 (file)
@@ -1,6 +1,7 @@
 <!--
   ============LICENSE_START=======================================================
    Copyright (C) 2019 Nordix Foundation.
+   Modifications Copyright (C) 2019 AT&T Intellectual Property. All rights reserved.
   ================================================================================
   Licensed under the Apache License, Version 2.0 (the "License");
   you may not use this file except in compliance with the License.
             <artifactId>gson</artifactId>
             <version>${policy.common.version}</version>
         </dependency>
+        <dependency>
+            <groupId>org.onap.policy.common</groupId>
+            <artifactId>utils-test</artifactId>
+            <version>${policy.common.version}</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.powermock</groupId>
+            <artifactId>powermock-api-mockito</artifactId>
+            <scope>test</scope>
+        </dependency>
     </dependencies>
 
     <build>
index caae287..11da575 100644 (file)
@@ -1,6 +1,7 @@
 /*-
  * ============LICENSE_START=======================================================
  *  Copyright (C) 2019 Nordix Foundation.
+ *  Modifications Copyright (C) 2019 AT&T Intellectual Property. All rights reserved.
  * ================================================================================
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -21,8 +22,8 @@
 package org.onap.policy.models.sim.dmaap.parameters;
 
 import lombok.Getter;
-
 import org.onap.policy.common.parameters.ParameterGroupImpl;
+import org.onap.policy.common.parameters.annotations.Min;
 import org.onap.policy.common.parameters.annotations.NotBlank;
 import org.onap.policy.common.parameters.annotations.NotNull;
 
@@ -35,6 +36,14 @@ import org.onap.policy.common.parameters.annotations.NotNull;
 public class DmaapSimParameterGroup extends ParameterGroupImpl {
     private RestServerParameters restServerParameters;
 
+    /**
+     * Frequency, in seconds, with which to sweep the topics of idle consumers. On each
+     * sweep cycle, if a consumer group has had no new poll requests since the last sweep
+     * cycle, it is removed.
+     */
+    @Min(1)
+    private long topicSweepSec;
+
     /**
      * Create the DMaaP simulator parameter group.
      *
index 8eb76ec..2520545 100644 (file)
@@ -1,6 +1,7 @@
 /*-
  * ============LICENSE_START=======================================================
  *  Copyright (C) 2019 Nordix Foundation.
+ *  Modifications Copyright (C) 2019 AT&T Intellectual Property. All rights reserved.
  * ================================================================================
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -38,7 +39,7 @@ public class DmaapSimParameterHandler {
 
     private static final Logger LOGGER = LoggerFactory.getLogger(DmaapSimParameterHandler.class);
 
-    private static final Coder CODER = new StandardCoder();
+    private final Coder coder = new StandardCoder();
 
     /**
      * Read the parameters from the parameter file.
@@ -54,7 +55,7 @@ public class DmaapSimParameterHandler {
         try {
             // Read the parameters from JSON
             File file = new File(arguments.getFullConfigurationFilePath());
-            dmaapSimParameterGroup = CODER.decode(file, DmaapSimParameterGroup.class);
+            dmaapSimParameterGroup = coder.decode(file, DmaapSimParameterGroup.class);
         } catch (final CoderException e) {
             final String errorMessage = "error reading parameters from \"" + arguments.getConfigurationFilePath()
                     + "\"\n" + "(" + e.getClass().getSimpleName() + "):" + e.getMessage();
index c7269f6..8fb2cff 100644 (file)
@@ -1,6 +1,7 @@
 /*-
  * ============LICENSE_START=======================================================
  *  Copyright (C) 2019 Nordix Foundation.
+ *  Modifications Copyright (C) 2019 AT&T Intellectual Property. All rights reserved.
  * ================================================================================
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
 package org.onap.policy.models.sim.dmaap.parameters;
 
 import lombok.Getter;
+
 import org.onap.policy.common.parameters.ParameterGroupImpl;
 import org.onap.policy.common.parameters.annotations.Min;
 import org.onap.policy.common.parameters.annotations.NotBlank;
 import org.onap.policy.common.parameters.annotations.NotNull;
 
 /**
- * Class to hold all parameters needed for DMaaP simulator rest server.
+ * Class to hold all parameters needed for rest server.
  */
 @NotNull
 @NotBlank
@@ -39,6 +41,6 @@ public class RestServerParameters extends ParameterGroupImpl {
     private int port;
 
     public RestServerParameters() {
-        super("RestServerParameters");
+        super(RestServerParameters.class.getSimpleName());
     }
 }
diff --git a/models-sim/models-sim-dmaap/src/main/java/org/onap/policy/models/sim/dmaap/provider/ConsumerGroupData.java b/models-sim/models-sim-dmaap/src/main/java/org/onap/policy/models/sim/dmaap/provider/ConsumerGroupData.java
new file mode 100644 (file)
index 0000000..7371513
--- /dev/null
@@ -0,0 +1,190 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * ONAP Policy Models
+ * ================================================================================
+ * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.models.sim.dmaap.provider;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Data associated with a consumer group. All consumer instances within a group share the
+ * same data object.
+ */
+public class ConsumerGroupData {
+    private static final Logger logger = LoggerFactory.getLogger(ConsumerGroupData.class);
+
+    /**
+     * Returned when messages can no longer be read from this consumer group object,
+     * because it is being removed from the topic. {@link #UNREADABLE_LIST} must not be
+     * the same list as Collections.emptyList(), thus we wrap it.
+     */
+    public static final List<String> UNREADABLE_LIST = Collections.unmodifiableList(Collections.emptyList());
+
+    /**
+     * Returned when there are no messages read. Collections.emptyList() is already
+     * unmodifiable, thus no need to wrap it.
+     */
+    private static final List<String> EMPTY_LIST = Collections.emptyList();
+
+    /**
+     * This is locked while fields other than {@link #messageQueue} are updated.
+     */
+    private final Object lockit = new Object();
+
+    /**
+     * Number of sweep cycles that have occurred since a consumer has attempted to read
+     * from the queue. This consumer group should be removed once this count exceeds
+     * {@code 1}, provided {@link #nreaders} is zero.
+     */
+    private int nsweeps = 0;
+
+    /**
+     * Number of consumers that are currently attempting to read from the queue. This
+     * consumer group should not be removed as long as this is non-zero.
+     */
+    private int nreaders = 0;
+
+    /**
+     * Message queue.
+     */
+    private final BlockingQueue<String> messageQueue = new LinkedBlockingQueue<>();
+
+
+    /**
+     * Constructs the object.
+     *
+     * @param topicName name of the topic with which this object is associated
+     * @param groupName name of the consumer group with which this object is associated
+     */
+    public ConsumerGroupData(String topicName, String groupName) {
+        logger.info("Topic {}: add consumer group: {}", topicName, groupName);
+    }
+
+    /**
+     * Determines if this consumer group should be removed. This should be invoked once
+     * during each sweep cycle. When this returns {@code true}, this consumer group should
+     * be immediately discarded, as any readers will sit in a spin loop waiting for it to
+     * be discarded.
+     *
+     * @return {@code true} if this consumer group should be removed, {@code false}
+     *         otherwise
+     */
+    public boolean shouldRemove() {
+        synchronized (lockit) {
+            return (nreaders == 0 && ++nsweeps > 1);
+        }
+    }
+
+    /**
+     * Reads messages from the queue, blocking if necessary.
+     *
+     * @param maxRead maximum number of messages to read
+     * @param waitMs time to wait, in milliseconds, if the queue is currently empty
+     * @return a list of messages read from the queue, empty if no messages became
+     *         available before the wait time elapsed, or {@link #UNREADABLE_LIST} if this
+     *         consumer group object is no longer active
+     * @throws InterruptedException if this thread was interrupted while waiting for the
+     *         first message
+     */
+    public List<String> read(int maxRead, long waitMs) throws InterruptedException {
+
+        synchronized (lockit) {
+            if (nsweeps > 1 && nreaders == 0) {
+                // cannot use this consumer group object anymore
+                return UNREADABLE_LIST;
+            }
+
+            ++nreaders;
+        }
+
+        /*
+         * Note: do EVERYTHING inside of the "try" block, so that the "finally" block can
+         * update the reader count.
+         *
+         * Do NOT hold the lockit while we're polling, as poll() may block for a while.
+         */
+        try {
+            // always read at least one message
+            int maxRead2 = Math.max(1, maxRead);
+            long waitMs2 = Math.max(0, waitMs);
+
+            // perform a blocking read of the queue
+            String obj = getNextMessage(waitMs2);
+            if (obj == null) {
+                return EMPTY_LIST;
+            }
+
+            /*
+             * List should hold all messages from the queue PLUS the one we already have.
+             * Note: it's possible for additional messages to be added to the queue while
+             * we're reading from it. In that case, the list will grow as needed.
+             */
+            List<String> lst = new ArrayList<>(Math.min(maxRead2, messageQueue.size() + 1));
+            lst.add(obj);
+
+            // perform NON-blocking read of subsequent messages
+            for (int x = 1; x < maxRead2; ++x) {
+                if ((obj = messageQueue.poll()) == null) {
+                    break;
+                }
+
+                lst.add(obj);
+            }
+
+            return lst;
+
+        } finally {
+            synchronized (lockit) {
+                --nreaders;
+                nsweeps = 0;
+            }
+        }
+    }
+
+    /**
+     * Writes messages to the queue.
+     *
+     * @param messages messages to be written to the queue
+     */
+    public void write(List<String> messages) {
+        messageQueue.addAll(messages);
+    }
+
+    // the following methods may be overridden by junit tests
+
+    /**
+     * Gets the next message from the queue.
+     *
+     * @param waitMs time to wait, in milliseconds, if the queue is currently empty
+     * @return the next message, or {@code null} if no messages became available before
+     *         the wait time elapsed
+     * @throws InterruptedException if this thread was interrupted while waiting for the
+     *         message
+     */
+    protected String getNextMessage(long waitMs) throws InterruptedException {
+        return messageQueue.poll(waitMs, TimeUnit.MILLISECONDS);
+    }
+}
index 9de29cd..d11d1b3 100644 (file)
@@ -1,6 +1,7 @@
 /*-
  * ============LICENSE_START=======================================================
  *  Copyright (C) 2019 Nordix Foundation.
+ *  Modifications Copyright (C) 2019 AT&T Intellectual Property. All rights reserved.
  * ================================================================================
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
 
 package org.onap.policy.models.sim.dmaap.provider;
 
+import java.util.Collections;
 import java.util.LinkedHashMap;
+import java.util.List;
 import java.util.Map;
-import java.util.SortedMap;
-import java.util.TreeMap;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
-
 import javax.ws.rs.core.Response;
-
-import org.apache.commons.lang3.tuple.MutablePair;
-import org.onap.policy.common.utils.coder.CoderException;
-import org.onap.policy.common.utils.coder.StandardCoder;
-import org.onap.policy.models.sim.dmaap.DmaapSimRuntimeException;
+import javax.ws.rs.core.Response.Status;
+import lombok.Getter;
+import lombok.Setter;
+import org.onap.policy.common.utils.services.ServiceManagerContainer;
+import org.onap.policy.models.sim.dmaap.parameters.DmaapSimParameterGroup;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -40,50 +43,70 @@ import org.slf4j.LoggerFactory;
  *
  * @author Liam Fallon (liam.fallon@est.tech)
  */
-public class DmaapSimProvider {
+public class DmaapSimProvider extends ServiceManagerContainer {
     private static final Logger LOGGER = LoggerFactory.getLogger(DmaapSimProvider.class);
 
-    // Recurring string constants
-    private static final String TOPIC_TAG = "Topic:";
+    @Getter
+    @Setter
+    private static DmaapSimProvider instance;
 
-    // Time for a get to wait before checking of a message has come
-    private static final long DMAAP_SIM_WAIT_TIME = 50;
+    /**
+     * Maps a topic name to its data.
+     */
+    private final Map<String, TopicData> topic2data = new ConcurrentHashMap<>();
 
-    // recurring constants
-    private static final String WITH_TIMEOUT = " with timeout ";
+    /**
+     * Thread used to remove idle consumers from the topics.
+     */
+    private ScheduledExecutorService timerPool;
 
-    // The map of topic messages
-    private static final Map<String, SortedMap<Integer, Object>> topicMessageMap = new LinkedHashMap<>();
 
-    // The map of topic messages
-    private static final Map<String, Map<String, MutablePair<Integer, String>>> consumerGroupsMap =
-            new LinkedHashMap<>();
+    /**
+     * Constructs the object.
+     *
+     * @param params parameters
+     */
+    public DmaapSimProvider(DmaapSimParameterGroup params) {
+        addAction("Topic Sweeper", () -> {
+            timerPool = makeTimerPool();
+            timerPool.scheduleWithFixedDelay(new SweeperTask(), params.getTopicSweepSec(), params.getTopicSweepSec(),
+                            TimeUnit.SECONDS);
+        }, () -> timerPool.shutdown());
+    }
 
     /**
      * Process a DMaaP message.
      *
-     * @param topicName The topic name
+     * @param topicName the topic name
      * @param dmaapMessage the message to process
      * @return a response to the message
      */
+    @SuppressWarnings("unchecked")
     public Response processDmaapMessagePut(final String topicName, final Object dmaapMessage) {
-        LOGGER.debug(TOPIC_TAG + topicName + ", Received DMaaP message: " + dmaapMessage);
-
-        synchronized (topicMessageMap) {
-            SortedMap<Integer, Object> messageMap = topicMessageMap.get(topicName);
-            if (messageMap == null) {
-                messageMap = new TreeMap<>();
-                topicMessageMap.put(topicName, messageMap);
-                LOGGER.debug(TOPIC_TAG + topicName + ", created topic message map");
-            }
+        LOGGER.debug("Topic: {}, Received DMaaP message(s): {}", topicName, dmaapMessage);
 
-            int nextKey = (messageMap.isEmpty() ? 0 : messageMap.lastKey() + 1);
+        List<Object> lst;
 
-            messageMap.put(nextKey, dmaapMessage);
-            LOGGER.debug(TOPIC_TAG + topicName + ", cached DMaaP message " + nextKey + ": " + dmaapMessage);
+        if (dmaapMessage instanceof List) {
+            lst = (List<Object>) dmaapMessage;
+        } else {
+            lst = Collections.singletonList(dmaapMessage);
         }
 
-        return Response.status(Response.Status.OK).entity("{\n    \"serverTimeMs\": 0,\n    \"count\": 1\n}").build();
+        TopicData topic = topic2data.get(topicName);
+
+        /*
+         * Write all messages and return the count. If the topic doesn't exist yet, then
+         * there are no subscribers to receive the messages, thus treat it as if all
+         * messages were published.
+         */
+        int nmessages = (topic != null ? topic.write(lst) : lst.size());
+
+        Map<String, Object> map = new LinkedHashMap<>();
+        map.put("serverTimeMs", 0);
+        map.put("count", nmessages);
+
+        return Response.status(Response.Status.OK).entity(map).build();
     }
 
     /**
@@ -92,102 +115,66 @@ public class DmaapSimProvider {
      * @param topicName The topic to wait on
      * @param consumerGroup the consumer group that is waiting
      * @param consumerId the consumer ID that is waiting
-     * @param timeout the length of time to wait for
+     * @param limit the maximum number of messages to get
+     * @param timeoutMs the length of time to wait for
      * @return the DMaaP message or
      */
     public Response processDmaapMessageGet(final String topicName, final String consumerGroup, final String consumerId,
-            final int timeout) {
+                    final int limit, final long timeoutMs) {
 
-        LOGGER.debug(TOPIC_TAG + topicName + ", Request for DMaaP message: " + consumerGroup + ":" + consumerId
-                + WITH_TIMEOUT + timeout);
+        LOGGER.debug("Topic: {}, Request for DMaaP message: {}: {} with limit={} timeout={}", topicName, consumerGroup,
+                        consumerId, limit, timeoutMs);
 
-        MutablePair<Integer, String> consumerGroupPair = null;
-
-        synchronized (consumerGroupsMap) {
-            Map<String, MutablePair<Integer, String>> consumerGroupMap = consumerGroupsMap.get(topicName);
-            if (consumerGroupMap == null) {
-                consumerGroupMap = new LinkedHashMap<>();
-                consumerGroupsMap.put(topicName, consumerGroupMap);
-                LOGGER.trace(
-                        TOPIC_TAG + topicName + ", Created consumer map entry for consumer group " + consumerGroup);
-            }
-
-            consumerGroupPair = consumerGroupMap.get(consumerGroup);
-            if (consumerGroupPair == null) {
-                consumerGroupPair = new MutablePair<>(-1, consumerId);
-                consumerGroupMap.put(consumerGroup, consumerGroupPair);
-                LOGGER.trace(TOPIC_TAG + topicName + ", Created consumer group entry for consumer group "
-                        + consumerGroup + ":" + consumerId);
-            }
-        }
+        try {
+            List<String> lst = topic2data.computeIfAbsent(topicName, this::makeTopicData).read(consumerGroup, limit,
+                            timeoutMs);
 
-        long timeOfTimeout = System.currentTimeMillis() + timeout;
+            if (lst.isEmpty() && timeoutMs > 0) {
+                LOGGER.debug("Topic: {}, Timed out waiting for messages: {}: {}", topicName, consumerGroup, consumerId);
+                return Response.status(Status.REQUEST_TIMEOUT).entity(lst).build();
 
-        do {
-            Object waitingMessages = getWaitingMessages(topicName, consumerGroupPair);
-            if (waitingMessages != null) {
-                LOGGER.debug(TOPIC_TAG + topicName + ", Request for DMaaP message: " + consumerGroup + ":" + consumerId
-                        + WITH_TIMEOUT + timeout + ", returning messages " + waitingMessages);
-                return Response.status(Response.Status.OK).entity(waitingMessages).build();
+            } else {
+                LOGGER.debug("Topic: {}, Retrieved {} messages for: {}: {}", topicName, consumerGroup, lst.size(),
+                                consumerId);
+                return Response.status(Status.OK).entity(lst).build();
             }
 
-            try {
-                TimeUnit.MILLISECONDS.sleep(DMAAP_SIM_WAIT_TIME);
-            } catch (InterruptedException ie) {
-                String errorMessage = "Interrupt on wait on simulation of DMaaP topic " + topicName + " for request ID "
-                        + consumerGroup + ":" + consumerId + WITH_TIMEOUT + timeout;
-                LOGGER.warn(errorMessage, ie);
-                Thread.currentThread().interrupt();
-                throw new DmaapSimRuntimeException(errorMessage, ie);
-            }
+        } catch (InterruptedException e) {
+            LOGGER.warn("Topic: {}, Request for DMaaP message interrupted: {}: {}", topicName, consumerGroup,
+                            consumerId, e);
+            Thread.currentThread().interrupt();
+            return Response.status(Status.GONE).entity(Collections.emptyList()).build();
         }
-        while (timeOfTimeout > System.currentTimeMillis());
-
-        LOGGER.trace(TOPIC_TAG + topicName + ", timed out waiting for messages : " + consumerGroup + ":" + consumerId
-                + WITH_TIMEOUT + timeout);
-        return Response.status(Response.Status.REQUEST_TIMEOUT).build();
     }
 
     /**
-     * Return any messages on this topic with a message number greater than the supplied message number.
-     *
-     * @param topicName the topic name to check
-     * @param consumerGroupPair the pair with the information on the last message retrieved
-     * @return the messages or null if there are none
+     * Task to remove idle consumers from each topic.
      */
-    private Object getWaitingMessages(final String topicName, final MutablePair<Integer, String> consumerGroupPair) {
-        String foundMessageList = "[";
-
-        synchronized (topicMessageMap) {
-            SortedMap<Integer, Object> messageMap = topicMessageMap.get(topicName);
-            if (messageMap == null || messageMap.lastKey() <= consumerGroupPair.getLeft()) {
-                return null;
-            }
+    private class SweeperTask implements Runnable {
+        @Override
+        public void run() {
+            topic2data.values().forEach(TopicData::removeIdleConsumers);
+        }
+    }
 
-            boolean first = true;
-            for (Object dmaapMessage : messageMap.tailMap(consumerGroupPair.getLeft() + 1).values()) {
-                if (first) {
-                    first = false;
-                } else {
-                    foundMessageList += ",";
-                }
-                try {
-                    foundMessageList += new StandardCoder().encode(dmaapMessage);
-                } catch (CoderException ce) {
-                    String errorMessage = "Encoding error on message on DMaaP topic " + topicName;
-                    LOGGER.warn(errorMessage, ce);
-                    return null;
-                }
-            }
-            foundMessageList += ']';
+    // the following methods may be overridden by junit tests
 
-            LOGGER.debug(TOPIC_TAG + topicName + ", returning DMaaP messages from  " + consumerGroupPair.getLeft()
-                    + " to " + messageMap.lastKey());
-            synchronized (consumerGroupsMap) {
-                consumerGroupPair.setLeft(messageMap.lastKey());
-            }
-        }
+    /**
+     * Makes a new timer pool.
+     *
+     * @return a new timer pool
+     */
+    protected ScheduledExecutorService makeTimerPool() {
+        return Executors.newScheduledThreadPool(1);
+    }
 
-        return (foundMessageList.length() < 3 ? null : foundMessageList);
+    /**
+     * Makes a new topic.
+     *
+     * @param topicName topic name
+     * @return a new topic
+     */
+    protected TopicData makeTopicData(String topicName) {
+        return new TopicData(topicName);
     }
 }
diff --git a/models-sim/models-sim-dmaap/src/main/java/org/onap/policy/models/sim/dmaap/provider/TopicData.java b/models-sim/models-sim-dmaap/src/main/java/org/onap/policy/models/sim/dmaap/provider/TopicData.java
new file mode 100644 (file)
index 0000000..2737f45
--- /dev/null
@@ -0,0 +1,201 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * ONAP Policy Models
+ * ================================================================================
+ * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.models.sim.dmaap.provider;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.function.Function;
+import org.onap.policy.common.utils.coder.Coder;
+import org.onap.policy.common.utils.coder.CoderException;
+import org.onap.policy.common.utils.coder.StandardCoder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Data associated with a topic.
+ *
+ * <p/>
+ * Note: for ease of implementation, this adds a topic when a consumer polls it rather
+ * than when a publisher writes to it. This is the opposite of how the real DMaaP works.
+ * As a result, this will never return a topic-not-found message to the consumer.
+ */
+public class TopicData {
+    private static final Logger logger = LoggerFactory.getLogger(TopicData.class);
+
+    /**
+     * Name of the topic with which this data is associated.
+     */
+    private final String topicName;
+
+    /**
+     * Maps a consumer group name to its associated data.
+     */
+    private final Map<String, ConsumerGroupData> group2data = new ConcurrentHashMap<>();
+
+
+    /**
+     * Constructs the object.
+     *
+     * @param topicName name of the topic with which this object is associated
+     */
+    public TopicData(String topicName) {
+        logger.info("Topic {}: added", topicName);
+        this.topicName = topicName;
+    }
+
+    /**
+     * Removes idle consumers from the topic. This is typically called once during each
+     * sweep cycle.
+     */
+    public void removeIdleConsumers() {
+        Iterator<Entry<String, ConsumerGroupData>> iter = group2data.entrySet().iterator();
+        while (iter.hasNext()) {
+            Entry<String, ConsumerGroupData> ent = iter.next();
+            if (ent.getValue().shouldRemove()) {
+                /*
+                 * We want the minimum amount of time to elapse between invoking
+                 * shouldRemove() and iter.remove(), thus all other statements (e.g.,
+                 * logging) should be done AFTER iter.remove().
+                 */
+                iter.remove();
+
+                logger.info("Topic {}: removed consumer group: {}", topicName, ent.getKey());
+            }
+        }
+    }
+
+    /**
+     * Reads from a particular consumer group's queue.
+     *
+     * @param consumerGroup name of the consumer group from which to read
+     * @param maxRead maximum number of messages to read
+     * @param waitMs time to wait, in milliseconds, if the queue is currently empty
+     * @return a list of messages read from the queue, empty if no messages became
+     *         available before the wait time elapsed
+     * @throws InterruptedException if this thread was interrupted while waiting for the
+     *         first message
+     */
+    public List<String> read(String consumerGroup, int maxRead, long waitMs) throws InterruptedException {
+        /*
+         * It's possible that this thread may spin several times while waiting for
+         * removeIdleConsumers() to complete its call to iter.remove(), thus we create
+         * this closure once, rather than each time through the loop.
+         */
+        Function<String, ConsumerGroupData> maker = this::makeData;
+
+        // loop until we get a readable list
+        List<String> result;
+
+        // @formatter:off
+
+        do {
+            result = group2data.computeIfAbsent(consumerGroup, maker).read(maxRead, waitMs);
+        }
+        while (result == ConsumerGroupData.UNREADABLE_LIST);
+
+        // @formatter:on
+
+        return result;
+    }
+
+    /**
+     * Writes messages to the queues of every consumer group.
+     *
+     * @param messages messages to be written to the queues
+     * @return the number of messages enqueued
+     */
+    public int write(List<Object> messages) {
+        List<String> list = convertMessagesToStrings(messages);
+
+        /*
+         * We don't care if a consumer group is deleted from the map while we're adding
+         * messages to it, as those messages will simply be ignored (and discarded by the
+         * garbage collector).
+         */
+        for (ConsumerGroupData data : group2data.values()) {
+            data.write(list);
+        }
+
+        return list.size();
+    }
+
+    /**
+     * Converts a list of message objects to a list of message strings. If a message
+     * cannot be converted for some reason, then it is not added to the result list, thus
+     * the result list may be shorted than the original input list.
+     *
+     * @param messages objects to be converted
+     * @return a list of message strings
+     */
+    protected List<String> convertMessagesToStrings(List<Object> messages) {
+        Coder coder = new StandardCoder();
+        List<String> list = new ArrayList<>(messages.size());
+
+        for (Object msg : messages) {
+            String str = convertMessageToString(msg, coder);
+            if (str != null) {
+                list.add(str);
+            }
+        }
+
+        return list;
+    }
+
+    /**
+     * Converts a message object to a message string.
+     *
+     * @param message message to be converted
+     * @param coder used to encode the message as a string
+     * @return the message string, or {@code null} if it cannot be converted
+     */
+    protected String convertMessageToString(Object message, Coder coder) {
+        if (message == null) {
+            return null;
+        }
+
+        if (message instanceof String) {
+            return message.toString();
+        }
+
+        try {
+            return coder.encode(message);
+        } catch (CoderException e) {
+            logger.warn("cannot encode {}", message, e);
+            return null;
+        }
+    }
+
+    // this may be overridden by junit tests
+
+    /**
+     * Makes data for a consumer group.
+     *
+     * @param consumerGroup name of the consumer group to make
+     * @return new consumer group data
+     */
+    protected ConsumerGroupData makeData(String consumerGroup) {
+        return new ConsumerGroupData(topicName, consumerGroup);
+    }
+}
index e269ac0..dbd9422 100644 (file)
-/*
- * ============LICENSE_START======================================================= ONAP
- * ================================================================================ Copyright (C) 2019 AT&T Intellectual
- * Property. All rights reserved. ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
+/*-
+ * ============LICENSE_START=======================================================
+ * ONAP Policy Models
+ * ================================================================================
+ * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
  *
- * http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
  * ============LICENSE_END=========================================================
  */
 
 package org.onap.policy.models.sim.dmaap.rest;
 
 import java.io.BufferedReader;
+import java.io.EOFException;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.InputStreamReader;
+import java.io.Reader;
 import java.lang.annotation.Annotation;
 import java.lang.reflect.Type;
+import java.nio.charset.StandardCharsets;
+import java.util.LinkedList;
+import java.util.List;
 import javax.ws.rs.Consumes;
-import javax.ws.rs.Produces;
 import javax.ws.rs.core.MediaType;
 import javax.ws.rs.core.MultivaluedMap;
 import javax.ws.rs.ext.MessageBodyReader;
 import javax.ws.rs.ext.Provider;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.apache.commons.io.IOUtils;
 
 /**
- * Provider that serializes and de-serializes JSON via gson.
+ * Provider that decodes "application/cambria" messages.
  */
 @Provider
 @Consumes(CambriaMessageBodyHandler.MEDIA_TYPE_APPLICATION_CAMBRIA)
-@Produces(CambriaMessageBodyHandler.MEDIA_TYPE_APPLICATION_CAMBRIA)
 public class CambriaMessageBodyHandler implements MessageBodyReader<Object> {
-    // Media type for Cambria
     public static final String MEDIA_TYPE_APPLICATION_CAMBRIA = "application/cambria";
 
-    public static final Logger logger = LoggerFactory.getLogger(CambriaMessageBodyHandler.class);
+    /**
+     * Maximum length of a message or partition.
+     */
+    private static final int MAX_LEN = 10000000;
+
+    /**
+     * Maximum digits in a length field.
+     */
+    private static final int MAX_DIGITS = 10;
 
     @Override
     public boolean isReadable(Class<?> type, Type genericType, Annotation[] annotations, MediaType mediaType) {
-        return MEDIA_TYPE_APPLICATION_CAMBRIA.equals(mediaType.toString());
+        return (mediaType != null && MEDIA_TYPE_APPLICATION_CAMBRIA.equals(mediaType.toString()));
     }
 
     @Override
-    public String readFrom(Class<Object> type, Type genericType, Annotation[] annotations, MediaType mediaType,
-            MultivaluedMap<String, String> httpHeaders, InputStream entityStream)
-            throws IOException {
-
-        String cambriaString = "";
-        try (BufferedReader bufferedReader = new BufferedReader(
-                new InputStreamReader(entityStream))) {
-            String line;
-            while ((line = bufferedReader.readLine()) != null) {
-                cambriaString += line;
+    public List<Object> readFrom(Class<Object> type, Type genericType, Annotation[] annotations, MediaType mediaType,
+                    MultivaluedMap<String, String> httpHeaders, InputStream entityStream) throws IOException {
+
+        try (BufferedReader bufferedReader =
+                        new BufferedReader(new InputStreamReader(entityStream, StandardCharsets.UTF_8))) {
+            List<Object> messages = new LinkedList<>();
+            String msg;
+            while ((msg = readMessage(bufferedReader)) != null) {
+                messages.add(msg);
+            }
+
+            return messages;
+        }
+    }
+
+    /**
+     * Reads a message.
+     *
+     * @param reader source from which to read
+     * @return the message that was read, or {@code null} if there are no more messages
+     * @throws IOException if an error occurs
+     */
+    private String readMessage(Reader reader) throws IOException {
+        if (!skipWhitespace(reader)) {
+            return null;
+        }
+
+        int partlen = readLength(reader);
+        if (partlen > MAX_LEN) {
+            throw new IOException("invalid partition length");
+        }
+
+        int msglen = readLength(reader);
+        if (msglen > MAX_LEN) {
+            throw new IOException("invalid message length");
+        }
+
+        // skip over the partition
+        reader.skip(partlen);
+
+        return readString(reader, msglen);
+    }
+
+    /**
+     * Skips whitespace.
+     *
+     * @param reader source from which to read
+     * @return {@code true} if there is another character after the whitespace,
+     *         {@code false} if the end of the stream has been reached
+     * @throws IOException if an error occurs
+     */
+    private boolean skipWhitespace(Reader reader) throws IOException {
+        int chr;
+
+        do {
+            reader.mark(1);
+            if ((chr = reader.read()) < 0) {
+                return false;
+            }
+        }
+        while (Character.isWhitespace(chr));
+
+        // push the last character back onto the reader
+        reader.reset();
+
+        return true;
+    }
+
+    /**
+     * Reads a length field, which is a number followed by ".".
+     *
+     * @param reader source from which to read
+     * @return the length, or -1 if EOF has been reached
+     * @throws IOException if an error occurs
+     */
+    private int readLength(Reader reader) throws IOException {
+        StringBuilder bldr = new StringBuilder(MAX_DIGITS);
+
+        int chr;
+        for (int x = 0; x < MAX_DIGITS; ++x) {
+            if ((chr = reader.read()) < 0) {
+                throw new EOFException("missing '.' in 'length' field");
             }
 
-            return cambriaString.substring(cambriaString.indexOf('{'), cambriaString.length());
+            if (chr == '.') {
+                String text = bldr.toString().trim();
+                return (text.isEmpty() ? 0 : Integer.valueOf(text));
+            }
+
+            if (!Character.isDigit(chr)) {
+                throw new IOException("invalid character in 'length' field");
+            }
+
+            bldr.append((char) chr);
         }
+
+        throw new IOException("too many digits in 'length' field");
+    }
+
+    /**
+     * Reads a string.
+     *
+     * @param reader source from which to read
+     * @param len length of the string (i.e., number of characters to read)
+     * @return the string that was read
+     * @throws IOException if an error occurs
+     */
+    private String readString(Reader reader, int len) throws IOException {
+        char[] buf = new char[len];
+        IOUtils.readFully(reader, buf);
+
+        return new String(buf);
     }
 }
index e3fdd48..8339d0e 100644 (file)
@@ -1,6 +1,7 @@
 /*-
  * ============LICENSE_START=======================================================
  *  Copyright (C) 2019 Nordix Foundation.
+ *  Modifications Copyright (C) 2019 AT&T Intellectual Property. All rights reserved.
  * ================================================================================
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
 
 package org.onap.policy.models.sim.dmaap.rest;
 
-import io.swagger.annotations.ApiOperation;
-import io.swagger.annotations.ApiResponse;
-import io.swagger.annotations.ApiResponses;
-import io.swagger.annotations.Authorization;
+import javax.ws.rs.Consumes;
+import javax.ws.rs.DefaultValue;
 import javax.ws.rs.GET;
 import javax.ws.rs.POST;
 import javax.ws.rs.Path;
 import javax.ws.rs.PathParam;
+import javax.ws.rs.Produces;
 import javax.ws.rs.QueryParam;
 import javax.ws.rs.core.Response;
-
 import org.onap.policy.models.sim.dmaap.provider.DmaapSimProvider;
 
 /**
  * Class to provide REST endpoints for DMaaP simulator component statistics.
  */
 @Path("/events")
+@Produces(DmaapSimRestControllerV1.MEDIA_TYPE_APPLICATION_JSON)
 public class DmaapSimRestControllerV1 extends BaseRestControllerV1 {
+    public static final String MEDIA_TYPE_APPLICATION_JSON = "application/json";
 
     /**
      * Get a DMaaP message.
@@ -45,72 +46,33 @@ public class DmaapSimRestControllerV1 extends BaseRestControllerV1 {
      * @param topicName topic to get message from
      * @param consumerGroup consumer group that is getting the message
      * @param consumerId consumer ID that is getting the message
-     * @param timeout timeout for the message
+     * @param timeoutMs timeout for the message
      * @return the message
      */
     @GET
     @Path("{topicName}/{consumerGroup}/{consumerId}")
-    // @formatter:off
-    @ApiOperation(
-            value = "Get a DMaaP event on a topic",
-            notes = "Returns an event on a DMaaP topic",
-            response = Object.class,
-            authorizations =
-                @Authorization(value = AUTHORIZATION_TYPE)
-        )
-    @ApiResponses(
-            value = {
-                @ApiResponse(
-                        code = AUTHENTICATION_ERROR_CODE,
-                        message = AUTHENTICATION_ERROR_MESSAGE),
-                @ApiResponse(
-                        code = AUTHORIZATION_ERROR_CODE,
-                        message = AUTHORIZATION_ERROR_MESSAGE),
-                @ApiResponse(
-                        code = SERVER_ERROR_CODE,
-                        message = SERVER_ERROR_MESSAGE)
-            }
-        )
-    // @formatter:on
-    public Response getDmaaapMessage(@PathParam("topicName") final String topicName,
-            @PathParam("consumerGroup") final String consumerGroup, @PathParam("consumerId") final String consumerId,
-            @QueryParam("timeout") final int timeout) {
+    public Response getDmaapMessage(@PathParam("topicName") final String topicName,
+                    @PathParam("consumerGroup") final String consumerGroup,
+                    @PathParam("consumerId") final String consumerId,
+                    @QueryParam("limit") @DefaultValue("1") final int limit,
+                    @QueryParam("timeout") @DefaultValue("15000") final long timeoutMs) {
 
-        return new DmaapSimProvider().processDmaapMessageGet(topicName, consumerGroup, consumerId, timeout);
+        return DmaapSimProvider.getInstance().processDmaapMessageGet(topicName, consumerGroup, consumerId, limit,
+                        timeoutMs);
     }
 
     /**
      * Post a DMaaP message.
      *
-     * @param topicName topic to get message from415
+     * @param topicName topic to get message from
      * @return the response to the post
      */
     @POST
     @Path("{topicName}")
-    // @formatter:off
-    @ApiOperation(
-            value = "Post a DMaaP event on a topic",
-            notes = "Returns an event on a DMaaP topic",
-            response = Response.class,
-            authorizations =
-                @Authorization(value = AUTHORIZATION_TYPE)
-        )
-    @ApiResponses(
-            value = {
-                @ApiResponse(
-                        code = AUTHENTICATION_ERROR_CODE,
-                        message = AUTHENTICATION_ERROR_MESSAGE),
-                @ApiResponse(
-                        code = AUTHORIZATION_ERROR_CODE,
-                        message = AUTHORIZATION_ERROR_MESSAGE),
-                @ApiResponse(
-                        code = SERVER_ERROR_CODE,
-                        message = SERVER_ERROR_MESSAGE)
-            }
-        )
-    // @formatter:on
-    public Response postDmaaapMessage(@PathParam("topicName") final String topicName, final Object dmaapMessage) {
+    @Consumes(value = {CambriaMessageBodyHandler.MEDIA_TYPE_APPLICATION_CAMBRIA,
+                    TextMessageBodyHandler.MEDIA_TYPE_TEXT_PLAIN, MEDIA_TYPE_APPLICATION_JSON})
+    public Response postDmaapMessage(@PathParam("topicName") final String topicName, final Object dmaapMessage) {
 
-        return new DmaapSimProvider().processDmaapMessagePut(topicName, dmaapMessage);
+        return DmaapSimProvider.getInstance().processDmaapMessagePut(topicName, dmaapMessage);
     }
 }
index 28de42c..b05a0fe 100644 (file)
 
 package org.onap.policy.models.sim.dmaap.rest;
 
-import java.util.ArrayList;
 import java.util.List;
 import java.util.Properties;
-
-import org.onap.policy.common.capabilities.Startable;
 import org.onap.policy.common.endpoints.http.server.HttpServletServer;
 import org.onap.policy.common.endpoints.http.server.HttpServletServerFactoryInstance;
 import org.onap.policy.common.endpoints.properties.PolicyEndPointProperties;
+import org.onap.policy.common.gson.GsonMessageBodyHandler;
+import org.onap.policy.common.utils.services.ServiceManagerContainer;
 import org.onap.policy.models.sim.dmaap.parameters.RestServerParameters;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 /**
  * Class to manage life cycle of DMaaP Simulator rest server.
  */
-public class DmaapSimRestServer implements Startable {
-
-    private static final Logger LOGGER = LoggerFactory.getLogger(DmaapSimRestServer.class);
+public class DmaapSimRestServer extends ServiceManagerContainer {
 
-    private List<HttpServletServer> servers = new ArrayList<>();
-
-    private RestServerParameters restServerParameters;
+    private final List<HttpServletServer> servers;
 
     /**
      * Constructor for instantiating DmaapSimRestServer.
@@ -50,91 +43,40 @@ public class DmaapSimRestServer implements Startable {
      * @param restServerParameters the rest server parameters
      */
     public DmaapSimRestServer(final RestServerParameters restServerParameters) {
-        this.restServerParameters = restServerParameters;
-    }
+        this.servers = HttpServletServerFactoryInstance.getServerFactory()
+                        .build(getServerProperties(restServerParameters));
 
-    /**
-     * {@inheritDoc}.
-     */
-    @Override
-    public boolean start() {
-        try {
-            servers = HttpServletServerFactoryInstance.getServerFactory().build(getServerProperties());
-            for (final HttpServletServer server : servers) {
-                server.start();
-            }
-        } catch (final Exception exp) {
-            LOGGER.error("Failed to start DMaaP simulator http server", exp);
-            return false;
+        for (HttpServletServer server : this.servers) {
+            addAction("REST " + server.getName(), server::start, server::stop);
         }
-        return true;
     }
 
     /**
-     * Creates the server properties object using restServerParameters.
+     * Creates a set of properties, suitable for building a REST server, from the
+     * parameters.
      *
-     * @return the properties object
+     * @param restServerParameters parameters from which to build the properties
+     * @return a set of properties representing the given parameters
      */
-    private Properties getServerProperties() {
+    public static Properties getServerProperties(RestServerParameters restServerParameters) {
         final Properties props = new Properties();
         props.setProperty(PolicyEndPointProperties.PROPERTY_HTTP_SERVER_SERVICES, restServerParameters.getName());
 
         final String svcpfx =
-                PolicyEndPointProperties.PROPERTY_HTTP_SERVER_SERVICES + "." + restServerParameters.getName();
+                        PolicyEndPointProperties.PROPERTY_HTTP_SERVER_SERVICES + "." + restServerParameters.getName();
 
         props.setProperty(svcpfx + PolicyEndPointProperties.PROPERTY_HTTP_HOST_SUFFIX, restServerParameters.getHost());
         props.setProperty(svcpfx + PolicyEndPointProperties.PROPERTY_HTTP_PORT_SUFFIX,
-                Integer.toString(restServerParameters.getPort()));
+                        Integer.toString(restServerParameters.getPort()));
         props.setProperty(svcpfx + PolicyEndPointProperties.PROPERTY_HTTP_REST_CLASSES_SUFFIX,
-                String.join(",", DmaapSimRestControllerV1.class.getName()));
+                        DmaapSimRestControllerV1.class.getName());
         props.setProperty(svcpfx + PolicyEndPointProperties.PROPERTY_MANAGED_SUFFIX, "false");
-        props.setProperty(svcpfx + PolicyEndPointProperties.PROPERTY_HTTP_SWAGGER_SUFFIX, "true");
+        props.setProperty(svcpfx + PolicyEndPointProperties.PROPERTY_HTTP_SWAGGER_SUFFIX, "false");
+
         props.setProperty(svcpfx + PolicyEndPointProperties.PROPERTY_HTTP_SERIALIZATION_PROVIDER,
-                CambriaMessageBodyHandler.class.getName() + "," + JsonMessageBodyHandler.class.getName());
+                        String.join(",", CambriaMessageBodyHandler.class.getName(),
+                                        GsonMessageBodyHandler.class.getName(),
+                                        TextMessageBodyHandler.class.getName()));
         return props;
     }
-
-    /**
-     * {@inheritDoc}.
-     */
-    @Override
-    public boolean stop() {
-        for (final HttpServletServer server : servers) {
-            try {
-                server.stop();
-            } catch (final Exception exp) {
-                LOGGER.error("Failed to stop DMaaP simulator http server", exp);
-            }
-        }
-        return true;
-    }
-
-    /**
-     * {@inheritDoc}.
-     */
-    @Override
-    public void shutdown() {
-        stop();
-    }
-
-    /**
-     * {@inheritDoc}.
-     */
-    @Override
-    public boolean isAlive() {
-        return !servers.isEmpty();
-    }
-
-    /**
-     * {@inheritDoc}.
-     */
-    @Override
-    public String toString() {
-        final StringBuilder builder = new StringBuilder();
-        builder.append("DmaapSimRestServer [servers=");
-        builder.append(servers);
-        builder.append("]");
-        return builder.toString();
-    }
-
 }
diff --git a/models-sim/models-sim-dmaap/src/main/java/org/onap/policy/models/sim/dmaap/rest/JsonMessageBodyHandler.java b/models-sim/models-sim-dmaap/src/main/java/org/onap/policy/models/sim/dmaap/rest/JsonMessageBodyHandler.java
deleted file mode 100644 (file)
index a3eebda..0000000
+++ /dev/null
@@ -1,63 +0,0 @@
-/*
- * ============LICENSE_START======================================================= ONAP
- * ================================================================================ Copyright (C) 2019 AT&T Intellectual
- * Property. All rights reserved. ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.policy.models.sim.dmaap.rest;
-
-import java.io.BufferedReader;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.InputStreamReader;
-import java.lang.annotation.Annotation;
-import java.lang.reflect.Type;
-import javax.ws.rs.Consumes;
-import javax.ws.rs.Produces;
-import javax.ws.rs.core.MediaType;
-import javax.ws.rs.core.MultivaluedMap;
-import javax.ws.rs.ext.MessageBodyReader;
-import javax.ws.rs.ext.Provider;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Provider that serializes and de-serializes JSON via gson.
- */
-@Provider
-@Consumes(MediaType.APPLICATION_JSON)
-@Produces(MediaType.APPLICATION_JSON)
-public class JsonMessageBodyHandler implements MessageBodyReader<Object> {
-    public static final Logger logger = LoggerFactory.getLogger(JsonMessageBodyHandler.class);
-
-    @Override
-    public boolean isReadable(Class<?> type, Type genericType, Annotation[] annotations, MediaType mediaType) {
-        return MediaType.APPLICATION_JSON.equals(mediaType.toString());
-    }
-
-    @Override
-    public String readFrom(Class<Object> type, Type genericType, Annotation[] annotations, MediaType mediaType,
-            MultivaluedMap<String, String> httpHeaders, InputStream entityStream)
-            throws IOException {
-
-        String jsonString = "";
-        try (BufferedReader bufferedReader = new BufferedReader(
-                new InputStreamReader(entityStream))) {
-            String line;
-            while ((line = bufferedReader.readLine()) != null) {
-                jsonString += line;
-            }
-
-            return jsonString;
-        }
-    }
-}
diff --git a/models-sim/models-sim-dmaap/src/main/java/org/onap/policy/models/sim/dmaap/rest/TextMessageBodyHandler.java b/models-sim/models-sim-dmaap/src/main/java/org/onap/policy/models/sim/dmaap/rest/TextMessageBodyHandler.java
new file mode 100644 (file)
index 0000000..3c903c8
--- /dev/null
@@ -0,0 +1,66 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * ONAP Policy Models
+ * ================================================================================
+ * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.models.sim.dmaap.rest;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.lang.annotation.Annotation;
+import java.lang.reflect.Type;
+import java.nio.charset.StandardCharsets;
+import java.util.LinkedList;
+import java.util.List;
+import javax.ws.rs.Consumes;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.MultivaluedMap;
+import javax.ws.rs.ext.MessageBodyReader;
+import javax.ws.rs.ext.Provider;
+
+/**
+ * Provider that decodes "text/plain" messages.
+ */
+@Provider
+@Consumes(TextMessageBodyHandler.MEDIA_TYPE_TEXT_PLAIN)
+public class TextMessageBodyHandler implements MessageBodyReader<Object> {
+    public static final String MEDIA_TYPE_TEXT_PLAIN = "text/plain";
+
+    @Override
+    public boolean isReadable(Class<?> type, Type genericType, Annotation[] annotations, MediaType mediaType) {
+        return (mediaType != null && MEDIA_TYPE_TEXT_PLAIN.equals(mediaType.toString()));
+    }
+
+    @Override
+    public List<Object> readFrom(Class<Object> type, Type genericType, Annotation[] annotations, MediaType mediaType,
+                    MultivaluedMap<String, String> httpHeaders, InputStream entityStream) throws IOException {
+
+        try (BufferedReader bufferedReader =
+                        new BufferedReader(new InputStreamReader(entityStream, StandardCharsets.UTF_8))) {
+            List<Object> messages = new LinkedList<>();
+            String msg;
+            while ((msg = bufferedReader.readLine()) != null) {
+                messages.add(msg);
+            }
+
+            return messages;
+        }
+    }
+}
index 899c0e0..b9e0efa 100644 (file)
@@ -1,6 +1,7 @@
 /*-
  * ============LICENSE_START=======================================================
  *  Copyright (C) 2019 Nordix Foundation.
+ *  Modifications Copyright (C) 2019 AT&T Intellectual Property. All rights reserved.
  * ================================================================================
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
 
 package org.onap.policy.models.sim.dmaap.startstop;
 
-import org.onap.policy.common.parameters.ParameterService;
 import org.onap.policy.common.utils.services.ServiceManagerContainer;
 import org.onap.policy.models.sim.dmaap.parameters.DmaapSimParameterGroup;
+import org.onap.policy.models.sim.dmaap.provider.DmaapSimProvider;
 import org.onap.policy.models.sim.dmaap.rest.DmaapSimRestServer;
 
 /**
  * This class activates the DMaaP simulator as a complete service.
  */
 public class DmaapSimActivator extends ServiceManagerContainer {
-    /**
-     * The DMaaP simulator REST API server.
-     */
-    private DmaapSimRestServer restServer;
 
     /**
      * Instantiate the activator for the DMaaP simulator as a complete service.
@@ -42,19 +39,11 @@ public class DmaapSimActivator extends ServiceManagerContainer {
     public DmaapSimActivator(final DmaapSimParameterGroup dmaapSimParameterGroup) {
         super("DMaaP Simulator");
 
-        // @formatter:off
-        addAction("DMaaP Simulator parameters",
-            () -> ParameterService.register(dmaapSimParameterGroup),
-            () -> ParameterService.deregister(dmaapSimParameterGroup.getName()));
-
-        addAction("Create REST server",
-            () -> restServer = new DmaapSimRestServer(dmaapSimParameterGroup.getRestServerParameters()),
-            () -> restServer = null
-        );
+        DmaapSimProvider provider = new DmaapSimProvider(dmaapSimParameterGroup);
+        DmaapSimProvider.setInstance(provider);
+        addAction("Sim Provider", provider::start, provider::stop);
 
-        addAction("REST server",
-            () -> restServer.start(),
-            () -> restServer.stop());
-        // @formatter:on
+        DmaapSimRestServer restServer = new DmaapSimRestServer(dmaapSimParameterGroup.getRestServerParameters());
+        addAction("REST server", restServer::start, restServer::stop);
     }
 }
index cf559f7..724c3dc 100644 (file)
@@ -26,13 +26,15 @@ import java.io.PrintWriter;
 import java.io.StringWriter;
 import java.net.URL;
 import java.util.Arrays;
-
+import lombok.Getter;
+import lombok.Setter;
 import org.apache.commons.cli.CommandLine;
 import org.apache.commons.cli.DefaultParser;
 import org.apache.commons.cli.HelpFormatter;
 import org.apache.commons.cli.Option;
 import org.apache.commons.cli.Options;
 import org.apache.commons.cli.ParseException;
+import org.apache.commons.lang3.StringUtils;
 import org.onap.policy.common.utils.resources.ResourceUtils;
 import org.onap.policy.models.sim.dmaap.DmaapSimException;
 import org.onap.policy.models.sim.dmaap.DmaapSimRuntimeException;
@@ -46,6 +48,9 @@ public class DmaapSimCommandLineArguments {
     private static final int HELP_LINE_LENGTH = 120;
 
     private final Options options;
+
+    @Getter
+    @Setter
     private String configurationFilePath = null;
 
     /**
@@ -145,7 +150,7 @@ public class DmaapSimCommandLineArguments {
      * @throws DmaapSimException on command argument validation errors
      */
     public void validate() throws DmaapSimException {
-        validateReadableFile("DMaaP simulator configuration", configurationFilePath);
+        validateFileExists("DMaaP simulator configuration", configurationFilePath);
     }
 
     /**
@@ -174,15 +179,6 @@ public class DmaapSimCommandLineArguments {
         return stringWriter.toString();
     }
 
-    /**
-     * Gets the configuration file path.
-     *
-     * @return the configuration file path
-     */
-    public String getConfigurationFilePath() {
-        return configurationFilePath;
-    }
-
     /**
      * Gets the full expanded configuration file path.
      *
@@ -192,16 +188,6 @@ public class DmaapSimCommandLineArguments {
         return ResourceUtils.getFilePath4Resource(getConfigurationFilePath());
     }
 
-    /**
-     * Sets the configuration file path.
-     *
-     * @param configurationFilePath the configuration file path
-     */
-    public void setConfigurationFilePath(final String configurationFilePath) {
-        this.configurationFilePath = configurationFilePath;
-
-    }
-
     /**
      * Check set configuration file path.
      *
@@ -212,14 +198,14 @@ public class DmaapSimCommandLineArguments {
     }
 
     /**
-     * Validate readable file.
+     * Validate file exists.
      *
      * @param fileTag the file tag
      * @param fileName the file name
      * @throws DmaapSimException on the file name passed as a parameter
      */
-    private void validateReadableFile(final String fileTag, final String fileName) throws DmaapSimException {
-        if (fileName == null || fileName.length() == 0) {
+    private void validateFileExists(final String fileTag, final String fileName) throws DmaapSimException {
+        if (StringUtils.isBlank(fileName)) {
             throw new DmaapSimException(fileTag + " file was not specified as an argument");
         }
 
@@ -233,11 +219,5 @@ public class DmaapSimCommandLineArguments {
         if (!theFile.exists()) {
             throw new DmaapSimException(fileTag + FILE_MESSAGE_PREAMBLE + fileName + "\" does not exist");
         }
-        if (!theFile.isFile()) {
-            throw new DmaapSimException(fileTag + FILE_MESSAGE_PREAMBLE + fileName + "\" is not a normal file");
-        }
-        if (!theFile.canRead()) {
-            throw new DmaapSimException(fileTag + FILE_MESSAGE_PREAMBLE + fileName + "\" is ureadable");
-        }
     }
 }
index 878d008..7b4f41b 100644 (file)
@@ -1,6 +1,7 @@
 /*-
  * ============LICENSE_START=======================================================
  *  Copyright (C) 2019 Nordix Foundation.
+ *  Modifications Copyright (C) 2019 AT&T Intellectual Property. All rights reserved.
  * ================================================================================
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -21,9 +22,6 @@
 package org.onap.policy.models.sim.dmaap.startstop;
 
 import java.util.Arrays;
-
-import org.onap.policy.common.utils.services.Registry;
-import org.onap.policy.models.sim.dmaap.DmaapSimConstants;
 import org.onap.policy.models.sim.dmaap.DmaapSimException;
 import org.onap.policy.models.sim.dmaap.parameters.DmaapSimParameterGroup;
 import org.onap.policy.models.sim.dmaap.parameters.DmaapSimParameterHandler;
@@ -75,14 +73,12 @@ public class Main {
 
         // Now, create the activator for the DMaaP Simulator service
         activator = new DmaapSimActivator(parameterGroup);
-        Registry.register(DmaapSimConstants.REG_DMAAP_SIM_ACTIVATOR, activator);
 
         // Start the activator
         try {
             activator.start();
         } catch (final RuntimeException e) {
             LOGGER.error("start of DMaaP simulator service failed, used parameters are {}", Arrays.toString(args), e);
-            Registry.unregister(DmaapSimConstants.REG_DMAAP_SIM_ACTIVATOR);
             return;
         }
 
@@ -110,7 +106,7 @@ public class Main {
         parameterGroup = null;
 
         // clear the DMaaP simulator activator
-        if (activator != null) {
+        if (activator != null && activator.isAlive()) {
             activator.stop();
         }
     }
@@ -128,8 +124,9 @@ public class Main {
         public void run() {
             try {
                 // Shutdown the DMaaP simulator service and wait for everything to stop
-                activator.stop();
-            } catch (final RuntimeException e) {
+                shutdown();
+
+            } catch (final RuntimeException | DmaapSimException e) {
                 LOGGER.warn("error occured during shut down of the DMaaP simulator service", e);
             }
         }
index dd2477a..e936eb0 100644 (file)
@@ -1,5 +1,6 @@
 {
     "name": "DMaapSim",
+    "topicSweepSec": 900,
     "restServerParameters": {
         "host": "0.0.0.0",
         "port": 3904
diff --git a/models-sim/models-sim-dmaap/src/test/java/org/onap/policy/models/sim/dmaap/DmaapSimXxxExceptionTest.java b/models-sim/models-sim-dmaap/src/test/java/org/onap/policy/models/sim/dmaap/DmaapSimXxxExceptionTest.java
new file mode 100644 (file)
index 0000000..4e37a5e
--- /dev/null
@@ -0,0 +1,39 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.models.sim.dmaap;
+
+import static org.junit.Assert.assertEquals;
+
+import org.junit.Test;
+import org.onap.policy.common.utils.test.ExceptionsTester;
+import org.onap.policy.models.sim.dmaap.DmaapSimException;
+import org.onap.policy.models.sim.dmaap.DmaapSimRuntimeException;
+
+public class DmaapSimXxxExceptionTest {
+
+    @Test
+    public void testDmaapSimException() {
+        assertEquals(3, new ExceptionsTester().test(DmaapSimException.class));
+    }
+
+    @Test
+    public void testDmaapSimRuntimeException() {
+        assertEquals(3, new ExceptionsTester().test(DmaapSimRuntimeException.class));
+    }
+}
diff --git a/models-sim/models-sim-dmaap/src/test/java/org/onap/policy/models/sim/dmaap/provider/ConsumerGroupDataTest.java b/models-sim/models-sim-dmaap/src/test/java/org/onap/policy/models/sim/dmaap/provider/ConsumerGroupDataTest.java
new file mode 100644 (file)
index 0000000..4513ffb
--- /dev/null
@@ -0,0 +1,305 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * ONAP Policy Models
+ * ================================================================================
+ * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.models.sim.dmaap.provider;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertSame;
+import static org.junit.Assert.assertTrue;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+public class ConsumerGroupDataTest {
+    private static final int WAIT_MS = 5000;
+    private static final int MIN_WAIT_MS = WAIT_MS / 2;
+    private static final String MY_TOPIC = "my-topic";
+    private static final String MY_CONSUMER = "my-consumer";
+    private static final String MSG1 = "hello";
+    private static final String MSG2 = "there";
+    private static final String MSG3 = "world";
+    private static final int MAX_THREADS = 30;
+
+    private MyData data;
+    private MyReader thread;
+    private List<MyReader> threads;
+
+    /**
+     * Sets up.
+     */
+    @Before
+    public void setUp() {
+        data = new MyData();
+        thread = null;
+        threads = new ArrayList<>(MAX_THREADS);
+    }
+
+    /**
+     * Stops any running thread.
+     */
+    @After
+    public void tearDown() {
+        for (MyReader thr : threads) {
+            thr.interrupt();
+        }
+
+        for (MyReader thr : threads) {
+            thr.await();
+        }
+    }
+
+    @Test
+    public void testShouldRemove() throws InterruptedException {
+        assertFalse(data.shouldRemove());
+        assertTrue(data.shouldRemove());
+
+        data = new MyData();
+
+        // start a reader thread and wait for it to poll its queue
+        startReader(0, 10);
+        assertTrue(data.await());
+
+        assertFalse(data.shouldRemove());
+    }
+
+    @Test
+    public void testRead() {
+        data.enqueue(MSG1, MSG2, MSG3, MSG1, MSG2, MSG3);
+
+        // this reader only wants one
+        startReader(1, 1);
+        assertTrue(thread.await());
+        assertEquals("[hello]", thread.result.toString());
+
+        // this reader wants three
+        startReader(3, 1);
+        assertTrue(thread.await());
+        assertEquals("[there, world, hello]", thread.result.toString());
+
+        // this reader wants three, but will only get two
+        startReader(3, 1);
+        assertTrue(thread.await());
+        assertEquals("[there, world]", thread.result.toString());
+    }
+
+    @Test
+    public void testRead_Idle() throws InterruptedException {
+        // force it to idle
+        data.shouldRemove();
+        data.shouldRemove();
+
+        long tbeg = System.currentTimeMillis();
+        assertSame(ConsumerGroupData.UNREADABLE_LIST, data.read(1, WAIT_MS));
+
+        // should not have waited
+        assertTrue(System.currentTimeMillis() < tbeg + MIN_WAIT_MS);
+    }
+
+    @Test
+    public void testRead_NegativeCount() throws InterruptedException {
+        data.enqueue(MSG1, MSG2);
+        startReader(-1, 3);
+        assertTrue(data.await());
+
+        // wait time should be unaffected
+        assertEquals(3L, data.waitMs2);
+
+        assertTrue(thread.await());
+
+        // should only return one message
+        assertEquals("[hello]", thread.result.toString());
+    }
+
+    @Test
+    public void testRead_NegativeWait() throws InterruptedException {
+        data.enqueue(MSG1, MSG2, MSG3);
+        startReader(2, -3);
+        assertTrue(data.await());
+
+        assertEquals(0L, data.waitMs2);
+
+        assertTrue(thread.await());
+
+        // should return two messages, as requested
+        assertEquals("[hello, there]", thread.result.toString());
+    }
+
+    @Test
+    public void testRead_NoMessages() throws InterruptedException {
+        startReader(0, 0);
+        assertTrue(data.await());
+
+        assertTrue(thread.await());
+        assertTrue(thread.result.isEmpty());
+    }
+
+    @Test
+    public void testRead_MultiThreaded() {
+        // queue up a bunch of messages
+        final int expected = MAX_THREADS * 3;
+        for (int x = 0; x < expected; ++x) {
+            data.enqueue(MSG1);
+        }
+
+        for (int x = 0; x < MAX_THREADS; ++x) {
+            startReader(4, 1);
+        }
+
+        int actual = 0;
+        for (MyReader thr : threads) {
+            thr.await();
+            actual += thr.result.size();
+        }
+
+        assertEquals(expected, actual);
+    }
+
+
+    /**
+     * Starts a reader thread.
+     *
+     * @param limit number of messages to read at one time
+     * @param waitMs wait time, in milliseconds
+     */
+    private void startReader(int limit, long waitMs) {
+        thread = new MyReader(limit, waitMs);
+
+        thread.setDaemon(true);
+        thread.start();
+
+        threads.add(thread);
+    }
+
+
+    private class MyData extends ConsumerGroupData {
+
+        /**
+         * Decremented when {@link #getNextMessage(long)} is invoked.
+         */
+        private final CountDownLatch latch = new CountDownLatch(1);
+
+        /**
+         * Messages to be added to the queue when {@link #getNextMessage(long)} is
+         * invoked.
+         */
+        private final List<String> messages = new ArrayList<>();
+
+        /**
+         * Value passed to {@link #getNextMessage(long)}.
+         */
+        private volatile long waitMs2 = -1;
+
+        /**
+         * Constructs the object.
+         */
+        public MyData() {
+            super(MY_TOPIC, MY_CONSUMER);
+        }
+
+        /**
+         * Arranges for messages to be injected into the queue the next time
+         * {@link #getNextMessage(long)} is invoked.
+         *
+         * @param messages the messages to be injected
+         */
+        public void enqueue(String... messages) {
+            this.messages.addAll(Arrays.asList(messages));
+        }
+
+        @Override
+        protected String getNextMessage(long waitMs) throws InterruptedException {
+            waitMs2 = waitMs;
+
+            latch.countDown();
+
+            synchronized (messages) {
+                write(messages);
+                messages.clear();
+            }
+
+            return super.getNextMessage(waitMs);
+        }
+
+        /**
+         * Waits for {@link #getNextMessage(long)} to be invoked.
+         *
+         * @return {@code true} if {@link #getNextMessage(long)} was invoked,
+         *         {@code false} if the timer expired first
+         * @throws InterruptedException if the current thread is interrupted while waiting
+         */
+        public boolean await() throws InterruptedException {
+            return latch.await(WAIT_MS, TimeUnit.MILLISECONDS);
+        }
+    }
+
+    /**
+     * Thread that will invoke the consumer group's read() method one time.
+     */
+    private class MyReader extends Thread {
+        private final ConsumerGroupData group = data;
+        private final int limit;
+        private final long waitMs;
+
+        /**
+         * Result returned by the read() method.
+         */
+        private List<String> result = Collections.emptyList();
+
+        public MyReader(int limit, long waitMs) {
+            this.limit = limit;
+            this.waitMs = waitMs;
+        }
+
+        @Override
+        public void run() {
+            try {
+                result = group.read(limit, waitMs);
+
+            } catch (InterruptedException e) {
+                Thread.currentThread().interrupt();
+            }
+        }
+
+        /**
+         * Waits for the thread to complete.
+         *
+         * @return {@code true} if the thread completed, {@code false} if the thread is
+         *         still running
+         */
+        public boolean await() {
+            try {
+                this.join(WAIT_MS);
+
+            } catch (InterruptedException e) {
+                Thread.currentThread().interrupt();
+            }
+
+            return !this.isAlive();
+        }
+    }
+}
diff --git a/models-sim/models-sim-dmaap/src/test/java/org/onap/policy/models/sim/dmaap/provider/DmaapSimProviderTest.java b/models-sim/models-sim-dmaap/src/test/java/org/onap/policy/models/sim/dmaap/provider/DmaapSimProviderTest.java
new file mode 100644 (file)
index 0000000..f8c1416
--- /dev/null
@@ -0,0 +1,287 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.models.sim.dmaap.provider;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertSame;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyInt;
+import static org.mockito.Matchers.anyLong;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import javax.ws.rs.core.Response;
+import javax.ws.rs.core.Response.Status;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Captor;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+import org.onap.policy.common.utils.coder.CoderException;
+import org.onap.policy.common.utils.coder.StandardCoder;
+import org.onap.policy.common.utils.coder.StandardCoderObject;
+import org.onap.policy.models.sim.dmaap.parameters.DmaapSimParameterGroup;
+
+public class DmaapSimProviderTest {
+    private static final String EXPECTED_EXCEPTION = "expected exception";
+    private static final long SWEEP_SEC = 10L;
+    private static final String TOPIC1 = "topic-A";
+    private static final String TOPIC2 = "topic-B";
+    private static final String CONSUMER1 = "consumer-X";
+    private static final String CONSUMER_ID1 = "id1";
+
+    private MyProvider prov;
+
+    @Mock
+    private DmaapSimParameterGroup params;
+
+    @Mock
+    private ScheduledExecutorService timer;
+
+    @Mock
+    private TopicData data1;
+
+    @Mock
+    private TopicData data2;
+
+    @Captor
+    private ArgumentCaptor<List<Object>> listCaptor;
+
+    /**
+     * Sets up.
+     */
+    @Before
+    public void setUp() {
+        MockitoAnnotations.initMocks(this);
+
+        when(params.getTopicSweepSec()).thenReturn(SWEEP_SEC);
+
+        prov = new MyProvider(params);
+    }
+
+    /**
+     * Shuts down the provider, if it's running.
+     */
+    @After
+    public void tearDown() {
+        if (prov.isAlive()) {
+            prov.shutdown();
+        }
+    }
+
+    /**
+     * Verifies that the constructor adds all of the expected actions to the service
+     * manager container.
+     */
+    @Test
+    public void testDmaapSimProvider() {
+        prov.start();
+        verify(timer).scheduleWithFixedDelay(any(), eq(SWEEP_SEC), eq(SWEEP_SEC), eq(TimeUnit.SECONDS));
+
+        prov.stop();
+        verify(timer).shutdown();
+    }
+
+    @Test
+    public void testProcessDmaapMessagePut_List() throws CoderException {
+        prov = spy(new MyProvider(params));
+
+        when(data1.write(any())).thenReturn(2);
+
+        // force topics to exist
+        prov.processDmaapMessageGet(TOPIC1, CONSUMER1, CONSUMER_ID1, 1, 0);
+        prov.processDmaapMessageGet(TOPIC2, CONSUMER1, CONSUMER_ID1, 1, 0);
+
+        List<Object> lst = Arrays.asList("hello", "world");
+        Response resp = prov.processDmaapMessagePut(TOPIC1, lst);
+        assertEquals(Status.OK.getStatusCode(), resp.getStatus());
+        StandardCoderObject sco = new StandardCoder().decode(resp.getEntity().toString(), StandardCoderObject.class);
+        assertEquals("2", sco.getString("count"));
+
+        List<Object> lst2 = Arrays.asList("helloB", "worldB");
+        prov.processDmaapMessagePut(TOPIC1, lst2);
+        prov.processDmaapMessagePut(TOPIC2, lst2);
+
+        // should only invoke this once for each topic
+        verify(prov).makeTopicData(TOPIC1);
+        verify(prov).makeTopicData(TOPIC2);
+
+        // should process all writes
+        verify(data1).write(lst);
+        verify(data1).write(lst2);
+
+        verify(data2).write(lst2);
+    }
+
+    @Test
+    public void testProcessDmaapMessagePut_Single() throws CoderException {
+        prov = spy(new MyProvider(params));
+
+        // force topics to exist
+        prov.processDmaapMessageGet(TOPIC1, CONSUMER1, CONSUMER_ID1, 1, 0);
+        prov.processDmaapMessageGet(TOPIC2, CONSUMER1, CONSUMER_ID1, 1, 0);
+
+        final String value1 = "abc";
+        Response resp = prov.processDmaapMessagePut(TOPIC1, value1);
+        assertEquals(Status.OK.getStatusCode(), resp.getStatus());
+
+        // ensure that the response can be decoded
+        new StandardCoder().decode(resp.getEntity().toString(), StandardCoderObject.class);
+
+        final String value2 = "def";
+        prov.processDmaapMessagePut(TOPIC1, value2);
+        prov.processDmaapMessagePut(TOPIC2, value2);
+
+        // should only invoke this once for each topic
+        verify(prov).makeTopicData(TOPIC1);
+        verify(prov).makeTopicData(TOPIC2);
+
+        // should process all writes as singleton lists
+        listCaptor.getAllValues().clear();
+        verify(data1, times(2)).write(listCaptor.capture());
+        assertEquals(Collections.singletonList(value1), listCaptor.getAllValues().get(0));
+        assertEquals(Collections.singletonList(value2), listCaptor.getAllValues().get(1));
+
+        listCaptor.getAllValues().clear();
+        verify(data2).write(listCaptor.capture());
+        assertEquals(Collections.singletonList(value2), listCaptor.getAllValues().get(0));
+    }
+
+    @Test
+    public void testProcessDmaapMessageGet() throws InterruptedException {
+        List<String> msgs = Arrays.asList("400", "500");
+        when(data1.read(any(), anyInt(), anyLong())).thenReturn(msgs);
+
+        Response resp = prov.processDmaapMessageGet(TOPIC1, CONSUMER1, CONSUMER_ID1, 4, 400L);
+        assertEquals(Status.OK.getStatusCode(), resp.getStatus());
+        assertEquals(msgs.toString(), resp.getEntity().toString());
+    }
+
+    @Test
+    public void testProcessDmaapMessageGet_Timeout() throws InterruptedException {
+        when(data1.read(any(), anyInt(), anyLong())).thenReturn(Collections.emptyList());
+
+        Response resp = prov.processDmaapMessageGet(TOPIC1, CONSUMER1, CONSUMER_ID1, 3, 300L);
+        assertEquals(Status.REQUEST_TIMEOUT.getStatusCode(), resp.getStatus());
+        assertEquals("[]", resp.getEntity().toString());
+    }
+
+    @Test
+    public void testProcessDmaapMessageGet_Ex() throws InterruptedException {
+        BlockingQueue<Response> respQueue = new LinkedBlockingQueue<>();
+
+        // put in a background thread so it doesn't interrupt the tester thread
+        new Thread(() -> {
+            try {
+                when(data1.read(any(), anyInt(), anyLong())).thenThrow(new InterruptedException(EXPECTED_EXCEPTION));
+                respQueue.offer(prov.processDmaapMessageGet(TOPIC1, CONSUMER1, CONSUMER_ID1, 3, 300L));
+            } catch (InterruptedException e) {
+                Thread.currentThread().interrupt();
+            }
+        }).start();
+
+        Response resp = respQueue.poll(3, TimeUnit.SECONDS);
+        assertNotNull(resp);
+
+        assertEquals(Status.GONE.getStatusCode(), resp.getStatus());
+        assertEquals("[]", resp.getEntity().toString());
+    }
+
+    @Test
+    public void testSweepTopicTaskRun() {
+        prov.start();
+        prov.processDmaapMessageGet(TOPIC1, CONSUMER1, CONSUMER_ID1, 0, 0);
+        prov.processDmaapMessageGet(TOPIC2, CONSUMER1, CONSUMER_ID1, 0, 0);
+
+        ArgumentCaptor<Runnable> captor = ArgumentCaptor.forClass(Runnable.class);
+        verify(timer).scheduleWithFixedDelay(captor.capture(), anyLong(), anyLong(), any(TimeUnit.class));
+
+        captor.getValue().run();
+        verify(data1).removeIdleConsumers();
+        verify(data2).removeIdleConsumers();
+
+        // run it again
+        captor.getValue().run();
+        verify(data1, times(2)).removeIdleConsumers();
+        verify(data2, times(2)).removeIdleConsumers();
+    }
+
+    @Test
+    public void testMakeTimerPool() {
+        // use a real provider so we can test the real makeTimer() method
+        DmaapSimProvider prov2 = new DmaapSimProvider(params);
+        prov2.start();
+        prov2.stop();
+    }
+
+    @Test
+    public void testMakeTopicData() {
+        // use a real provider so we can test the real makeTopicData() method
+        DmaapSimProvider prov2 = new DmaapSimProvider(params);
+        prov2.processDmaapMessageGet(TOPIC1, CONSUMER1, CONSUMER_ID1, 0, 0);
+    }
+
+    @Test
+    public void testGetInstance_testSetInstance() {
+        DmaapSimProvider.setInstance(prov);
+        assertSame(prov, DmaapSimProvider.getInstance());
+
+        DmaapSimProvider.setInstance(null);
+        assertNull(DmaapSimProvider.getInstance());
+    }
+
+
+    public class MyProvider extends DmaapSimProvider {
+
+        public MyProvider(DmaapSimParameterGroup params) {
+            super(params);
+        }
+
+        @Override
+        protected ScheduledExecutorService makeTimerPool() {
+            return timer;
+        }
+
+        @Override
+        protected TopicData makeTopicData(String topicName) {
+            switch (topicName) {
+                case TOPIC1:
+                    return data1;
+                case TOPIC2:
+                    return data2;
+                default:
+                    throw new IllegalArgumentException("unknown topic name: " + topicName);
+            }
+        }
+    }
+}
diff --git a/models-sim/models-sim-dmaap/src/test/java/org/onap/policy/models/sim/dmaap/provider/TopicDataTest.java b/models-sim/models-sim-dmaap/src/test/java/org/onap/policy/models/sim/dmaap/provider/TopicDataTest.java
new file mode 100644 (file)
index 0000000..f7e1f5e
--- /dev/null
@@ -0,0 +1,213 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * ONAP Policy Models
+ * ================================================================================
+ * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.models.sim.dmaap.provider;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertSame;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyInt;
+import static org.mockito.Matchers.anyLong;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.TreeSet;
+import java.util.stream.Collectors;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.internal.util.reflection.Whitebox;
+import org.onap.policy.common.utils.coder.Coder;
+import org.onap.policy.common.utils.coder.CoderException;
+import org.onap.policy.common.utils.coder.StandardCoder;
+
+public class TopicDataTest {
+    private static final String EXPECTED_EXCEPTION = "expected exception";
+    private static final String GROUP1 = "group-A";
+    private static final String GROUP2 = "group-B";
+    private static final String GROUP3 = "group-C";
+
+    private TopicData data;
+    private ConsumerGroupData consgrp1;
+    private ConsumerGroupData consgrp2;
+    private ConsumerGroupData consgrp3;
+    private List<ConsumerGroupData> groups;
+
+    /**
+     * Sets up mocks and the initial data object.
+     *
+     * @throws Exception if an error occurs
+     */
+    @Before
+    public void setUp() throws Exception {
+        consgrp1 = mock(ConsumerGroupData.class);
+        consgrp2 = mock(ConsumerGroupData.class);
+        consgrp3 = mock(ConsumerGroupData.class);
+
+        when(consgrp1.read(anyInt(), anyLong())).thenReturn(Collections.emptyList());
+        when(consgrp2.read(anyInt(), anyLong())).thenReturn(Collections.emptyList());
+        when(consgrp3.read(anyInt(), anyLong())).thenReturn(Collections.emptyList());
+
+        groups = new LinkedList<>(Arrays.asList(consgrp1, consgrp2, consgrp3));
+
+        data = new TopicData("my-topic") {
+            @Override
+            protected ConsumerGroupData makeData(String consumerGroup) {
+                return groups.remove(0);
+            }
+        };
+    }
+
+    @Test
+    public void testRemoveIdleConsumers() throws Exception {
+        // force two consumers into the map
+        data.read(GROUP1, 0, 0);
+        data.read(GROUP2, 0, 0);
+        data.read(GROUP3, 0, 0);
+
+        // indicate that one should be removed
+        when(consgrp1.shouldRemove()).thenReturn(true);
+
+        // sweep
+        data.removeIdleConsumers();
+
+        assertEquals("[group-B, group-C]", new TreeSet<>(getGroups().keySet()).toString());
+
+        // indicate that the others should be removed
+        when(consgrp2.shouldRemove()).thenReturn(true);
+        when(consgrp3.shouldRemove()).thenReturn(true);
+
+        // sweep
+        data.removeIdleConsumers();
+
+        assertTrue(getGroups().isEmpty());
+    }
+
+    @Test
+    public void testRead() throws Exception {
+        List<String> lst = Collections.emptyList();
+
+        when(consgrp1.read(anyInt(), anyLong())).thenReturn(ConsumerGroupData.UNREADABLE_LIST)
+                        .thenReturn(ConsumerGroupData.UNREADABLE_LIST).thenReturn(lst);
+
+        assertSame(lst, data.read(GROUP1, 10, 20));
+
+        // should have invoked three times
+        verify(consgrp1, times(3)).read(anyInt(), anyLong());
+
+        // should have used the given values
+        verify(consgrp1, times(3)).read(10, 20);
+
+        // should not have allocated more than one group
+        assertEquals(2, groups.size());
+    }
+
+    @Test
+    public void testRead_MultipleGroups() throws Exception {
+        List<String> lst1 = Collections.emptyList();
+        when(consgrp1.read(anyInt(), anyLong())).thenReturn(lst1);
+
+        List<String> lst2 = Collections.emptyList();
+        when(consgrp2.read(anyInt(), anyLong())).thenReturn(lst2);
+
+        // one from each group
+        assertSame(lst1, data.read(GROUP1, 0, 0));
+        assertSame(lst2, data.read(GROUP2, 0, 0));
+
+        // repeat
+        assertSame(lst1, data.read(GROUP1, 0, 0));
+        assertSame(lst2, data.read(GROUP2, 0, 0));
+
+        // again
+        assertSame(lst1, data.read(GROUP1, 0, 0));
+        assertSame(lst2, data.read(GROUP2, 0, 0));
+
+        // should still have group3 in the list
+        assertEquals(1, groups.size());
+    }
+
+    @Test
+    public void testWrite() throws Exception {
+        // no groups yet
+        List<Object> messages = Arrays.asList("hello", "world");
+        data.write(messages);
+
+        // add two groups
+        data.read(GROUP1, 0, 0);
+        data.read(GROUP2, 0, 0);
+
+        data.write(messages);
+
+        // should have been written to both groups
+        List<String> strings = messages.stream().map(Object::toString).collect(Collectors.toList());
+        verify(consgrp1).write(strings);
+        verify(consgrp2).write(strings);
+    }
+
+    @Test
+    public void testConvertMessagesToStrings() {
+        assertEquals("[abc, 200]", data.convertMessagesToStrings(Arrays.asList("abc", null, 200)).toString());
+    }
+
+    @Test
+    public void testConvertMessageToString() throws CoderException {
+        Coder coder = new StandardCoder();
+
+        assertNull(data.convertMessageToString(null, coder));
+        assertEquals("text-msg", data.convertMessageToString("text-msg", coder));
+        assertEquals("100", data.convertMessageToString(100, coder));
+
+        coder = mock(Coder.class);
+        when(coder.encode(any())).thenThrow(new CoderException(EXPECTED_EXCEPTION));
+        assertNull(data.convertMessageToString(new TreeMap<String,Object>(), coder));
+    }
+
+    @Test
+    public void testMakeData() throws Exception {
+        // use real objects instead of mocks
+        TopicData data2 = new TopicData("real-data-topic");
+
+        // force a group into the topic
+        data2.read(GROUP1, 0, 0);
+
+        data2.write(Arrays.asList("abc", "def", "ghi"));
+
+        assertEquals("[abc, def]", data2.read(GROUP1, 2, 0).toString());
+    }
+
+    /**
+     * Gets the consumer group map from the topic data object.
+     *
+     * @return the topic's consumer group map
+     */
+    @SuppressWarnings("unchecked")
+    private Map<String, ConsumerGroupData> getGroups() {
+        return (Map<String, ConsumerGroupData>) Whitebox.getInternalState(data, "group2data");
+    }
+}
diff --git a/models-sim/models-sim-dmaap/src/test/java/org/onap/policy/models/sim/dmaap/rest/BaseRestControllerV1Test.java b/models-sim/models-sim-dmaap/src/test/java/org/onap/policy/models/sim/dmaap/rest/BaseRestControllerV1Test.java
new file mode 100644 (file)
index 0000000..5cba783
--- /dev/null
@@ -0,0 +1,63 @@
+/*
+ * ============LICENSE_START=======================================================
+ * ONAP PAP
+ * ================================================================================
+ * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.models.sim.dmaap.rest;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+
+import java.util.UUID;
+import javax.ws.rs.core.Response;
+import javax.ws.rs.core.Response.ResponseBuilder;
+import org.junit.Before;
+import org.junit.Test;
+
+public class BaseRestControllerV1Test {
+
+    private BaseRestControllerV1 ctlr;
+    private ResponseBuilder bldr;
+
+    @Before
+    public void setUp() {
+        ctlr = new BaseRestControllerV1();
+        bldr = Response.status(Response.Status.OK);
+    }
+
+    @Test
+    public void testAddVersionControlHeaders() {
+        Response resp = ctlr.addVersionControlHeaders(bldr).build();
+        assertEquals("0", resp.getHeaderString(BaseRestControllerV1.VERSION_MINOR_NAME));
+        assertEquals("0", resp.getHeaderString(BaseRestControllerV1.VERSION_PATCH_NAME));
+        assertEquals("1.0.0", resp.getHeaderString(BaseRestControllerV1.VERSION_LATEST_NAME));
+    }
+
+    @Test
+    public void testAddLoggingHeaders_Null() {
+        Response resp = ctlr.addLoggingHeaders(bldr, null).build();
+        assertNotNull(resp.getHeaderString(BaseRestControllerV1.REQUEST_ID_NAME));
+    }
+
+    @Test
+    public void testAddLoggingHeaders_NonNull() {
+        UUID uuid = UUID.randomUUID();
+        Response resp = ctlr.addLoggingHeaders(bldr, uuid).build();
+        assertEquals(uuid.toString(), resp.getHeaderString(BaseRestControllerV1.REQUEST_ID_NAME));
+    }
+}
diff --git a/models-sim/models-sim-dmaap/src/test/java/org/onap/policy/models/sim/dmaap/rest/CambriaMessageBodyHandlerTest.java b/models-sim/models-sim-dmaap/src/test/java/org/onap/policy/models/sim/dmaap/rest/CambriaMessageBodyHandlerTest.java
new file mode 100644 (file)
index 0000000..5d9186c
--- /dev/null
@@ -0,0 +1,145 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * ONAP Policy Models
+ * ================================================================================
+ * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.models.sim.dmaap.rest;
+
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.io.ByteArrayInputStream;
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.charset.StandardCharsets;
+import java.util.List;
+import javax.ws.rs.core.MediaType;
+import org.junit.Before;
+import org.junit.Test;
+
+public class CambriaMessageBodyHandlerTest {
+    private static final String STD_INPUT = "1.3.XAbc";
+    private static final String EXPECTED_OUTPUT = "[Abc]";
+
+    private CambriaMessageBodyHandler hdlr;
+
+    @Before
+    public void setUp() {
+        hdlr = new CambriaMessageBodyHandler();
+    }
+
+    @Test
+    public void testIsReadable() {
+        assertTrue(hdlr.isReadable(null, null, null, MediaType.valueOf("application/cambria")));
+
+        assertFalse(hdlr.isReadable(null, null, null, null));
+        assertFalse(hdlr.isReadable(null, null, null, MediaType.valueOf("application/other")));
+        assertFalse(hdlr.isReadable(null, null, null, MediaType.valueOf("other/cambria")));
+    }
+
+    @Test
+    public void testReadFrom() throws IOException {
+        List<Object> lst = readStream("1.11.AMessageBody", "3.3.123Foo3.3.123Bar", "0.16.You can do that..8.Or that.");
+        assertEquals("[MessageBody, Foo, Bar, You can do that., Or that.]", lst.toString());
+
+        // empty stream
+        lst = readStream();
+        assertEquals("[]", lst.toString());
+    }
+
+    @Test
+    public void testReadMessage_InvalidPartitionLength() {
+        assertThatThrownBy(() -> readStream("100000000.3.")).isInstanceOf(IOException.class)
+                        .hasMessage("invalid partition length");
+    }
+
+    @Test
+    public void testReadMessage_InvalidMessageLength() {
+        assertThatThrownBy(() -> readStream("3.100000000.ABC")).isInstanceOf(IOException.class)
+                        .hasMessage("invalid message length");
+    }
+
+    @Test
+    public void testSkipWhitespace() throws IOException {
+        // no white space
+        assertEquals(EXPECTED_OUTPUT, readStream(STD_INPUT).toString());
+
+        // single white space
+        assertEquals(EXPECTED_OUTPUT, readStream(" " + STD_INPUT).toString());
+
+        // multiple white spaces
+        assertEquals(EXPECTED_OUTPUT, readStream("\n\n\t" + STD_INPUT).toString());
+    }
+
+    @Test
+    public void testReadLength_NoDigits() throws IOException {
+        assertEquals("[]", readStream("..").toString());
+    }
+
+    @Test
+    public void testReadLength_NoDot() {
+        assertThatThrownBy(() -> readStream("3.2")).isInstanceOf(EOFException.class)
+                        .hasMessage("missing '.' in 'length' field");
+    }
+
+    @Test
+    public void testReadLength_NonDigit() {
+        assertThatThrownBy(() -> readStream("3.2x.ABCde")).isInstanceOf(IOException.class)
+                        .hasMessage("invalid character in 'length' field");
+    }
+
+    @Test
+    public void testReadLength_TooManyDigits() {
+        assertThatThrownBy(() -> readStream("3.12345678901234567890.ABCde")).isInstanceOf(IOException.class)
+                        .hasMessage("too many digits in 'length' field");
+    }
+
+    @Test
+    public void testReadString_ZeroLength() throws IOException {
+        assertEquals("[]", readStream("1..X").toString());
+    }
+
+    @Test
+    public void testReadString_TooShort() {
+        assertThatThrownBy(() -> readStream(".5.me")).isInstanceOf(EOFException.class).hasMessageContaining("actual");
+    }
+
+    /**
+     * Reads a stream via the handler.
+     *
+     * @param text lines of text to be read
+     * @return the list of objects that were decoded from the stream
+     * @throws IOException if an error occurs
+     */
+    private List<Object> readStream(String... text) throws IOException {
+        return hdlr.readFrom(null, null, null, null, null, makeStream(text));
+    }
+
+    /**
+     * Creates an input stream from lines of text.
+     *
+     * @param text lines of text
+     * @return an input stream
+     */
+    private InputStream makeStream(String... text) {
+        return new ByteArrayInputStream(String.join("\n", text).getBytes(StandardCharsets.UTF_8));
+    }
+}
diff --git a/models-sim/models-sim-dmaap/src/test/java/org/onap/policy/models/sim/dmaap/rest/CommonRestServer.java b/models-sim/models-sim-dmaap/src/test/java/org/onap/policy/models/sim/dmaap/rest/CommonRestServer.java
new file mode 100644 (file)
index 0000000..7e30c5a
--- /dev/null
@@ -0,0 +1,181 @@
+/*
+ * ============LICENSE_START=======================================================
+ *  Modifications Copyright (C) 2019 AT&T Intellectual Property.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.models.sim.dmaap.rest;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.nio.charset.StandardCharsets;
+import javax.ws.rs.client.Client;
+import javax.ws.rs.client.ClientBuilder;
+import javax.ws.rs.client.Invocation;
+import javax.ws.rs.client.WebTarget;
+import javax.ws.rs.core.MediaType;
+import lombok.Getter;
+import org.glassfish.jersey.client.ClientProperties;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.onap.policy.common.endpoints.http.server.HttpServletServerFactoryInstance;
+import org.onap.policy.common.gson.GsonMessageBodyHandler;
+import org.onap.policy.common.utils.network.NetworkUtil;
+import org.onap.policy.common.utils.services.Registry;
+import org.onap.policy.models.sim.dmaap.DmaapSimException;
+import org.onap.policy.models.sim.dmaap.startstop.Main;
+import org.onap.policy.sim.dmaap.parameters.CommonTestData;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Common base class for rest server tests.
+ */
+public class CommonRestServer {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(CommonRestServer.class);
+
+    public static final String NOT_ALIVE = "not alive";
+    public static final String ALIVE = "alive";
+    public static final String SELF = "self";
+    public static final String NAME = "DMaaP Simulator";
+    public static final String ENDPOINT_PREFIX = "events/";
+
+    @Getter
+    private static int port;
+
+    protected static String httpPrefix;
+
+    private static Main main;
+
+    /**
+     * Allocates a port for the server, writes a config file, and then starts Main.
+     *
+     * @throws Exception if an error occurs
+     */
+    @BeforeClass
+    public static void setUpBeforeClass() throws Exception {
+        port = NetworkUtil.allocPort();
+
+        httpPrefix = "http://localhost:" + port + "/";
+
+        String json = new CommonTestData().getParameterGroupAsString(port);
+        makeConfigFile("src/test/resources/parameters/TestConfigParams.json", json);
+
+        HttpServletServerFactoryInstance.getServerFactory().destroy();
+
+        startMain();
+    }
+
+    /**
+     * Stops Main.
+     */
+    @AfterClass
+    public static void teardownAfterClass() {
+        try {
+            if (main != null) {
+                Main main2 = main;
+                main = null;
+
+                main2.shutdown();
+            }
+
+        } catch (DmaapSimException exp) {
+            LOGGER.error("cannot stop main", exp);
+        }
+    }
+
+    /**
+     * Set up.
+     *
+     * @throws Exception if an error occurs
+     */
+    @Before
+    public void setUp() throws Exception {
+        // restart, if not currently running
+        if (main == null) {
+            startMain();
+        }
+    }
+
+    /**
+     * Makes a parameter configuration file.
+     * @param fileName name of the config file to be created
+     * @param json json to be written to the file
+     *
+     * @throws Exception if an error occurs
+     */
+    protected static void makeConfigFile(String fileName, String json) throws Exception {
+        File file = new File(fileName);
+        file.deleteOnExit();
+
+        try (FileOutputStream output = new FileOutputStream(file)) {
+            output.write(json.getBytes(StandardCharsets.UTF_8));
+        }
+    }
+
+    /**
+     * Starts the "Main".
+     *
+     * @throws Exception if an error occurs
+     */
+    private static void startMain() throws Exception {
+        Registry.newRegistry();
+
+        // make sure port is available
+        if (NetworkUtil.isTcpPortOpen("localhost", port, 1, 1L)) {
+            throw new IllegalStateException("port " + port + " is still in use");
+        }
+
+        final String[] simConfigParameters = {"-c", "src/test/resources/parameters/TestConfigParams.json"};
+
+        main = new Main(simConfigParameters);
+
+        if (!NetworkUtil.isTcpPortOpen("localhost", port, 60, 1000L)) {
+            throw new IllegalStateException("server is not listening on port " + port);
+        }
+    }
+
+    /**
+     * Sends a request to an endpoint.
+     *
+     * @param endpoint the target endpoint
+     * @return a request builder
+     * @throws Exception if an error occurs
+     */
+    protected Invocation.Builder sendRequest(final String endpoint) throws Exception {
+        return sendFqeRequest(httpPrefix + ENDPOINT_PREFIX + endpoint);
+    }
+
+    /**
+     * Sends a request to a fully qualified endpoint.
+     *
+     * @param fullyQualifiedEndpoint the fully qualified target endpoint
+     * @return a request builder
+     */
+    protected Invocation.Builder sendFqeRequest(final String fullyQualifiedEndpoint) {
+        final Client client = ClientBuilder.newBuilder().build();
+
+        client.property(ClientProperties.METAINF_SERVICES_LOOKUP_DISABLE, "true");
+        client.register(GsonMessageBodyHandler.class);
+
+        final WebTarget webTarget = client.target(fullyQualifiedEndpoint);
+
+        return webTarget.request(MediaType.APPLICATION_JSON);
+    }
+}
diff --git a/models-sim/models-sim-dmaap/src/test/java/org/onap/policy/models/sim/dmaap/rest/DmaapSimRestControllerV1Test.java b/models-sim/models-sim-dmaap/src/test/java/org/onap/policy/models/sim/dmaap/rest/DmaapSimRestControllerV1Test.java
new file mode 100644 (file)
index 0000000..7b84d54
--- /dev/null
@@ -0,0 +1,94 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.models.sim.dmaap.rest;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.File;
+import java.util.Arrays;
+import java.util.Map;
+import javax.ws.rs.core.Response;
+import org.junit.Before;
+import org.junit.Test;
+import org.onap.policy.common.utils.coder.Coder;
+import org.onap.policy.common.utils.coder.CoderException;
+import org.onap.policy.common.utils.coder.StandardCoder;
+import org.onap.policy.models.sim.dmaap.parameters.DmaapSimParameterGroup;
+import org.onap.policy.models.sim.dmaap.provider.DmaapSimProvider;
+
+public class DmaapSimRestControllerV1Test {
+    private static final int LIMIT = 5;
+    private static final String TOPIC = "my-topic";
+    private static final String TOPIC2 = "my-topic-B";
+    private static final String MESSAGE = "my-message";
+    private static final String MESSAGE2 = "my-message-B";
+    private static final String CONSUMER = "my-consumer";
+    private static final String CONSUMER_ID = "my-id";
+
+    private static Coder coder = new StandardCoder();
+
+    private DmaapSimRestControllerV1 rest;
+
+    /**
+     * Creates the controller.
+     *
+     * @throws CoderException if the parameters cannot be loaded
+     */
+    @Before
+    public void setUp() throws CoderException {
+        DmaapSimParameterGroup params = coder.decode(new File("src/test/resources/parameters/NormalParameters.json"),
+                        DmaapSimParameterGroup.class);
+        DmaapSimProvider.setInstance(new DmaapSimProvider(params));
+        rest = new DmaapSimRestControllerV1();
+    }
+
+    @Test
+    public void test() {
+        Response resp = rest.getDmaapMessage(TOPIC, CONSUMER, CONSUMER_ID, LIMIT, 0);
+        assertEquals(Response.Status.OK.getStatusCode(), resp.getStatus());
+        assertEquals("[]", resp.getEntity().toString());
+
+        // add some messages
+        resp = rest.postDmaapMessage(TOPIC, Arrays.asList(MESSAGE, MESSAGE2));
+        assertEquals(Response.Status.OK.getStatusCode(), resp.getStatus());
+        assertEquals(2, getCount(resp));
+
+        resp = rest.postDmaapMessage(TOPIC2, Arrays.asList(MESSAGE, MESSAGE2, MESSAGE));
+        assertEquals(Response.Status.OK.getStatusCode(), resp.getStatus());
+        assertEquals(3, getCount(resp));
+
+        // hadn't registered with topic 2 so nothing expected from there
+        resp = rest.getDmaapMessage(TOPIC2, CONSUMER, CONSUMER_ID, LIMIT, 0);
+        assertEquals(Response.Status.OK.getStatusCode(), resp.getStatus());
+        assertEquals("[]", resp.getEntity().toString());
+
+        // now read from topic 1
+        resp = rest.getDmaapMessage(TOPIC, CONSUMER, CONSUMER_ID, LIMIT, 0);
+        assertEquals(Response.Status.OK.getStatusCode(), resp.getStatus());
+        assertEquals("[my-message, my-message-B]", resp.getEntity().toString());
+    }
+
+    private int getCount(Response resp) {
+        @SuppressWarnings("unchecked")
+        Map<String, Object> map = (Map<String, Object>) resp.getEntity();
+
+        return (int) map.get("count");
+    }
+
+}
diff --git a/models-sim/models-sim-dmaap/src/test/java/org/onap/policy/models/sim/dmaap/rest/TextMessageBodyHandlerTest.java b/models-sim/models-sim-dmaap/src/test/java/org/onap/policy/models/sim/dmaap/rest/TextMessageBodyHandlerTest.java
new file mode 100644 (file)
index 0000000..2dfbae9
--- /dev/null
@@ -0,0 +1,81 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.models.sim.dmaap.rest;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.charset.StandardCharsets;
+import java.util.List;
+import javax.ws.rs.core.MediaType;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TextMessageBodyHandlerTest {
+    private TextMessageBodyHandler hdlr;
+
+    @Before
+    public void setUp() {
+        hdlr = new TextMessageBodyHandler();
+    }
+
+    @Test
+    public void testIsReadable() {
+        assertTrue(hdlr.isReadable(null, null, null, MediaType.valueOf("text/plain")));
+
+        assertFalse(hdlr.isReadable(null, null, null, null));
+        assertFalse(hdlr.isReadable(null, null, null, MediaType.valueOf("text/other")));
+        assertFalse(hdlr.isReadable(null, null, null, MediaType.valueOf("other/plain")));
+    }
+
+    @Test
+    public void testReadFrom() throws IOException {
+        List<Object> lst = readStream("hello", "world");
+        assertEquals("[hello, world]", lst.toString());
+
+        // empty stream
+        lst = readStream();
+        assertEquals("[]", lst.toString());
+    }
+
+    /**
+     * Reads a stream via the handler.
+     *
+     * @param text lines of text to be read
+     * @return the list of objects that were decoded from the stream
+     * @throws IOException if an error occurs
+     */
+    private List<Object> readStream(String... text) throws IOException {
+        return hdlr.readFrom(null, null, null, null, null, makeStream(text));
+    }
+
+    /**
+     * Creates an input stream from lines of text.
+     *
+     * @param text lines of text
+     * @return an input stream
+     */
+    private InputStream makeStream(String... text) {
+        return new ByteArrayInputStream(String.join("\n", text).getBytes(StandardCharsets.UTF_8));
+    }
+}
diff --git a/models-sim/models-sim-dmaap/src/test/java/org/onap/policy/sim/dmaap/e2e/EndToEndTest.java b/models-sim/models-sim-dmaap/src/test/java/org/onap/policy/sim/dmaap/e2e/EndToEndTest.java
new file mode 100644 (file)
index 0000000..8c35de6
--- /dev/null
@@ -0,0 +1,199 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.sim.dmaap.e2e;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.File;
+import java.io.PrintWriter;
+import java.net.HttpURLConnection;
+import java.net.URL;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.function.BiConsumer;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.onap.policy.common.endpoints.event.comm.TopicEndpointManager;
+import org.onap.policy.common.endpoints.event.comm.TopicSink;
+import org.onap.policy.common.endpoints.parameters.TopicParameterGroup;
+import org.onap.policy.common.endpoints.parameters.TopicParameters;
+import org.onap.policy.common.utils.coder.CoderException;
+import org.onap.policy.common.utils.coder.StandardCoder;
+import org.onap.policy.models.sim.dmaap.rest.CommonRestServer;
+
+/**
+ * This tests the simulator using dmaap endpoints to verify that it works from publisher
+ * to subscriber.
+ */
+public class EndToEndTest extends CommonRestServer {
+    private static final int MAX_WAIT_SEC = 5;
+    private static final String ORIG_TOPIC = "MY-TOPIC";
+    private static final String ORIG_TOPIC2 = "MY-TOPIC-B";
+    private static final int MAX_MSG = 200;
+
+    private static int ntests = 0;
+    private static String topicJson;
+
+    private TopicParameterGroup topicConfig;
+
+    private String topic = "MY-TOPIC";
+    private String topic2 = "MY-TOPIC-B";
+
+    /**
+     * Messages from the topic are placed here by the endpoint.
+     */
+    private BlockingQueue<String> queue;
+
+    /**
+     * Messages from topic 2 are placed here by the endpoint.
+     */
+    private BlockingQueue<String> queue2;
+
+    /**
+     * Starts the rest server.
+     *
+     * @throws Exception if an error occurs
+     */
+    @BeforeClass
+    public static void setUpBeforeClass() throws Exception {
+        TopicEndpointManager.getManager().shutdown();
+
+        CommonRestServer.setUpBeforeClass();
+
+        topicJson = new String(
+                        Files.readAllBytes(new File("src/test/resources/parameters/TopicParameters.json").toPath()),
+                        StandardCharsets.UTF_8);
+        topicJson = topicJson.replace("${port}", String.valueOf(getPort()));
+    }
+
+    /**
+     * Starts the topics.
+     *
+     * @throws CoderException if the parameters cannot be decoded
+     */
+    @Before
+    @Override
+    public void setUp() throws CoderException {
+        queue = new LinkedBlockingQueue<>();
+        queue2 = new LinkedBlockingQueue<>();
+
+        /*
+         * change topic names for each test so any listeners that may still exist will not
+         * grab new messages
+         */
+
+        ++ntests;
+        topic = "my-topic-" + ntests;
+        topic2 = "my-topic-b" + ntests;
+
+        String json = topicJson.replace(ORIG_TOPIC2, topic2).replace(ORIG_TOPIC, topic);
+
+        topicConfig = new StandardCoder().decode(json, TopicParameterGroup.class);
+
+        TopicEndpointManager.getManager().addTopics(topicConfig);
+        TopicEndpointManager.getManager().start();
+    }
+
+    @After
+    public void tearDown() {
+        TopicEndpointManager.getManager().shutdown();
+    }
+
+    @Test
+    public void testWithTopicEndpointAtEachEnd() throws InterruptedException {
+        // register listeners to add events to appropriate queue
+        TopicEndpointManager.getManager().getDmaapTopicSource(topic)
+                        .register((infra, topic, event) -> queue.add(event));
+        TopicEndpointManager.getManager().getDmaapTopicSource(topic2)
+                        .register((infra, topic, event) -> queue2.add(event));
+
+        // publish events
+        TopicSink sink = TopicEndpointManager.getManager().getDmaapTopicSink(topic);
+        TopicSink sink2 = TopicEndpointManager.getManager().getDmaapTopicSink(topic2);
+        for (int x = 0; x < MAX_MSG; ++x) {
+            sink.send("hello-" + x);
+            sink2.send("world-" + x);
+        }
+
+        // verify events where received
+        for (int x = 0; x < MAX_MSG; ++x) {
+            assertEquals("message " + x, "hello-" + x, queue.poll(MAX_WAIT_SEC, TimeUnit.SECONDS));
+            assertEquals("message " + x, "world-" + x, queue2.poll(MAX_WAIT_SEC, TimeUnit.SECONDS));
+        }
+    }
+
+    @Test
+    public void testCambriaFormat() throws Exception {
+        test("testCambriaFormat", "application/cambria",
+            (wtr, messages) -> messages.forEach(msg -> wtr.write("0." + msg.length() + "." + msg + "\n")));
+    }
+
+    @Test
+    public void testJson() throws Exception {
+        test("testJson", "application/json", (wtr, messages) -> wtr.write("[" + String.join(", ", messages) + "]"));
+    }
+
+    @Test
+    public void testText() throws Exception {
+        test("testText", "text/plain", (wtr, messages) -> messages.forEach(wtr::println));
+    }
+
+    /**
+     * Uses a raw URL connection to ensure the server can process messages of the given
+     * media type.
+     *
+     * @param testName name of the test
+     * @param mediaType media type
+     * @param writeMessages function that writes messages to a PrintWriter
+     * @throws Exception if an error occurs
+     */
+    private void test(String testName, String mediaType, BiConsumer<PrintWriter, List<String>> writeMessages)
+                    throws Exception {
+        String msg1 = "{'abc':10.0}".replace('\'', '"');
+        String msg2 = "{'def':20.0}".replace('\'', '"');
+
+        TopicEndpointManager.getManager().getDmaapTopicSource(topic)
+                        .register((infra, topic, event) -> queue.add(event));
+
+        TopicParameters sinkcfg = topicConfig.getTopicSinks().get(0);
+        URL url = new URL(httpPrefix + "events/" + sinkcfg.getTopic());
+
+        HttpURLConnection conn = (HttpURLConnection) url.openConnection();
+        conn.setRequestMethod("POST");
+        conn.setRequestProperty("Content-type", mediaType);
+        conn.setDoOutput(true);
+        conn.connect();
+
+        try (PrintWriter wtr = new PrintWriter(conn.getOutputStream())) {
+            writeMessages.accept(wtr, Arrays.asList(msg1, msg2));
+        }
+
+        assertEquals(testName + " response code", HttpURLConnection.HTTP_OK, conn.getResponseCode());
+
+        assertEquals(testName + " message 1", msg1, queue.poll(MAX_WAIT_SEC, TimeUnit.SECONDS));
+        assertEquals(testName + " message 2", msg2, queue.poll(MAX_WAIT_SEC, TimeUnit.SECONDS));
+    }
+}
diff --git a/models-sim/models-sim-dmaap/src/test/java/org/onap/policy/sim/dmaap/parameters/CommonTestData.java b/models-sim/models-sim-dmaap/src/test/java/org/onap/policy/sim/dmaap/parameters/CommonTestData.java
new file mode 100644 (file)
index 0000000..21d9ed6
--- /dev/null
@@ -0,0 +1,89 @@
+/*-
+ * ============LICENSE_START=======================================================
+ *  Modifications Copyright (C) 2019 AT&T Intellectual Property.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.sim.dmaap.parameters;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import org.onap.policy.common.utils.coder.Coder;
+import org.onap.policy.common.utils.coder.CoderException;
+import org.onap.policy.common.utils.coder.StandardCoder;
+import org.onap.policy.models.sim.dmaap.DmaapSimRuntimeException;
+import org.onap.policy.models.sim.dmaap.parameters.DmaapSimParameterGroup;
+
+/**
+ * Class to hold/create all parameters for test cases.
+ */
+public class CommonTestData {
+    public static final String SIM_GROUP_NAME = "DMaapSim";
+
+    private static final Coder coder = new StandardCoder();
+
+    /**
+     * Gets the standard simulator parameters.
+     *
+     * @param port port to be inserted into the parameters
+     * @return the standard simulator parameters
+     */
+    public DmaapSimParameterGroup getParameterGroup(int port) {
+        try {
+            return coder.decode(getParameterGroupAsString(port), DmaapSimParameterGroup.class);
+
+        } catch (CoderException e) {
+            throw new DmaapSimRuntimeException("cannot read simulator parameters", e);
+        }
+    }
+
+    /**
+     * Gets the standard simulator parameters, as a String.
+     *
+     * @param port port to be inserted into the parameters
+     * @return the standard simulator parameters
+     */
+    public String getParameterGroupAsString(int port) {
+
+        try {
+            File file = new File("src/test/resources/parameters/NormalParameters.json");
+            String json = new String(Files.readAllBytes(file.toPath()), StandardCharsets.UTF_8);
+
+            json = json.replace("6845", String.valueOf(port));
+
+            return json;
+
+        } catch (IOException e) {
+            throw new DmaapSimRuntimeException("cannot read simulator parameters", e);
+        }
+    }
+
+    /**
+     * Nulls out a field within a JSON string. It does it by adding a field with the same
+     * name, having a null value, and then prefixing the original field name with "Xxx",
+     * thus causing the original field and value to be ignored.
+     *
+     * @param json JSON string
+     * @param field field to be nulled out
+     * @return a new JSON string with the field nulled out
+     */
+    public String nullifyField(String json, String field) {
+        return json.replace(field + "\"", field + "\":null, \"" + field + "Xxx\"");
+    }
+}
@@ -1,6 +1,6 @@
-/*
+/*-
  * ============LICENSE_START=======================================================
- *  Copyright (C) 2019 Nordix Foundation.
+ * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved.
  * ================================================================================
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
  * ============LICENSE_END=========================================================
  */
 
-package org.onap.policy.models.sim.dmaap;
+package org.onap.policy.sim.dmaap.parameters;
 
-/**
- * Names of various items contained in the Registry.
- */
-public class DmaapSimConstants {
+import static org.junit.Assert.assertEquals;
+
+import org.junit.Test;
+import org.onap.policy.models.sim.dmaap.parameters.DmaapSimParameterGroup;
 
-    // Registry keys
-    public static final String REG_DMAAP_SIM_ACTIVATOR = "object:activator/dmaap-sim";
+public class DmaapSimParameterGroupTest {
+    private static final String MY_NAME = "my-name";
 
-    private DmaapSimConstants() {
-        super();
+    @Test
+    public void testDmaapSimParameterGroup() {
+        DmaapSimParameterGroup params = new DmaapSimParameterGroup(MY_NAME);
+        assertEquals(MY_NAME, params.getName());
     }
 }
diff --git a/models-sim/models-sim-dmaap/src/test/java/org/onap/policy/sim/dmaap/parameters/DmaapSimParameterHandlerTest.java b/models-sim/models-sim-dmaap/src/test/java/org/onap/policy/sim/dmaap/parameters/DmaapSimParameterHandlerTest.java
new file mode 100644 (file)
index 0000000..8f053d2
--- /dev/null
@@ -0,0 +1,70 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.sim.dmaap.parameters;
+
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+
+import org.junit.Before;
+import org.junit.Test;
+import org.onap.policy.models.sim.dmaap.DmaapSimException;
+import org.onap.policy.models.sim.dmaap.parameters.DmaapSimParameterGroup;
+import org.onap.policy.models.sim.dmaap.parameters.DmaapSimParameterHandler;
+import org.onap.policy.models.sim.dmaap.startstop.DmaapSimCommandLineArguments;
+
+public class DmaapSimParameterHandlerTest {
+
+    private static final String RESOURCE_DIR = "src/test/resources/parameters/";
+
+    private DmaapSimParameterHandler handler;
+
+    @Before
+    public void setUp() {
+        handler = new DmaapSimParameterHandler();
+    }
+
+    @Test
+    public void testGetParameters() throws DmaapSimException {
+        final DmaapSimCommandLineArguments args = new DmaapSimCommandLineArguments();
+
+        args.parse(new String[] {"-c", RESOURCE_DIR + "NormalParameters.json"});
+        DmaapSimParameterGroup params = handler.getParameters(args);
+        assertNotNull(params);
+        assertEquals("DMaapSim", params.getName());
+        assertEquals(300L, params.getTopicSweepSec());
+        assertEquals(6845, params.getRestServerParameters().getPort());
+
+
+        args.parse(new String[] {"-c", "FileNotFound.json"});
+        assertThatThrownBy(() -> handler.getParameters(args)).isInstanceOf(DmaapSimException.class)
+                        .hasMessageStartingWith("error reading parameters");
+
+
+        args.parse(new String[] {"-c", RESOURCE_DIR + "EmptyParameterFile.json"});
+        assertThatThrownBy(() -> handler.getParameters(args)).isInstanceOf(DmaapSimException.class)
+                        .hasMessageStartingWith("no parameters found");
+
+
+        args.parse(new String[] {"-c", RESOURCE_DIR + "Parameters_InvalidName.json"});
+        assertThatThrownBy(() -> handler.getParameters(args)).isInstanceOf(DmaapSimException.class)
+                        .hasMessageContaining("validation error");
+    }
+
+}
diff --git a/models-sim/models-sim-dmaap/src/test/java/org/onap/policy/sim/dmaap/startstop/DmaapSimActivatorTest.java b/models-sim/models-sim-dmaap/src/test/java/org/onap/policy/sim/dmaap/startstop/DmaapSimActivatorTest.java
new file mode 100644 (file)
index 0000000..380a724
--- /dev/null
@@ -0,0 +1,95 @@
+/*-
+ * ============LICENSE_START=======================================================
+ *  Copyright (C) 2019 AT&T Intellectual Property.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.sim.dmaap.startstop;
+
+import static org.assertj.core.api.Assertions.assertThatIllegalStateException;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.onap.policy.common.endpoints.http.server.HttpServletServerFactoryInstance;
+import org.onap.policy.common.utils.services.Registry;
+import org.onap.policy.models.sim.dmaap.parameters.DmaapSimParameterGroup;
+import org.onap.policy.models.sim.dmaap.parameters.DmaapSimParameterHandler;
+import org.onap.policy.models.sim.dmaap.startstop.DmaapSimActivator;
+import org.onap.policy.models.sim.dmaap.startstop.DmaapSimCommandLineArguments;
+
+
+/**
+ * Class to perform unit test of {@link DmaapSimActivator}}.
+ */
+public class DmaapSimActivatorTest {
+
+    private DmaapSimActivator activator;
+
+    /**
+     * Initializes an activator.
+     *
+     * @throws Exception if an error occurs
+     */
+    @Before
+    public void setUp() throws Exception {
+        Registry.newRegistry();
+        HttpServletServerFactoryInstance.getServerFactory().destroy();
+
+        final String[] papConfigParameters = {"-c", "parameters/NormalParameters.json"};
+        final DmaapSimCommandLineArguments arguments = new DmaapSimCommandLineArguments(papConfigParameters);
+        final DmaapSimParameterGroup parGroup = new DmaapSimParameterHandler().getParameters(arguments);
+
+        activator = new DmaapSimActivator(parGroup);
+    }
+
+    /**
+     * Method for cleanup after each test.
+     *
+     * @throws Exception if an error occurs
+     */
+    @After
+    public void teardown() throws Exception {
+        if (activator != null && activator.isAlive()) {
+            activator.stop();
+        }
+    }
+
+    @Test
+    public void testDmaapSimActivator() {
+        assertFalse(activator.isAlive());
+        activator.start();
+        assertTrue(activator.isAlive());
+
+        // repeat - should throw an exception
+        assertThatIllegalStateException().isThrownBy(() -> activator.start());
+        assertTrue(activator.isAlive());
+    }
+
+    @Test
+    public void testTerminate() {
+        activator.start();
+        activator.stop();
+        assertFalse(activator.isAlive());
+
+        // repeat - should throw an exception
+        assertThatIllegalStateException().isThrownBy(() -> activator.stop());
+        assertFalse(activator.isAlive());
+    }
+}
diff --git a/models-sim/models-sim-dmaap/src/test/java/org/onap/policy/sim/dmaap/startstop/MainTest.java b/models-sim/models-sim-dmaap/src/test/java/org/onap/policy/sim/dmaap/startstop/MainTest.java
new file mode 100644 (file)
index 0000000..b8e285a
--- /dev/null
@@ -0,0 +1,100 @@
+/*
+ * ============LICENSE_START=======================================================
+ *  Modifications Copyright (C) 2019 AT&T Intellectual Property.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.sim.dmaap.startstop;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.onap.policy.common.endpoints.http.server.HttpServletServerFactoryInstance;
+import org.onap.policy.models.sim.dmaap.DmaapSimException;
+import org.onap.policy.models.sim.dmaap.startstop.Main;
+import org.onap.policy.sim.dmaap.parameters.CommonTestData;
+
+/**
+ * Class to perform unit test of {@link Main}}.
+ *
+ * @author Ram Krishna Verma (ram.krishna.verma@est.tech)
+ */
+public class MainTest {
+    private Main main;
+
+    /**
+     * Set up.
+     */
+    @Before
+    public void setUp() {
+        main = null;
+        HttpServletServerFactoryInstance.getServerFactory().destroy();
+    }
+
+    /**
+     * Shuts "main" down.
+     *
+     * @throws Exception if an error occurs
+     */
+    @After
+    public void tearDown() throws Exception {
+        if (main != null) {
+            main.shutdown();
+        }
+    }
+
+    @Test
+    public void testMain() throws DmaapSimException {
+        final String[] NormalParameters = {"-c", "parameters/NormalParameters.json"};
+        main = new Main(NormalParameters);
+        assertTrue(main.getParameters().isValid());
+        assertEquals(CommonTestData.SIM_GROUP_NAME, main.getParameters().getName());
+
+        main.shutdown();
+    }
+
+    @Test
+    public void testMain_NoArguments() {
+        final String[] NormalParameters = {};
+        main = new Main(NormalParameters);
+        assertTrue(main.getParameters() == null);
+    }
+
+    @Test
+    public void testMain_InvalidArguments() {
+        // note: this is missing the "-c" argument, thus the ARGUMENTS are invalid
+        final String[] NormalParameters = {"parameters/NormalParameters.json"};
+        main = new Main(NormalParameters);
+        assertTrue(main.getParameters() == null);
+    }
+
+    @Test
+    public void testMain_Help() {
+        final String[] NormalParameters = {"-h"};
+        Main.main(NormalParameters);
+    }
+
+    @Test
+    public void testMain_InvalidParameters() {
+        final String[] NormalParameters = {"-c", "parameters/InvalidParameters.json"};
+        main = new Main(NormalParameters);
+        assertTrue(main.getParameters() == null);
+    }
+}
diff --git a/models-sim/models-sim-dmaap/src/test/resources/parameters/EmptyParameterFile.json b/models-sim/models-sim-dmaap/src/test/resources/parameters/EmptyParameterFile.json
new file mode 100644 (file)
index 0000000..e69de29
index a2a0366..deec966 100644 (file)
@@ -1,5 +1,6 @@
 {
     "name": "DMaapSim",
+    "topicSweepSec": 300,
     "restServerParameters": {
         "host": "0.0.0.0",
         "port": 6845
diff --git a/models-sim/models-sim-dmaap/src/test/resources/parameters/TopicParameters.json b/models-sim/models-sim-dmaap/src/test/resources/parameters/TopicParameters.json
new file mode 100644 (file)
index 0000000..77a320f
--- /dev/null
@@ -0,0 +1,36 @@
+{
+    "topicSources": [
+        {
+            "topic": "MY-TOPIC",
+            "servers": [
+                "localhost:${port}"
+            ],
+            "topicCommInfrastructure": "dmaap",
+            "fetchTimeout": 100
+        },
+        {
+            "topic": "MY-TOPIC-B",
+            "servers": [
+                "localhost:${port}"
+            ],
+            "topicCommInfrastructure": "dmaap",
+            "fetchTimeout": 100
+        }
+    ],
+    "topicSinks": [
+        {
+            "topic": "MY-TOPIC",
+            "servers": [
+                "localhost:${port}"
+            ],
+            "topicCommInfrastructure": "dmaap"
+        },
+        {
+            "topic": "MY-TOPIC-B",
+            "servers": [
+                "localhost:${port}"
+            ],
+            "topicCommInfrastructure": "dmaap"
+        }
+    ]
+}
\ No newline at end of file