072d678640d8a6dfb9af5abb10f1e8c3ab0b6271
[policy/apex-pdp.git] /
1 /*-
2  * ============LICENSE_START=======================================================
3  *  Copyright (C) 2016-2018 Ericsson. All rights reserved.
4  * ================================================================================
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  * 
9  *      http://www.apache.org/licenses/LICENSE-2.0
10  * 
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  * 
17  * SPDX-License-Identifier: Apache-2.0
18  * ============LICENSE_END=========================================================
19  */
20
21 package org.onap.policy.apex.testsuites.integration.uservice.adapt.kafka;
22
23 import java.util.Properties;
24 import java.util.concurrent.TimeUnit;
25
26 import org.apache.kafka.clients.producer.KafkaProducer;
27 import org.apache.kafka.clients.producer.Producer;
28 import org.apache.kafka.clients.producer.ProducerRecord;
29 import org.onap.policy.apex.core.infrastructure.threading.ThreadUtilities;
30 import org.onap.policy.apex.testsuites.integration.uservice.adapt.events.EventGenerator;
31
32 /**
33  * The Class KafkaEventProducer.
34  *
35  * @author Liam Fallon (liam.fallon@ericsson.com)
36  */
37 public class KafkaEventProducer implements Runnable {
38     private final String topic;
39     private final String kafkaServerAddress;
40     private final int eventCount;
41     private final boolean xmlEvents;
42     private final long eventInterval;
43     private long eventsSentCount = 0;
44
45     private final Thread producerThread;
46     private boolean sendEventsFlag = false;
47     private boolean stopFlag = false;
48
49     /**
50      * Instantiates a new kafka event producer.
51      *
52      * @param topic the topic
53      * @param kafkaServerAddress the kafka server address
54      * @param eventCount the event count
55      * @param xmlEvents the xml events
56      * @param eventInterval the event interval
57      */
58     public KafkaEventProducer(final String topic, final String kafkaServerAddress, final int eventCount,
59                     final boolean xmlEvents, final long eventInterval) {
60         this.topic = topic;
61         this.kafkaServerAddress = kafkaServerAddress;
62         this.eventCount = eventCount;
63         this.xmlEvents = xmlEvents;
64         this.eventInterval = eventInterval;
65
66         producerThread = new Thread(this);
67         producerThread.start();
68     }
69
70     /* (non-Javadoc)
71      * @see java.lang.Runnable#run()
72      */
73     @Override
74     public void run() {
75         final Properties kafkaProducerProperties = new Properties();
76         kafkaProducerProperties.put("bootstrap.servers", kafkaServerAddress);
77         kafkaProducerProperties.put("acks", "all");
78         kafkaProducerProperties.put("retries", 0);
79         kafkaProducerProperties.put("batch.size", 16384);
80         kafkaProducerProperties.put("linger.ms", 1);
81         kafkaProducerProperties.put("buffer.memory", 33554432);
82         kafkaProducerProperties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
83         kafkaProducerProperties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
84
85         final Producer<String, String> producer = new KafkaProducer<String, String>(kafkaProducerProperties);
86
87         while (producerThread.isAlive() && !stopFlag) {
88             ThreadUtilities.sleep(50);
89
90             if (sendEventsFlag) {
91                 sendEventsToTopic(producer);
92                 sendEventsFlag = false;
93             }
94         }
95
96         producer.close(1000, TimeUnit.MILLISECONDS);
97     }
98
99     /**
100      * Send events.
101      */
102     public void sendEvents() {
103         sendEventsFlag = true;
104     }
105
106     /**
107      * Send events to topic.
108      *
109      * @param producer the producer
110      */
111     private void sendEventsToTopic(final Producer<String, String> producer) {
112         System.out.println(KafkaEventProducer.class.getCanonicalName() + ": sending events to Kafka server at "
113                         + kafkaServerAddress + ", event count " + eventCount + ", xmlEvents " + xmlEvents);
114
115         for (int i = 0; i < eventCount; i++) {
116             System.out.println(KafkaEventProducer.class.getCanonicalName() + ": waiting " + eventInterval
117                             + " milliseconds before sending next event");
118             ThreadUtilities.sleep(eventInterval);
119
120             String eventString = null;
121             if (xmlEvents) {
122                 eventString = EventGenerator.xmlEvent();
123             } else {
124                 eventString = EventGenerator.jsonEvent();
125             }
126             producer.send(new ProducerRecord<String, String>(topic, "Event" + i + "Of" + eventCount, eventString));
127             producer.flush();
128             eventsSentCount++;
129             System.out.println(KafkaEventProducer.class.getCanonicalName() + ": sent event " + eventString);
130         }
131         System.out.println(KafkaEventProducer.class.getCanonicalName() + ": completed");
132     }
133
134     /**
135      * Gets the events sent count.
136      *
137      * @return the events sent count
138      */
139     public long getEventsSentCount() {
140         return eventsSentCount;
141     }
142
143     /**
144      * Shutdown.
145      */
146     public void shutdown() {
147         System.out.println(KafkaEventProducer.class.getCanonicalName() + ": stopping");
148
149         stopFlag = true;
150
151         while (producerThread.isAlive()) {
152             ThreadUtilities.sleep(10);
153         }
154
155         System.out.println(KafkaEventProducer.class.getCanonicalName() + ": stopped");
156     }
157
158     /**
159      * The main method.
160      *
161      * @param args the arguments
162      */
163     public static void main(final String[] args) {
164         if (args.length != 5) {
165             System.err.println("usage KafkaEventProducer topic kafkaServerAddress #events XML|JSON eventInterval");
166             return;
167         }
168
169         int eventCount = 0;
170         try {
171             eventCount = Integer.parseInt(args[2]);
172         } catch (final Exception e) {
173             System.err.println("usage KafkaEventProducer topic kafkaServerAddress #events XML|JSON eventInterval");
174             e.printStackTrace();
175             return;
176         }
177
178         long eventInterval = 0;
179         try {
180             eventInterval = Long.parseLong(args[4]);
181         } catch (final Exception e) {
182             System.err.println("usage KafkaEventProducer topic kafkaServerAddress #events XML|JSON eventInterval");
183             e.printStackTrace();
184             return;
185         }
186
187         boolean xmlEvents = false;
188         if (args[3].equalsIgnoreCase("XML")) {
189             xmlEvents = true;
190         } else if (!args[3].equalsIgnoreCase("JSON")) {
191             System.err.println("usage KafkaEventProducer topic kafkaServerAddress #events XML|JSON eventInterval");
192             return;
193         }
194
195         final KafkaEventProducer producer = new KafkaEventProducer(args[0], args[1], eventCount, xmlEvents,
196                         eventInterval);
197
198         producer.sendEvents();
199         producer.shutdown();
200     }
201 }