2 * ============LICENSE_START=======================================================
3 * Copyright (C) 2016-2018 Ericsson. All rights reserved.
4 * ================================================================================
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
9 * http://www.apache.org/licenses/LICENSE-2.0
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
17 * SPDX-License-Identifier: Apache-2.0
18 * ============LICENSE_END=========================================================
21 package org.onap.policy.apex.apps.uservice.test.adapt.kafka;
23 import java.util.Properties;
24 import java.util.concurrent.TimeUnit;
26 import org.apache.kafka.clients.producer.KafkaProducer;
27 import org.apache.kafka.clients.producer.Producer;
28 import org.apache.kafka.clients.producer.ProducerRecord;
29 import org.onap.policy.apex.apps.uservice.test.adapt.events.EventGenerator;
30 import org.onap.policy.apex.core.infrastructure.threading.ThreadUtilities;
33 * @author Liam Fallon (liam.fallon@ericsson.com)
// Test utility that sends generated events to a Kafka topic. It implements Runnable and
// owns the thread (producerThread) that the constructor creates with this object as target.
35 public class KafkaEventProducer implements Runnable {
// Kafka topic used for every ProducerRecord that is sent.
36 private final String topic;
// Value placed in the Kafka "bootstrap.servers" producer property.
37 private final String kafkaServerAddress;
// Upper bound of the send loop in sendEventsToTopic().
38 private final int eventCount;
// Event format selector; presumably true = XML, false = JSON. The statements that branch on
// it are not visible in this extract -- confirm against the full file.
39 private final boolean xmlEvents;
// Delay handed to ThreadUtilities.sleep() before each event; the progress output reports it
// in milliseconds.
40 private final long eventInterval;
// Value returned by getEventsSentCount(). No statement that increments it is visible in
// this extract -- TODO confirm where it is updated.
41 private long eventsSentCount = 0;
// Kafka producer; created and closed in the producer loop body, used by sendEventsToTopic().
43 private Producer<String, String> producer;
// Thread created and started by the constructor with this object as its Runnable.
45 private final Thread producerThread;
// Set to true by sendEvents() and reset to false inside the producer loop.
// NOTE(review): this flag and stopFlag are plain (non-volatile) booleans although they look
// like they are written by the caller's thread and read by producerThread; visibility
// between threads is then not guaranteed -- consider volatile/AtomicBoolean.
46 private boolean sendEventsFlag = false;
// Polled by the producer loop, which runs while this is false. No statement setting it to
// true is visible in this extract -- presumably shutdown() does so; confirm.
47 private boolean stopFlag = false;
/**
 * Stores the producer configuration and immediately starts the producer thread.
 *
 * @param topic Kafka topic the events are sent to
 * @param kafkaServerAddress value used for the Kafka "bootstrap.servers" property
 * @param eventCount number of events generated by one pass of the send loop
 * @param xmlEvents event format selector (presumably true = XML, false = JSON -- confirm)
 * @param eventInterval delay before each event, reported as milliseconds in the progress output
 */
49 public KafkaEventProducer(final String topic, final String kafkaServerAddress, final int eventCount,
50 final boolean xmlEvents, final long eventInterval) {
// NOTE(review): no assignment to the final field "topic" is visible in this extract
// (original line 51 is absent); it has to be assigned here for the class to compile --
// confirm against the full file.
52 this.kafkaServerAddress = kafkaServerAddress;
53 this.eventCount = eventCount;
54 this.xmlEvents = xmlEvents;
55 this.eventInterval = eventInterval;
// The thread is created with this object as its Runnable and started before the constructor
// returns. NOTE(review): "this" therefore becomes visible to the new thread while
// construction is still in progress.
57 producerThread = new Thread(this);
58 producerThread.start();
// NOTE(review): the signature of the enclosing method (original lines 59-62) is not visible
// in this extract. Because the class implements Runnable and the constructor starts a Thread
// on this object, these statements presumably form run() -- confirm against the full file.
//
// Builds the Kafka producer configuration, creates the producer, polls the control flags
// until a stop is requested, then closes the producer.
63 final Properties kafkaProducerProperties = new Properties();
// Broker address supplied to the constructor.
64 kafkaProducerProperties.put("bootstrap.servers", kafkaServerAddress);
// Remaining settings are fixed literals handed to KafkaProducer unchanged; see the Kafka
// producer configuration reference for their exact semantics.
65 kafkaProducerProperties.put("acks", "all");
66 kafkaProducerProperties.put("retries", 0);
// 16384 = 16 * 1024
67 kafkaProducerProperties.put("batch.size", 16384);
68 kafkaProducerProperties.put("linger.ms", 1);
// 33554432 = 32 * 1024 * 1024
69 kafkaProducerProperties.put("buffer.memory", 33554432);
// Keys and values are both Strings, matching the Producer<String, String> field type.
70 kafkaProducerProperties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
71 kafkaProducerProperties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
73 producer = new KafkaProducer<String, String>(kafkaProducerProperties);
// Poll loop: continues while stopFlag is false.
// NOTE(review): if this code executes on producerThread itself (see the note at the top),
// producerThread.isAlive() is always true here and only stopFlag can end the loop.
75 while (producerThread.isAlive() && !stopFlag) {
// Pause between polls; the argument is presumably milliseconds -- ThreadUtilities is not
// visible in this file, confirm.
76 ThreadUtilities.sleep(50);
// Clears the request raised by sendEvents(). The statements between the sleep and this reset
// (original lines 77-79) are not visible; presumably they test the flag and call
// sendEventsToTopic() -- confirm.
80 sendEventsFlag = false;
// Closes the producer, passing an explicit timeout of 1000 milliseconds.
84 producer.close(1000, TimeUnit.MILLISECONDS);
/**
 * Requests that events be sent by raising sendEventsFlag. This method only sets the flag; it
 * does not send anything itself and returns immediately.
 */
87 public void sendEvents() {
88 sendEventsFlag = true;
/**
 * Generates eventCount events and sends each one to the configured topic, writing progress
 * messages to standard output.
 */
91 private void sendEventsToTopic() {
92 System.out.println(KafkaEventProducer.class.getCanonicalName() + ": sending events to Kafka server at "
93 + kafkaServerAddress + ", event count " + eventCount + ", xmlEvents " + xmlEvents);
// One iteration per event, i running from 0 to eventCount - 1.
95 for (int i = 0; i < eventCount; i++) {
96 System.out.println(KafkaEventProducer.class.getCanonicalName() + ": waiting " + eventInterval
97 + " milliseconds before sending next event");
// The delay is applied before every event, including the first one.
98 ThreadUtilities.sleep(eventInterval);
100 String eventString = null;
// NOTE(review): the condition choosing between the two generators (original lines 101 and
// 103) is not visible in this extract; presumably it is if (xmlEvents) ... else ... --
// confirm against the full file.
102 eventString = EventGenerator.xmlEvent();
104 eventString = EventGenerator.jsonEvent();
// Record key has the form "Event<i>Of<eventCount>". The return value of send() is discarded
// here, so this code does not observe whether the send succeeded.
106 producer.send(new ProducerRecord<String, String>(topic, "Event" + i + "Of" + eventCount, eventString));
// Original lines 107-108 are not visible; the eventsSentCount update may live there -- confirm.
109 System.out.println(KafkaEventProducer.class.getCanonicalName() + ": sent event " + eventString);
111 System.out.println(KafkaEventProducer.class.getCanonicalName() + ": completed");
/**
 * Returns the current value of the eventsSentCount field.
 *
 * @return the value of eventsSentCount (initialised to 0; the statement that updates it is
 *         not visible in this extract)
 */
114 public long getEventsSentCount() {
115 return eventsSentCount;
/**
 * Waits for the producer thread to terminate, printing a message before and after the wait.
 * The call blocks the caller until producerThread is no longer alive.
 */
118 public void shutdown() {
119 System.out.println(KafkaEventProducer.class.getCanonicalName() + ": stopping");
// NOTE(review): original lines 120-122 are not visible; presumably stopFlag is set to true
// there so that the producer loop exits -- confirm. Without that, this wait never ends.
// Polls the thread state, sleeping between checks (argument presumably milliseconds).
123 while (producerThread.isAlive()) {
124 ThreadUtilities.sleep(10);
127 System.out.println(KafkaEventProducer.class.getCanonicalName() + ": stopped");
/**
 * Command line entry point. Expects exactly five arguments, in this order:
 * topic, kafkaServerAddress, #events, XML|JSON, eventInterval.
 *
 * @param args the command line arguments
 */
130 public static void main(final String[] args) {
// Wrong argument count: print the usage text. The statement leaving the method after the
// message (original lines 133-134) is not visible in this extract -- presumably a return.
131 if (args.length != 5) {
132 System.err.println("usage KafkaEventProducer topic kafkaServerAddress #events XML|JSON eventInterval");
// args[2] is the number of events. The declaration of eventCount and the opening "try"
// (original lines 136-137) are not visible in this extract.
138 eventCount = Integer.parseInt(args[2]);
// NOTE(review): catches Exception although Integer.parseInt only signals bad input with
// NumberFormatException.
139 } catch (final Exception e) {
140 System.err.println("usage KafkaEventProducer topic kafkaServerAddress #events XML|JSON eventInterval");
// args[4] is the interval between events; the opening "try" (original line 146) is not visible.
145 long eventInterval = 0;
147 eventInterval = Long.parseLong(args[4]);
148 } catch (final Exception e) {
149 System.err.println("usage KafkaEventProducer topic kafkaServerAddress #events XML|JSON eventInterval");
// args[3] selects the event format, compared case-insensitively. The body of the XML branch
// (original line 156) is not visible; presumably it sets xmlEvents to true -- confirm.
154 boolean xmlEvents = false;
155 if (args[3].equalsIgnoreCase("XML")) {
// Any value other than XML or JSON is rejected with the usage text.
157 } else if (!args[3].equalsIgnoreCase("JSON")) {
158 System.err.println("usage KafkaEventProducer topic kafkaServerAddress #events XML|JSON eventInterval");
// args[0] is the topic and args[1] the Kafka server address, matching the constructor's
// parameter order. Constructing the object also starts its producer thread.
162 final KafkaEventProducer producer =
163 new KafkaEventProducer(args[0], args[1], eventCount, xmlEvents, eventInterval);
// Raises the send request flag; any statements following this call are not visible in this
// extract.
165 producer.sendEvents();