2 * ============LICENSE_START=======================================================
3 * Copyright (C) 2016-2018 Ericsson. All rights reserved.
4 * ================================================================================
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
9 * http://www.apache.org/licenses/LICENSE-2.0
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
17 * SPDX-License-Identifier: Apache-2.0
18 * ============LICENSE_END=========================================================
21 package org.onap.policy.apex.testsuites.integration.uservice.adapt.jms;
23 import javax.jms.Connection;
24 import javax.jms.ConnectionFactory;
25 import javax.jms.JMSException;
26 import javax.jms.Message;
27 import javax.jms.MessageProducer;
28 import javax.jms.Session;
29 import javax.jms.Topic;
31 import org.apache.activemq.command.ActiveMQTopic;
32 import org.onap.policy.apex.core.infrastructure.threading.ThreadUtilities;
33 import org.onap.policy.apex.service.engine.event.ApexEventRuntimeException;
34 import org.onap.policy.apex.testsuites.integration.uservice.adapt.events.EventGenerator;
35 import org.slf4j.Logger;
36 import org.slf4j.LoggerFactory;
39 * The Class JmsEventProducer.
41 * @author Liam Fallon (liam.fallon@ericsson.com)
43 public class JmsEventProducer implements Runnable {
44 private static final Logger LOGGER = LoggerFactory.getLogger(JmsEventProducer.class);
46 private final String topic;
47 private final int eventCount;
48 private final boolean sendObjects;
49 private final long eventInterval;
50 private long eventsSentCount = 0;
52 private final Thread producerThread;
53 private boolean sendEventsFlag = false;
54 private boolean stopFlag = false;
55 private final Connection connection;
58 * Instantiates a new jms event producer.
60 * @param topic the topic
61 * @param connectionFactory the connection factory
62 * @param username the username
63 * @param password the password
64 * @param eventCount the event count
65 * @param sendObjects the send objects
66 * @param eventInterval the event interval
67 * @throws JMSException the JMS exception
69 public JmsEventProducer(final String topic, final ConnectionFactory connectionFactory, final String username,
70 final String password, final int eventCount, final boolean sendObjects, final long eventInterval)
73 this.eventCount = eventCount;
74 this.sendObjects = sendObjects;
75 this.eventInterval = eventInterval;
76 connection = connectionFactory.createConnection(username, password);
79 producerThread = new Thread(this);
80 producerThread.start();
84 * @see java.lang.Runnable#run()
88 final Topic jmsTopic = new ActiveMQTopic(topic);
89 try (final Session jmsSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
90 final MessageProducer jmsProducer = jmsSession.createProducer(jmsTopic);) {
92 while (producerThread.isAlive() && !stopFlag) {
93 ThreadUtilities.sleep(50);
96 sendEventsToTopic(jmsSession, jmsProducer);
97 sendEventsFlag = false;
101 } catch (final Exception e) {
102 throw new ApexEventRuntimeException("JMS event consumption failed", e);
109 public void sendEvents() {
110 sendEventsFlag = true;
114 * Send events to topic.
116 * @param jmsSession the jms session
117 * @param jmsProducer the jms producer
118 * @throws JMSException the JMS exception
120 private void sendEventsToTopic(final Session jmsSession, final MessageProducer jmsProducer) throws JMSException {
122 LOGGER.info("{} : sending events to JMS server, event count {}", this.getClass().getCanonicalName(),
125 for (int i = 0; i < eventCount; i++) {
126 ThreadUtilities.sleep(eventInterval);
128 Message jmsMessage = null;
130 jmsMessage = jmsSession.createObjectMessage(new TestPing());
132 jmsMessage = jmsSession.createTextMessage(EventGenerator.jsonEvent());
134 jmsProducer.send(jmsMessage);
137 LOGGER.info("{} : completed, number of events sent", this.getClass().getCanonicalName(), eventsSentCount);
141 * Gets the events sent count.
143 * @return the events sent count
145 public long getEventsSentCount() {
146 return eventsSentCount;
152 public void shutdown() {
153 LOGGER.info("{} : stopping", this.getClass().getCanonicalName());
156 while (producerThread.isAlive()) {
157 ThreadUtilities.sleep(10);
159 LOGGER.info("{} : stopped", this.getClass().getCanonicalName());