e5ddbfee563e85581b89ba9259c1f94ddd8b9eba
[policy/apex-pdp.git] /
1 /*-
2  * ============LICENSE_START=======================================================
3  *  Copyright (C) 2016-2018 Ericsson. All rights reserved.
4  * ================================================================================
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  * 
9  *      http://www.apache.org/licenses/LICENSE-2.0
10  * 
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  * 
17  * SPDX-License-Identifier: Apache-2.0
18  * ============LICENSE_END=========================================================
19  */
20
21 package org.onap.policy.apex.apps.uservice.test.adapt.jms;
22
23 import javax.jms.Connection;
24 import javax.jms.ConnectionFactory;
25 import javax.jms.JMSException;
26 import javax.jms.Message;
27 import javax.jms.MessageProducer;
28 import javax.jms.Session;
29 import javax.jms.Topic;
30
31 import org.apache.activemq.command.ActiveMQTopic;
32 import org.onap.policy.apex.apps.uservice.test.adapt.events.EventGenerator;
33 import org.onap.policy.apex.core.infrastructure.threading.ThreadUtilities;
34 import org.onap.policy.apex.service.engine.event.ApexEventRuntimeException;
35
36 /**
37  * @author Liam Fallon (liam.fallon@ericsson.com)
38  */
39 public class JMSEventProducer implements Runnable {
40     private final String topic;
41     private final int eventCount;
42     private final boolean sendObjects;
43     private final long eventInterval;
44     private long eventsSentCount = 0;
45
46     private final Thread producerThread;
47     private boolean sendEventsFlag = false;
48     private boolean stopFlag = false;
49     private final Connection connection;
50
51     public JMSEventProducer(String topic, ConnectionFactory connectionFactory, String username, String password,
52             final int eventCount, final boolean sendObjects, final long eventInterval) throws JMSException {
53         this.topic = topic;
54         this.eventCount = eventCount;
55         this.sendObjects = sendObjects;
56         this.eventInterval = eventInterval;
57         connection = connectionFactory.createConnection(username, password);
58         connection.start();
59
60         producerThread = new Thread(this);
61         producerThread.start();
62     }
63
64     @Override
65     public void run() {
66         try {
67             final Topic jmsTopic = new ActiveMQTopic(topic);
68             final Session jmsSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
69             final MessageProducer jmsProducer = jmsSession.createProducer(jmsTopic);
70
71             while (producerThread.isAlive() && !stopFlag) {
72                 ThreadUtilities.sleep(50);
73
74                 if (sendEventsFlag) {
75                     sendEventsToTopic(jmsSession, jmsProducer);
76                     sendEventsFlag = false;
77                 }
78             }
79
80             jmsProducer.close();
81             jmsSession.close();
82         } catch (final Exception e) {
83             throw new ApexEventRuntimeException("JMS event consumption failed", e);
84         }
85     }
86
87     public void sendEvents() {
88         sendEventsFlag = true;
89     }
90
91     private void sendEventsToTopic(final Session jmsSession, final MessageProducer jmsProducer) throws JMSException {
92         System.out.println(JMSEventProducer.class.getCanonicalName() + ": sending events to JMS server, event count "
93                 + eventCount);
94
95         for (int i = 0; i < eventCount; i++) {
96             System.out.println(JMSEventProducer.class.getCanonicalName() + ": waiting " + eventInterval
97                     + " milliseconds before sending next event");
98             ThreadUtilities.sleep(eventInterval);
99
100             Message jmsMessage = null;
101             if (sendObjects) {
102                 jmsMessage = jmsSession.createObjectMessage(new TestPing());
103             } else {
104                 jmsMessage = jmsSession.createTextMessage(EventGenerator.jsonEvent());
105             }
106             jmsProducer.send(jmsMessage);
107             eventsSentCount++;
108             System.out.println(JMSEventProducer.class.getCanonicalName() + ": sent event " + jmsMessage.toString());
109         }
110         System.out.println(JMSEventProducer.class.getCanonicalName() + ": completed");
111     }
112
113     public long getEventsSentCount() {
114         return eventsSentCount;
115     }
116
117     public void shutdown() {
118         System.out.println(JMSEventProducer.class.getCanonicalName() + ": stopping");
119
120         stopFlag = true;
121
122         while (producerThread.isAlive()) {
123             ThreadUtilities.sleep(10);
124         }
125
126         System.out.println(JMSEventProducer.class.getCanonicalName() + ": stopped");
127     }
128
129 }