Fix the apex-pdp build issue 51/55651/1
authorramverma <ram.krishna.verma@ericsson.com>
Mon, 2 Jul 2018 16:03:37 +0000 (17:03 +0100)
committerramverma <ram.krishna.verma@ericsson.com>
Mon, 2 Jul 2018 16:04:19 +0000 (17:04 +0100)
Change-Id: I50532314948d16432065dacbadb9d69d8ca49084
Issue-ID: POLICY-865
Signed-off-by: ramverma <ram.krishna.verma@ericsson.com>
plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-jms/src/main/java/org/onap/policy/apex/plugins/event/carrier/jms/ApexJMSConsumer.java
testsuites/integration/integration-uservice-test/src/test/java/org/onap/policy/apex/apps/uservice/test/adapt/jms/JMSEventProducer.java
testsuites/integration/integration-uservice-test/src/test/java/org/onap/policy/apex/apps/uservice/test/adapt/jms/JMSEventSubscriber.java
testsuites/integration/integration-uservice-test/src/test/java/org/onap/policy/apex/apps/uservice/test/adapt/jms/TestContext.java
testsuites/integration/integration-uservice-test/src/test/java/org/onap/policy/apex/apps/uservice/test/adapt/jms/TestJMS2JMS.java

index a965175..878882d 100644 (file)
@@ -196,7 +196,7 @@ public class ApexJMSConsumer implements MessageListener, ApexEventConsumer, Runn
     @Override
     public void run() {
         // JMS session and message consumer for receiving messages
-        try (Session jmsSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE)) {
+        try (final Session jmsSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE)) {
             // Create a message consumer for reception of messages and set this class as a message listener
             createMessageConsumer(jmsSession);
         } catch (final Exception e) {
@@ -209,20 +209,21 @@ public class ApexJMSConsumer implements MessageListener, ApexEventConsumer, Runn
             LOGGER.debug("event receiver " + this.getClass().getName() + ":" + this.name + " subscribed to JMS topic: "
                     + jmsConsumerProperties.getConsumerTopic());
         }
-        // The endless loop that receives events over JMS
-        while (consumerThread.isAlive() && !stopOrderedFlag) {
-            ThreadUtilities.sleep(jmsConsumerProperties.getConsumerWaitTime());
-        }
     }
 
     /**
      * The helper function to create a message consumer from a given JMS session
-     * 
+     *
      * @param jmsSession a JMS session
      */
-    private void createMessageConsumer(Session jmsSession) {
-        try (MessageConsumer messageConsumer = jmsSession.createConsumer(jmsIncomingTopic)) {
+    private void createMessageConsumer(final Session jmsSession) {
+        try (final MessageConsumer messageConsumer = jmsSession.createConsumer(jmsIncomingTopic)) {
             messageConsumer.setMessageListener(this);
+
+            // The endless loop that receives events over JMS
+            while (consumerThread.isAlive() && !stopOrderedFlag) {
+                ThreadUtilities.sleep(jmsConsumerProperties.getConsumerWaitTime());
+            }
         } catch (final Exception e) {
             final String errorMessage = "failed to create a JMS message consumer for receiving messages";
             LOGGER.warn(errorMessage, e);
index e5ddbfe..8b9f2a1 100644 (file)
@@ -5,15 +5,15 @@
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
  * You may obtain a copy of the License at
- * 
+ *
  *      http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  * See the License for the specific language governing permissions and
  * limitations under the License.
- * 
+ *
  * SPDX-License-Identifier: Apache-2.0
  * ============LICENSE_END=========================================================
  */
@@ -32,11 +32,15 @@ import org.apache.activemq.command.ActiveMQTopic;
 import org.onap.policy.apex.apps.uservice.test.adapt.events.EventGenerator;
 import org.onap.policy.apex.core.infrastructure.threading.ThreadUtilities;
 import org.onap.policy.apex.service.engine.event.ApexEventRuntimeException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * @author Liam Fallon (liam.fallon@ericsson.com)
  */
 public class JMSEventProducer implements Runnable {
+    private static final Logger LOGGER = LoggerFactory.getLogger(JMSEventProducer.class);
+
     private final String topic;
     private final int eventCount;
     private final boolean sendObjects;
@@ -48,8 +52,9 @@ public class JMSEventProducer implements Runnable {
     private boolean stopFlag = false;
     private final Connection connection;
 
-    public JMSEventProducer(String topic, ConnectionFactory connectionFactory, String username, String password,
-            final int eventCount, final boolean sendObjects, final long eventInterval) throws JMSException {
+    public JMSEventProducer(final String topic, final ConnectionFactory connectionFactory, final String username,
+            final String password, final int eventCount, final boolean sendObjects, final long eventInterval)
+            throws JMSException {
         this.topic = topic;
         this.eventCount = eventCount;
         this.sendObjects = sendObjects;
@@ -63,10 +68,9 @@ public class JMSEventProducer implements Runnable {
 
     @Override
     public void run() {
-        try {
-            final Topic jmsTopic = new ActiveMQTopic(topic);
-            final Session jmsSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-            final MessageProducer jmsProducer = jmsSession.createProducer(jmsTopic);
+        final Topic jmsTopic = new ActiveMQTopic(topic);
+        try (final Session jmsSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+                final MessageProducer jmsProducer = jmsSession.createProducer(jmsTopic);) {
 
             while (producerThread.isAlive() && !stopFlag) {
                 ThreadUtilities.sleep(50);
@@ -77,8 +81,6 @@ public class JMSEventProducer implements Runnable {
                 }
             }
 
-            jmsProducer.close();
-            jmsSession.close();
         } catch (final Exception e) {
             throw new ApexEventRuntimeException("JMS event consumption failed", e);
         }
@@ -89,12 +91,11 @@ public class JMSEventProducer implements Runnable {
     }
 
     private void sendEventsToTopic(final Session jmsSession, final MessageProducer jmsProducer) throws JMSException {
-        System.out.println(JMSEventProducer.class.getCanonicalName() + ": sending events to JMS server, event count "
-                + eventCount);
+
+        LOGGER.info("{} : sending events to JMS server, event count {}", this.getClass().getCanonicalName(),
+                eventCount);
 
         for (int i = 0; i < eventCount; i++) {
-            System.out.println(JMSEventProducer.class.getCanonicalName() + ": waiting " + eventInterval
-                    + " milliseconds before sending next event");
             ThreadUtilities.sleep(eventInterval);
 
             Message jmsMessage = null;
@@ -105,9 +106,8 @@ public class JMSEventProducer implements Runnable {
             }
             jmsProducer.send(jmsMessage);
             eventsSentCount++;
-            System.out.println(JMSEventProducer.class.getCanonicalName() + ": sent event " + jmsMessage.toString());
         }
-        System.out.println(JMSEventProducer.class.getCanonicalName() + ": completed");
+        LOGGER.info("{} : completed, number of events sent", this.getClass().getCanonicalName(), eventsSentCount);
     }
 
     public long getEventsSentCount() {
@@ -115,15 +115,13 @@ public class JMSEventProducer implements Runnable {
     }
 
     public void shutdown() {
-        System.out.println(JMSEventProducer.class.getCanonicalName() + ": stopping");
-
+        LOGGER.info("{} : stopping", this.getClass().getCanonicalName());
         stopFlag = true;
 
         while (producerThread.isAlive()) {
             ThreadUtilities.sleep(10);
         }
-
-        System.out.println(JMSEventProducer.class.getCanonicalName() + ": stopped");
+        LOGGER.info("{} : stopped", this.getClass().getCanonicalName());
     }
 
 }
index 46455f5..916216a 100644 (file)
@@ -5,15 +5,15 @@
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
  * You may obtain a copy of the License at
- * 
+ *
  *      http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  * See the License for the specific language governing permissions and
  * limitations under the License.
- * 
+ *
  * SPDX-License-Identifier: Apache-2.0
  * ============LICENSE_END=========================================================
  */
@@ -34,11 +34,15 @@ import org.apache.activemq.command.ActiveMQTopic;
 import org.onap.policy.apex.core.infrastructure.threading.ThreadUtilities;
 import org.onap.policy.apex.service.engine.event.ApexEventException;
 import org.onap.policy.apex.service.engine.event.ApexEventRuntimeException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * @author Liam Fallon (liam.fallon@ericsson.com)
  */
 public class JMSEventSubscriber implements Runnable {
+    private static final Logger LOGGER = LoggerFactory.getLogger(JMSEventSubscriber.class);
+
     private final String topic;
     private long eventsReceivedCount = 0;
 
@@ -58,13 +62,9 @@ public class JMSEventSubscriber implements Runnable {
 
     @Override
     public void run() {
-        try {
-            final Topic jmsTopic = new ActiveMQTopic(topic);
-            final Session jmsSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-            final MessageConsumer jmsConsumer = jmsSession.createConsumer(jmsTopic);
-
-            System.out.println(JMSEventSubscriber.class.getCanonicalName()
-                    + ": receiving events from Kafka server on topic " + topic);
+        final Topic jmsTopic = new ActiveMQTopic(topic);
+        try (final Session jmsSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+                final MessageConsumer jmsConsumer = jmsSession.createConsumer(jmsTopic);) {
 
             while (subscriberThread.isAlive() && !subscriberThread.isInterrupted()) {
                 try {
@@ -75,11 +75,9 @@ public class JMSEventSubscriber implements Runnable {
 
                     if (message instanceof ObjectMessage) {
                         final TestPing testPing = (TestPing) ((ObjectMessage) message).getObject();
-                        System.out.println("Received message: " + testPing.toString());
                         testPing.verify();
                     } else if (message instanceof TextMessage) {
-                        final String textMessage = ((TextMessage) message).getText();
-                        System.out.println("Received message: " + textMessage);
+                        ((TextMessage) message).getText();
                     } else {
                         throw new ApexEventException("unknowm message \"" + message + "\" of type \""
                                 + message.getClass().getCanonicalName() + "\" received");
@@ -90,13 +88,11 @@ public class JMSEventSubscriber implements Runnable {
                 }
             }
 
-            jmsConsumer.close();
-            jmsSession.close();
         } catch (final Exception e) {
             throw new ApexEventRuntimeException("JMS event consumption failed", e);
         }
 
-        System.out.println(JMSEventSubscriber.class.getCanonicalName() + ": event reception completed");
+        LOGGER.info("{} : event reception completed", this.getClass().getCanonicalName());
     }
 
     public long getEventsReceivedCount() {
@@ -111,7 +107,7 @@ public class JMSEventSubscriber implements Runnable {
         }
 
         connection.close();
-        System.out.println(JMSEventSubscriber.class.getCanonicalName() + ": stopped");
+        LOGGER.info("{} : stopped", this.getClass().getCanonicalName());
     }
 
 }
index 36d689b..de3c8d3 100644 (file)
@@ -5,21 +5,27 @@
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
  * You may obtain a copy of the License at
- * 
+ *
  *      http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  * See the License for the specific language governing permissions and
  * limitations under the License.
- * 
+ *
  * SPDX-License-Identifier: Apache-2.0
  * ============LICENSE_END=========================================================
  */
 
 package org.onap.policy.apex.apps.uservice.test.adapt.jms;
 
+import static org.onap.policy.apex.apps.uservice.test.adapt.jms.TestJMS2JMS.HOST;
+import static org.onap.policy.apex.apps.uservice.test.adapt.jms.TestJMS2JMS.JMS_TOPIC_APEX_IN;
+import static org.onap.policy.apex.apps.uservice.test.adapt.jms.TestJMS2JMS.JMS_TOPIC_APEX_OUT;
+import static org.onap.policy.apex.apps.uservice.test.adapt.jms.TestJMS2JMS.PORT;
+import static org.onap.policy.apex.apps.uservice.test.adapt.jms.TestJMS2JMS.connectionFactory;
+
 import java.util.HashMap;
 import java.util.Hashtable;
 import java.util.Map;
@@ -48,11 +54,11 @@ public class TestContext implements Context {
             testProperties = new Properties();
 
             final Map<String, Object> params = new HashMap<String, Object>();
-            params.put("host", "localhost");
-            params.put("port", "5445");
-            testProperties.put("ConnectionFactory", TestJMS2JMS.connectionFactory);
-            testProperties.put("jms/topic/apexIn", new ActiveMQTopic("jms/topic/apexIn"));
-            testProperties.put("jms/topic/apexOut", new ActiveMQTopic("jms/topic/apexOut"));
+            params.put("host", HOST);
+            params.put("port", PORT);
+            testProperties.put("ConnectionFactory", connectionFactory);
+            testProperties.put(JMS_TOPIC_APEX_IN, new ActiveMQTopic(JMS_TOPIC_APEX_IN));
+            testProperties.put(JMS_TOPIC_APEX_OUT, new ActiveMQTopic(JMS_TOPIC_APEX_OUT));
         } catch (final Exception e) {
             e.printStackTrace();
             throw new ApexRuntimeException("Context initiation failed", e);
index ca23292..f021b26 100644 (file)
@@ -5,22 +5,22 @@
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
  * You may obtain a copy of the License at
- * 
+ *
  *      http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  * See the License for the specific language governing permissions and
  * limitations under the License.
- * 
+ *
  * SPDX-License-Identifier: Apache-2.0
  * ============LICENSE_END=========================================================
  */
 
 package org.onap.policy.apex.apps.uservice.test.adapt.jms;
 
-import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.assertEquals;
 
 import java.io.IOException;
 import java.nio.file.Paths;
@@ -45,13 +45,17 @@ import org.slf4j.ext.XLogger;
 import org.slf4j.ext.XLoggerFactory;
 
 public class TestJMS2JMS {
+    public static final String PORT = "5445";
+    public static final String HOST = "localhost";
+    public static final String JMS_TOPIC_APEX_IN = "jms/topic/apexIn";
+    public static final String JMS_TOPIC_APEX_OUT = "jms/topic/apexOut";
+
+    private static final int SLEEP_TIME = 1500;
     private static final String GROUP_ROLE = "guests";
     private static final String PACKAGE_NAME = "org.onap.policy.apex.apps.uservice.test.adapt.jms";
     private static final String USERNAME = "guest";
     private static final String PASSWORD = "IAmAGuest";
-    private static final String JMS_TOPIC_APEX_IN = "jms/topic/apexIn";
-    private static final String JMS_TOPIC_APEX_OUT = "jms/topic/apexOut";
-    private static final String URL = "tcp://localhost:5445";
+    private static final String URL = "tcp://" + HOST + ":" + PORT;
 
     private static final String DATA_PARENT_DIR = Paths.get("target", "activemq-data").toString();
 
@@ -70,9 +74,7 @@ public class TestJMS2JMS {
     public static void setupEmbeddedJMSServer() throws Exception {
         final ArrayList<BrokerPlugin> plugins = new ArrayList<BrokerPlugin>();
         final BrokerPlugin authenticationPlugin = getAuthenticationBrokerPlugin();
-        if (authenticationPlugin != null) {
-            plugins.add(authenticationPlugin);
-        }
+        plugins.add(authenticationPlugin);
 
         broker = new BrokerService();
         broker.setUseJmx(false);
@@ -108,13 +110,13 @@ public class TestJMS2JMS {
 
     @Test
     public void testJMSObjectEvents() throws ApexException, JMSException {
-        final String[] args = {"src/test/resources/prodcons/JMS2JMSObjectEvent.json"};
+        final String[] args = { "src/test/resources/prodcons/JMS2JMSObjectEvent.json" };
         testJMSEvents(args, true);
     }
 
     @Test
     public void testJMSJsonEvents() throws ApexException, JMSException {
-        final String[] args = {"src/test/resources/prodcons/JMS2JMSJsonEvent.json"};
+        final String[] args = { "src/test/resources/prodcons/JMS2JMSJsonEvent.json" };
         testJMSEvents(args, false);
     }
 
@@ -131,20 +133,22 @@ public class TestJMS2JMS {
 
         final long testStartTime = System.currentTimeMillis();
 
-        while (System.currentTimeMillis() < testStartTime + MAX_TEST_LENGTH
-                && subscriber.getEventsReceivedCount() < EVENT_COUNT) {
+        while (isTimedOut(testStartTime) && subscriber.getEventsReceivedCount() < EVENT_COUNT) {
             ThreadUtilities.sleep(EVENT_INTERVAL);
         }
 
-        ThreadUtilities.sleep(1000);
-
-        System.out.println("sent event count: " + producer.getEventsSentCount());
-        System.out.println("received event count: " + subscriber.getEventsReceivedCount());
-        assertTrue(subscriber.getEventsReceivedCount() == producer.getEventsSentCount());
-
+        ThreadUtilities.sleep(SLEEP_TIME);
         apexMain.shutdown();
         subscriber.shutdown();
         producer.shutdown();
-        ThreadUtilities.sleep(1000);
+        ThreadUtilities.sleep(SLEEP_TIME);
+
+        assertEquals(EVENT_COUNT, producer.getEventsSentCount());
+        assertEquals(producer.getEventsSentCount(), subscriber.getEventsReceivedCount());
+
+    }
+
+    private boolean isTimedOut(final long testStartTime) {
+        return System.currentTimeMillis() < testStartTime + MAX_TEST_LENGTH;
     }
 }