8b9f2a1127a988f0a5ad1e3c6d17a85fa36ab2c2
[policy/apex-pdp.git] /
1 /*-
2  * ============LICENSE_START=======================================================
3  *  Copyright (C) 2016-2018 Ericsson. All rights reserved.
4  * ================================================================================
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *      http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  * SPDX-License-Identifier: Apache-2.0
18  * ============LICENSE_END=========================================================
19  */
20
21 package org.onap.policy.apex.apps.uservice.test.adapt.jms;
22
23 import javax.jms.Connection;
24 import javax.jms.ConnectionFactory;
25 import javax.jms.JMSException;
26 import javax.jms.Message;
27 import javax.jms.MessageProducer;
28 import javax.jms.Session;
29 import javax.jms.Topic;
30
31 import org.apache.activemq.command.ActiveMQTopic;
32 import org.onap.policy.apex.apps.uservice.test.adapt.events.EventGenerator;
33 import org.onap.policy.apex.core.infrastructure.threading.ThreadUtilities;
34 import org.onap.policy.apex.service.engine.event.ApexEventRuntimeException;
35 import org.slf4j.Logger;
36 import org.slf4j.LoggerFactory;
37
38 /**
39  * @author Liam Fallon (liam.fallon@ericsson.com)
40  */
41 public class JMSEventProducer implements Runnable {
42     private static final Logger LOGGER = LoggerFactory.getLogger(JMSEventProducer.class);
43
44     private final String topic;
45     private final int eventCount;
46     private final boolean sendObjects;
47     private final long eventInterval;
48     private long eventsSentCount = 0;
49
50     private final Thread producerThread;
51     private boolean sendEventsFlag = false;
52     private boolean stopFlag = false;
53     private final Connection connection;
54
55     public JMSEventProducer(final String topic, final ConnectionFactory connectionFactory, final String username,
56             final String password, final int eventCount, final boolean sendObjects, final long eventInterval)
57             throws JMSException {
58         this.topic = topic;
59         this.eventCount = eventCount;
60         this.sendObjects = sendObjects;
61         this.eventInterval = eventInterval;
62         connection = connectionFactory.createConnection(username, password);
63         connection.start();
64
65         producerThread = new Thread(this);
66         producerThread.start();
67     }
68
69     @Override
70     public void run() {
71         final Topic jmsTopic = new ActiveMQTopic(topic);
72         try (final Session jmsSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
73                 final MessageProducer jmsProducer = jmsSession.createProducer(jmsTopic);) {
74
75             while (producerThread.isAlive() && !stopFlag) {
76                 ThreadUtilities.sleep(50);
77
78                 if (sendEventsFlag) {
79                     sendEventsToTopic(jmsSession, jmsProducer);
80                     sendEventsFlag = false;
81                 }
82             }
83
84         } catch (final Exception e) {
85             throw new ApexEventRuntimeException("JMS event consumption failed", e);
86         }
87     }
88
89     public void sendEvents() {
90         sendEventsFlag = true;
91     }
92
93     private void sendEventsToTopic(final Session jmsSession, final MessageProducer jmsProducer) throws JMSException {
94
95         LOGGER.info("{} : sending events to JMS server, event count {}", this.getClass().getCanonicalName(),
96                 eventCount);
97
98         for (int i = 0; i < eventCount; i++) {
99             ThreadUtilities.sleep(eventInterval);
100
101             Message jmsMessage = null;
102             if (sendObjects) {
103                 jmsMessage = jmsSession.createObjectMessage(new TestPing());
104             } else {
105                 jmsMessage = jmsSession.createTextMessage(EventGenerator.jsonEvent());
106             }
107             jmsProducer.send(jmsMessage);
108             eventsSentCount++;
109         }
110         LOGGER.info("{} : completed, number of events sent", this.getClass().getCanonicalName(), eventsSentCount);
111     }
112
113     public long getEventsSentCount() {
114         return eventsSentCount;
115     }
116
117     public void shutdown() {
118         LOGGER.info("{} : stopping", this.getClass().getCanonicalName());
119         stopFlag = true;
120
121         while (producerThread.isAlive()) {
122             ThreadUtilities.sleep(10);
123         }
124         LOGGER.info("{} : stopped", this.getClass().getCanonicalName());
125     }
126
127 }