2 * ============LICENSE_START=======================================================
3 * Copyright (C) 2016-2018 Ericsson. All rights reserved.
4 * Modifications Copyright (C) 2019-2020 Nordix Foundation.
5 * ================================================================================
6 * Licensed under the Apache License, Version 2.0 (the "License");
7 * you may not use this file except in compliance with the License.
8 * You may obtain a copy of the License at
10 * http://www.apache.org/licenses/LICENSE-2.0
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
18 * SPDX-License-Identifier: Apache-2.0
19 * ============LICENSE_END=========================================================
22 package org.onap.policy.apex.testsuites.integration.uservice.adapt.jms;
24 import javax.jms.Connection;
25 import javax.jms.ConnectionFactory;
26 import javax.jms.JMSException;
27 import javax.jms.Message;
28 import javax.jms.MessageProducer;
29 import javax.jms.Session;
30 import javax.jms.Topic;
31 import org.apache.activemq.command.ActiveMQTopic;
32 import org.onap.policy.apex.core.infrastructure.threading.ThreadUtilities;
33 import org.onap.policy.apex.service.engine.event.ApexEventRuntimeException;
34 import org.onap.policy.apex.testsuites.integration.common.testclasses.PingTestClass;
35 import org.onap.policy.apex.testsuites.integration.uservice.adapt.events.EventGenerator;
36 import org.slf4j.Logger;
37 import org.slf4j.LoggerFactory;
40 * The Class JmsEventProducer.
42 * @author Liam Fallon (liam.fallon@ericsson.com)
44 public class JmsEventProducer implements Runnable {
45 private static final Logger LOGGER = LoggerFactory.getLogger(JmsEventProducer.class);
47 private final String topic;
48 private final int eventCount;
49 private final boolean sendObjects;
50 private final long eventInterval;
51 private long eventsSentCount = 0;
53 private final Thread producerThread;
54 private boolean sendEventsFlag = false;
55 private boolean stopFlag = false;
56 private final Connection connection;
59 * Instantiates a new jms event producer.
61 * @param topic the topic
62 * @param connectionFactory the connection factory
63 * @param username the username
64 * @param password the password
65 * @param eventCount the event count
66 * @param sendObjects the send objects
67 * @param eventInterval the event interval
68 * @throws JMSException the JMS exception
70 public JmsEventProducer(final String topic, final ConnectionFactory connectionFactory, final String username,
71 final String password, final int eventCount, final boolean sendObjects,
72 final long eventInterval) throws JMSException {
74 this.eventCount = eventCount;
75 this.sendObjects = sendObjects;
76 this.eventInterval = eventInterval;
77 connection = connectionFactory.createConnection(username, password);
80 producerThread = new Thread(this);
81 producerThread.start();
89 final Topic jmsTopic = new ActiveMQTopic(topic);
90 try (final Session jmsSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
91 final MessageProducer jmsProducer = jmsSession.createProducer(jmsTopic)) {
93 while (producerThread.isAlive() && !stopFlag) {
94 ThreadUtilities.sleep(50);
97 sendEventsToTopic(jmsSession, jmsProducer);
98 sendEventsFlag = false;
102 } catch (final Exception e) {
103 throw new ApexEventRuntimeException("JMS event consumption failed", e);
110 public void sendEvents() {
111 sendEventsFlag = true;
115 * Send events to topic.
117 * @param jmsSession the jms session
118 * @param jmsProducer the jms producer
119 * @throws JMSException the JMS exception
121 private void sendEventsToTopic(final Session jmsSession, final MessageProducer jmsProducer) throws JMSException {
123 LOGGER.debug("{} : sending events to JMS server, event count {}", this.getClass().getName(), eventCount);
125 for (int i = 0; i < eventCount; i++) {
126 ThreadUtilities.sleep(eventInterval);
128 Message jmsMessage = null;
130 final PingTestClass pingTestClass = new PingTestClass();
131 pingTestClass.setId(i);
132 jmsMessage = jmsSession.createObjectMessage(pingTestClass);
134 jmsMessage = jmsSession.createTextMessage(EventGenerator.jsonEvent());
136 jmsProducer.send(jmsMessage);
139 LOGGER.debug("{} : completed, number of events sent", this.getClass().getName(), eventsSentCount);
143 * Gets the events sent count.
145 * @return the events sent count
147 public long getEventsSentCount() {
148 return eventsSentCount;
154 public void shutdown() {
155 LOGGER.debug("{} : stopping", this.getClass().getName());
158 while (producerThread.isAlive()) {
159 ThreadUtilities.sleep(10);
161 LOGGER.debug("{} : stopped", this.getClass().getName());