f785ab6f33b41430dbd74a43081cfc378eda32db
[policy/apex-pdp.git] / testsuites / integration / integration-uservice-test / src / test / java / org / onap / policy / apex / testsuites / integration / uservice / adapt / jms / JmsEventProducer.java
1 /*-
2  * ============LICENSE_START=======================================================
3  *  Copyright (C) 2016-2018 Ericsson. All rights reserved.
4  *  Modifications Copyright (C) 2019-2020 Nordix Foundation.
5  * ================================================================================
6  * Licensed under the Apache License, Version 2.0 (the "License");
7  * you may not use this file except in compliance with the License.
8  * You may obtain a copy of the License at
9  *
10  *      http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing, software
13  * distributed under the License is distributed on an "AS IS" BASIS,
14  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  * See the License for the specific language governing permissions and
16  * limitations under the License.
17  *
18  * SPDX-License-Identifier: Apache-2.0
19  * ============LICENSE_END=========================================================
20  */
21
22 package org.onap.policy.apex.testsuites.integration.uservice.adapt.jms;
23
24 import javax.jms.Connection;
25 import javax.jms.ConnectionFactory;
26 import javax.jms.JMSException;
27 import javax.jms.Message;
28 import javax.jms.MessageProducer;
29 import javax.jms.Session;
30 import javax.jms.Topic;
31
32 import org.apache.activemq.command.ActiveMQTopic;
33 import org.onap.policy.apex.core.infrastructure.threading.ThreadUtilities;
34 import org.onap.policy.apex.service.engine.event.ApexEventRuntimeException;
35 import org.onap.policy.apex.testsuites.integration.common.testclasses.PingTestClass;
36 import org.onap.policy.apex.testsuites.integration.uservice.adapt.events.EventGenerator;
37 import org.slf4j.Logger;
38 import org.slf4j.LoggerFactory;
39
40 /**
41  * The Class JmsEventProducer.
42  *
43  * @author Liam Fallon (liam.fallon@ericsson.com)
44  */
45 public class JmsEventProducer implements Runnable {
46     private static final Logger LOGGER = LoggerFactory.getLogger(JmsEventProducer.class);
47
48     private final String topic;
49     private final int eventCount;
50     private final boolean sendObjects;
51     private final long eventInterval;
52     private long eventsSentCount = 0;
53
54     private final Thread producerThread;
55     private boolean sendEventsFlag = false;
56     private boolean stopFlag = false;
57     private final Connection connection;
58
59     /**
60      * Instantiates a new jms event producer.
61      *
62      * @param topic the topic
63      * @param connectionFactory the connection factory
64      * @param username the username
65      * @param password the password
66      * @param eventCount the event count
67      * @param sendObjects the send objects
68      * @param eventInterval the event interval
69      * @throws JMSException the JMS exception
70      */
71     public JmsEventProducer(final String topic, final ConnectionFactory connectionFactory, final String username,
72             final String password, final int eventCount, final boolean sendObjects,
73             final long eventInterval) throws JMSException {
74         this.topic = topic;
75         this.eventCount = eventCount;
76         this.sendObjects = sendObjects;
77         this.eventInterval = eventInterval;
78         connection = connectionFactory.createConnection(username, password);
79         connection.start();
80
81         producerThread = new Thread(this);
82         producerThread.start();
83     }
84
85     /**
86      * {@inheritDoc}.
87      */
88     @Override
89     public void run() {
90         final Topic jmsTopic = new ActiveMQTopic(topic);
91         try (final Session jmsSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
92                 final MessageProducer jmsProducer = jmsSession.createProducer(jmsTopic)) {
93
94             while (producerThread.isAlive() && !stopFlag) {
95                 ThreadUtilities.sleep(50);
96
97                 if (sendEventsFlag) {
98                     sendEventsToTopic(jmsSession, jmsProducer);
99                     sendEventsFlag = false;
100                 }
101             }
102
103         } catch (final Exception e) {
104             throw new ApexEventRuntimeException("JMS event consumption failed", e);
105         }
106     }
107
108     /**
109      * Send events.
110      */
111     public void sendEvents() {
112         sendEventsFlag = true;
113     }
114
115     /**
116      * Send events to topic.
117      *
118      * @param jmsSession the jms session
119      * @param jmsProducer the jms producer
120      * @throws JMSException the JMS exception
121      */
122     private void sendEventsToTopic(final Session jmsSession, final MessageProducer jmsProducer) throws JMSException {
123
124         LOGGER.debug("{} : sending events to JMS server, event count {}", this.getClass().getName(), eventCount);
125
126         for (int i = 0; i < eventCount; i++) {
127             ThreadUtilities.sleep(eventInterval);
128
129             Message jmsMessage = null;
130             if (sendObjects) {
131                 final PingTestClass pingTestClass = new PingTestClass();
132                 pingTestClass.setId(i);
133                 jmsMessage = jmsSession.createObjectMessage(pingTestClass);
134             } else {
135                 jmsMessage = jmsSession.createTextMessage(EventGenerator.jsonEvent());
136             }
137             jmsProducer.send(jmsMessage);
138             eventsSentCount++;
139         }
140         LOGGER.debug("{} : completed, number of events sent", this.getClass().getName(), eventsSentCount);
141     }
142
143     /**
144      * Gets the events sent count.
145      *
146      * @return the events sent count
147      */
148     public long getEventsSentCount() {
149         return eventsSentCount;
150     }
151
152     /**
153      * Shutdown.
154      */
155     public void shutdown() {
156         LOGGER.debug("{} : stopping", this.getClass().getName());
157         stopFlag = true;
158
159         while (producerThread.isAlive()) {
160             ThreadUtilities.sleep(10);
161         }
162         LOGGER.debug("{} : stopped", this.getClass().getName());
163     }
164
165 }