2 * ============LICENSE_START=======================================================
3 * Copyright (C) 2016-2018 Ericsson. All rights reserved.
4 * Modifications Copyright (C) 2019-2020 Nordix Foundation.
5 * ================================================================================
6 * Licensed under the Apache License, Version 2.0 (the "License");
7 * you may not use this file except in compliance with the License.
8 * You may obtain a copy of the License at
10 * http://www.apache.org/licenses/LICENSE-2.0
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
18 * SPDX-License-Identifier: Apache-2.0
19 * ============LICENSE_END=========================================================
22 package org.onap.policy.apex.testsuites.integration.uservice.adapt.jms;
24 import javax.jms.Connection;
25 import javax.jms.ConnectionFactory;
26 import javax.jms.JMSException;
27 import javax.jms.Message;
28 import javax.jms.MessageProducer;
29 import javax.jms.Session;
30 import javax.jms.Topic;
32 import org.apache.activemq.command.ActiveMQTopic;
33 import org.onap.policy.apex.core.infrastructure.threading.ThreadUtilities;
34 import org.onap.policy.apex.service.engine.event.ApexEventRuntimeException;
35 import org.onap.policy.apex.testsuites.integration.common.testclasses.PingTestClass;
36 import org.onap.policy.apex.testsuites.integration.uservice.adapt.events.EventGenerator;
37 import org.slf4j.Logger;
38 import org.slf4j.LoggerFactory;
41 * The Class JmsEventProducer.
43 * @author Liam Fallon (liam.fallon@ericsson.com)
45 public class JmsEventProducer implements Runnable {
46 private static final Logger LOGGER = LoggerFactory.getLogger(JmsEventProducer.class);
48 private final String topic;
49 private final int eventCount;
50 private final boolean sendObjects;
51 private final long eventInterval;
52 private long eventsSentCount = 0;
54 private final Thread producerThread;
55 private boolean sendEventsFlag = false;
56 private boolean stopFlag = false;
57 private final Connection connection;
60 * Instantiates a new jms event producer.
62 * @param topic the topic
63 * @param connectionFactory the connection factory
64 * @param username the username
65 * @param password the password
66 * @param eventCount the event count
67 * @param sendObjects the send objects
68 * @param eventInterval the event interval
69 * @throws JMSException the JMS exception
71 public JmsEventProducer(final String topic, final ConnectionFactory connectionFactory, final String username,
72 final String password, final int eventCount, final boolean sendObjects,
73 final long eventInterval) throws JMSException {
75 this.eventCount = eventCount;
76 this.sendObjects = sendObjects;
77 this.eventInterval = eventInterval;
78 connection = connectionFactory.createConnection(username, password);
81 producerThread = new Thread(this);
82 producerThread.start();
90 final Topic jmsTopic = new ActiveMQTopic(topic);
91 try (final Session jmsSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
92 final MessageProducer jmsProducer = jmsSession.createProducer(jmsTopic)) {
94 while (producerThread.isAlive() && !stopFlag) {
95 ThreadUtilities.sleep(50);
98 sendEventsToTopic(jmsSession, jmsProducer);
99 sendEventsFlag = false;
103 } catch (final Exception e) {
104 throw new ApexEventRuntimeException("JMS event consumption failed", e);
111 public void sendEvents() {
112 sendEventsFlag = true;
116 * Send events to topic.
118 * @param jmsSession the jms session
119 * @param jmsProducer the jms producer
120 * @throws JMSException the JMS exception
122 private void sendEventsToTopic(final Session jmsSession, final MessageProducer jmsProducer) throws JMSException {
124 LOGGER.debug("{} : sending events to JMS server, event count {}", this.getClass().getName(), eventCount);
126 for (int i = 0; i < eventCount; i++) {
127 ThreadUtilities.sleep(eventInterval);
129 Message jmsMessage = null;
131 final PingTestClass pingTestClass = new PingTestClass();
132 pingTestClass.setId(i);
133 jmsMessage = jmsSession.createObjectMessage(pingTestClass);
135 jmsMessage = jmsSession.createTextMessage(EventGenerator.jsonEvent());
137 jmsProducer.send(jmsMessage);
140 LOGGER.debug("{} : completed, number of events sent", this.getClass().getName(), eventsSentCount);
144 * Gets the events sent count.
146 * @return the events sent count
148 public long getEventsSentCount() {
149 return eventsSentCount;
155 public void shutdown() {
156 LOGGER.debug("{} : stopping", this.getClass().getName());
159 while (producerThread.isAlive()) {
160 ThreadUtilities.sleep(10);
162 LOGGER.debug("{} : stopped", this.getClass().getName());