6a7e61739195342c9818582c070ca42bec23532d
[policy/apex-pdp.git] /
1 /*-
2  * ============LICENSE_START=======================================================
3  *  Copyright (C) 2016-2018 Ericsson. All rights reserved.
4  * ================================================================================
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *      http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  * SPDX-License-Identifier: Apache-2.0
18  * ============LICENSE_END=========================================================
19  */
20
21 package org.onap.policy.apex.testsuites.integration.uservice.adapt.jms;
22
23 import javax.jms.Connection;
24 import javax.jms.ConnectionFactory;
25 import javax.jms.JMSException;
26 import javax.jms.Message;
27 import javax.jms.MessageProducer;
28 import javax.jms.Session;
29 import javax.jms.Topic;
30
31 import org.apache.activemq.command.ActiveMQTopic;
32 import org.onap.policy.apex.core.infrastructure.threading.ThreadUtilities;
33 import org.onap.policy.apex.service.engine.event.ApexEventRuntimeException;
34 import org.onap.policy.apex.testsuites.integration.uservice.adapt.events.EventGenerator;
35 import org.slf4j.Logger;
36 import org.slf4j.LoggerFactory;
37
38 /**
39  * The Class JmsEventProducer.
40  *
41  * @author Liam Fallon (liam.fallon@ericsson.com)
42  */
43 public class JmsEventProducer implements Runnable {
44     private static final Logger LOGGER = LoggerFactory.getLogger(JmsEventProducer.class);
45
46     private final String topic;
47     private final int eventCount;
48     private final boolean sendObjects;
49     private final long eventInterval;
50     private long eventsSentCount = 0;
51
52     private final Thread producerThread;
53     private boolean sendEventsFlag = false;
54     private boolean stopFlag = false;
55     private final Connection connection;
56
57     /**
58      * Instantiates a new jms event producer.
59      *
60      * @param topic the topic
61      * @param connectionFactory the connection factory
62      * @param username the username
63      * @param password the password
64      * @param eventCount the event count
65      * @param sendObjects the send objects
66      * @param eventInterval the event interval
67      * @throws JMSException the JMS exception
68      */
69     public JmsEventProducer(final String topic, final ConnectionFactory connectionFactory, final String username,
70             final String password, final int eventCount, final boolean sendObjects, final long eventInterval)
71             throws JMSException {
72         this.topic = topic;
73         this.eventCount = eventCount;
74         this.sendObjects = sendObjects;
75         this.eventInterval = eventInterval;
76         connection = connectionFactory.createConnection(username, password);
77         connection.start();
78
79         producerThread = new Thread(this);
80         producerThread.start();
81     }
82
83     /* (non-Javadoc)
84      * @see java.lang.Runnable#run()
85      */
86     @Override
87     public void run() {
88         final Topic jmsTopic = new ActiveMQTopic(topic);
89         try (final Session jmsSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
90                 final MessageProducer jmsProducer = jmsSession.createProducer(jmsTopic);) {
91
92             while (producerThread.isAlive() && !stopFlag) {
93                 ThreadUtilities.sleep(50);
94
95                 if (sendEventsFlag) {
96                     sendEventsToTopic(jmsSession, jmsProducer);
97                     sendEventsFlag = false;
98                 }
99             }
100
101         } catch (final Exception e) {
102             throw new ApexEventRuntimeException("JMS event consumption failed", e);
103         }
104     }
105
106     /**
107      * Send events.
108      */
109     public void sendEvents() {
110         sendEventsFlag = true;
111     }
112
113     /**
114      * Send events to topic.
115      *
116      * @param jmsSession the jms session
117      * @param jmsProducer the jms producer
118      * @throws JMSException the JMS exception
119      */
120     private void sendEventsToTopic(final Session jmsSession, final MessageProducer jmsProducer) throws JMSException {
121
122         LOGGER.info("{} : sending events to JMS server, event count {}", this.getClass().getCanonicalName(),
123                 eventCount);
124
125         for (int i = 0; i < eventCount; i++) {
126             ThreadUtilities.sleep(eventInterval);
127
128             Message jmsMessage = null;
129             if (sendObjects) {
130                 jmsMessage = jmsSession.createObjectMessage(new TestPing());
131             } else {
132                 jmsMessage = jmsSession.createTextMessage(EventGenerator.jsonEvent());
133             }
134             jmsProducer.send(jmsMessage);
135             eventsSentCount++;
136         }
137         LOGGER.info("{} : completed, number of events sent", this.getClass().getCanonicalName(), eventsSentCount);
138     }
139
140     /**
141      * Gets the events sent count.
142      *
143      * @return the events sent count
144      */
145     public long getEventsSentCount() {
146         return eventsSentCount;
147     }
148
149     /**
150      * Shutdown.
151      */
152     public void shutdown() {
153         LOGGER.info("{} : stopping", this.getClass().getCanonicalName());
154         stopFlag = true;
155
156         while (producerThread.isAlive()) {
157             ThreadUtilities.sleep(10);
158         }
159         LOGGER.info("{} : stopped", this.getClass().getCanonicalName());
160     }
161
162 }