2 * ============LICENSE_START=======================================================
3 * Copyright (C) 2016-2018 Ericsson. All rights reserved.
4 * Modifications Copyright (C) 2019-2020, 2023-2024 Nordix Foundation.
5 * Modifications Copyright (C) 2021 AT&T Intellectual Property. All rights reserved.
6 * ================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
19 * SPDX-License-Identifier: Apache-2.0
20 * ============LICENSE_END=========================================================
23 package org.onap.policy.apex.testsuites.integration.uservice.adapt.jms;
25 import jakarta.jms.Connection;
26 import jakarta.jms.ConnectionFactory;
27 import jakarta.jms.JMSException;
28 import jakarta.jms.Message;
29 import jakarta.jms.MessageProducer;
30 import jakarta.jms.Session;
31 import jakarta.jms.Topic;
33 import org.apache.activemq.artemis.jms.client.ActiveMQTopic;
34 import org.onap.policy.apex.core.infrastructure.threading.ThreadUtilities;
35 import org.onap.policy.apex.service.engine.event.ApexEventRuntimeException;
36 import org.onap.policy.apex.testsuites.integration.common.testclasses.PingTestClass;
37 import org.onap.policy.apex.testsuites.integration.uservice.adapt.events.EventGenerator;
38 import org.slf4j.Logger;
39 import org.slf4j.LoggerFactory;
42 * The Class JmsEventProducer.
44 * @author Liam Fallon (liam.fallon@ericsson.com)
46 public class JmsEventProducer implements Runnable {
47 private static final Logger LOGGER = LoggerFactory.getLogger(JmsEventProducer.class);
49 private final String topic;
50 private final int eventCount;
51 private final boolean sendObjects;
52 private final long eventInterval;
54 private long eventsSentCount = 0;
56 private final Thread producerThread;
57 private boolean sendEventsFlag = false;
58 private boolean stopFlag = false;
59 private final Connection connection;
62 * Instantiates a new jms event producer.
64 * @param topic the topic
65 * @param connectionFactory the connection factory
66 * @param username the username
67 * @param password the password
68 * @param eventCount the event count
69 * @param sendObjects the send objects
70 * @param eventInterval the event interval
71 * @throws JMSException the JMS exception
73 public JmsEventProducer(final String topic, final ConnectionFactory connectionFactory, final String username,
74 final String password, final int eventCount, final boolean sendObjects,
75 final long eventInterval) throws JMSException {
77 this.eventCount = eventCount;
78 this.sendObjects = sendObjects;
79 this.eventInterval = eventInterval;
80 connection = connectionFactory.createConnection(username, password);
83 producerThread = new Thread(this);
84 producerThread.start();
92 final Topic jmsTopic = new ActiveMQTopic(topic);
93 try (final Session jmsSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
94 final MessageProducer jmsProducer = jmsSession.createProducer(jmsTopic)) {
96 while (producerThread.isAlive() && !stopFlag) {
97 ThreadUtilities.sleep(50);
100 sendEventsToTopic(jmsSession, jmsProducer);
101 sendEventsFlag = false;
105 } catch (final Exception e) {
106 throw new ApexEventRuntimeException("JMS event consumption failed", e);
113 public void sendEvents() {
114 sendEventsFlag = true;
118 * Send events to topic.
120 * @param jmsSession the jms session
121 * @param jmsProducer the jms producer
122 * @throws JMSException the JMS exception
124 private void sendEventsToTopic(final Session jmsSession, final MessageProducer jmsProducer) throws JMSException {
126 LOGGER.debug("{} : sending events to JMS server, event count {}", this.getClass().getName(), eventCount);
128 for (int i = 0; i < eventCount; i++) {
129 ThreadUtilities.sleep(eventInterval);
133 final PingTestClass pingTestClass = new PingTestClass();
134 pingTestClass.setId(i);
135 jmsMessage = jmsSession.createObjectMessage(pingTestClass);
137 jmsMessage = jmsSession.createTextMessage(EventGenerator.jsonEvent());
139 jmsProducer.send(jmsMessage);
142 LOGGER.debug("{} : completed, number of events sent: {}", this.getClass().getName(), eventsSentCount);
148 public void shutdown() {
149 LOGGER.debug("{} : stopping", this.getClass().getName());
152 while (producerThread.isAlive()) {
153 ThreadUtilities.sleep(10);
155 LOGGER.debug("{} : stopped", this.getClass().getName());