Use lombok in apex-pdp #4
[policy/apex-pdp.git] / testsuites / integration / integration-uservice-test / src / test / java / org / onap / policy / apex / testsuites / integration / uservice / adapt / jms / JmsEventProducer.java
1 /*-
2  * ============LICENSE_START=======================================================
3  *  Copyright (C) 2016-2018 Ericsson. All rights reserved.
4  *  Modifications Copyright (C) 2019-2020 Nordix Foundation.
5  *  Modifications Copyright (C) 2021 AT&T Intellectual Property. All rights reserved.
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  *
19  * SPDX-License-Identifier: Apache-2.0
20  * ============LICENSE_END=========================================================
21  */
22
23 package org.onap.policy.apex.testsuites.integration.uservice.adapt.jms;
24
25 import javax.jms.Connection;
26 import javax.jms.ConnectionFactory;
27 import javax.jms.JMSException;
28 import javax.jms.Message;
29 import javax.jms.MessageProducer;
30 import javax.jms.Session;
31 import javax.jms.Topic;
32 import lombok.Getter;
33 import org.apache.activemq.command.ActiveMQTopic;
34 import org.onap.policy.apex.core.infrastructure.threading.ThreadUtilities;
35 import org.onap.policy.apex.service.engine.event.ApexEventRuntimeException;
36 import org.onap.policy.apex.testsuites.integration.common.testclasses.PingTestClass;
37 import org.onap.policy.apex.testsuites.integration.uservice.adapt.events.EventGenerator;
38 import org.slf4j.Logger;
39 import org.slf4j.LoggerFactory;
40
41 /**
42  * The Class JmsEventProducer.
43  *
44  * @author Liam Fallon (liam.fallon@ericsson.com)
45  */
46 public class JmsEventProducer implements Runnable {
47     private static final Logger LOGGER = LoggerFactory.getLogger(JmsEventProducer.class);
48
49     private final String topic;
50     private final int eventCount;
51     private final boolean sendObjects;
52     private final long eventInterval;
53     @Getter
54     private long eventsSentCount = 0;
55
56     private final Thread producerThread;
57     private boolean sendEventsFlag = false;
58     private boolean stopFlag = false;
59     private final Connection connection;
60
61     /**
62      * Instantiates a new jms event producer.
63      *
64      * @param topic the topic
65      * @param connectionFactory the connection factory
66      * @param username the username
67      * @param password the password
68      * @param eventCount the event count
69      * @param sendObjects the send objects
70      * @param eventInterval the event interval
71      * @throws JMSException the JMS exception
72      */
73     public JmsEventProducer(final String topic, final ConnectionFactory connectionFactory, final String username,
74             final String password, final int eventCount, final boolean sendObjects,
75             final long eventInterval) throws JMSException {
76         this.topic = topic;
77         this.eventCount = eventCount;
78         this.sendObjects = sendObjects;
79         this.eventInterval = eventInterval;
80         connection = connectionFactory.createConnection(username, password);
81         connection.start();
82
83         producerThread = new Thread(this);
84         producerThread.start();
85     }
86
87     /**
88      * {@inheritDoc}.
89      */
90     @Override
91     public void run() {
92         final Topic jmsTopic = new ActiveMQTopic(topic);
93         try (final Session jmsSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
94                 final MessageProducer jmsProducer = jmsSession.createProducer(jmsTopic)) {
95
96             while (producerThread.isAlive() && !stopFlag) {
97                 ThreadUtilities.sleep(50);
98
99                 if (sendEventsFlag) {
100                     sendEventsToTopic(jmsSession, jmsProducer);
101                     sendEventsFlag = false;
102                 }
103             }
104
105         } catch (final Exception e) {
106             throw new ApexEventRuntimeException("JMS event consumption failed", e);
107         }
108     }
109
110     /**
111      * Send events.
112      */
113     public void sendEvents() {
114         sendEventsFlag = true;
115     }
116
117     /**
118      * Send events to topic.
119      *
120      * @param jmsSession the jms session
121      * @param jmsProducer the jms producer
122      * @throws JMSException the JMS exception
123      */
124     private void sendEventsToTopic(final Session jmsSession, final MessageProducer jmsProducer) throws JMSException {
125
126         LOGGER.debug("{} : sending events to JMS server, event count {}", this.getClass().getName(), eventCount);
127
128         for (int i = 0; i < eventCount; i++) {
129             ThreadUtilities.sleep(eventInterval);
130
131             Message jmsMessage = null;
132             if (sendObjects) {
133                 final PingTestClass pingTestClass = new PingTestClass();
134                 pingTestClass.setId(i);
135                 jmsMessage = jmsSession.createObjectMessage(pingTestClass);
136             } else {
137                 jmsMessage = jmsSession.createTextMessage(EventGenerator.jsonEvent());
138             }
139             jmsProducer.send(jmsMessage);
140             eventsSentCount++;
141         }
142         LOGGER.debug("{} : completed, number of events sent", this.getClass().getName(), eventsSentCount);
143     }
144
145     /**
146      * Shutdown.
147      */
148     public void shutdown() {
149         LOGGER.debug("{} : stopping", this.getClass().getName());
150         stopFlag = true;
151
152         while (producerThread.isAlive()) {
153             ThreadUtilities.sleep(10);
154         }
155         LOGGER.debug("{} : stopped", this.getClass().getName());
156     }
157
158 }