2 * ============LICENSE_START=======================================================
3 * Copyright (C) 2016-2018 Ericsson. All rights reserved.
4 * ================================================================================
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
9 * http://www.apache.org/licenses/LICENSE-2.0
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
17 * SPDX-License-Identifier: Apache-2.0
18 * ============LICENSE_END=========================================================
21 package org.onap.policy.apex.apps.uservice.test.adapt.kafka;
23 import java.util.Properties;
24 import java.util.concurrent.TimeUnit;
26 import org.apache.kafka.clients.producer.KafkaProducer;
27 import org.apache.kafka.clients.producer.Producer;
28 import org.apache.kafka.clients.producer.ProducerRecord;
29 import org.onap.policy.apex.apps.uservice.test.adapt.events.EventGenerator;
30 import org.onap.policy.apex.core.infrastructure.threading.ThreadUtilities;
33 * @author Liam Fallon (liam.fallon@ericsson.com)
35 public class KafkaEventProducer implements Runnable {
36 private final String topic;
37 private final String kafkaServerAddress;
38 private final int eventCount;
39 private final boolean xmlEvents;
40 private final long eventInterval;
41 private long eventsSentCount = 0;
43 private final Thread producerThread;
44 private boolean sendEventsFlag = false;
45 private boolean stopFlag = false;
47 public KafkaEventProducer(final String topic, final String kafkaServerAddress, final int eventCount,
48 final boolean xmlEvents, final long eventInterval) {
50 this.kafkaServerAddress = kafkaServerAddress;
51 this.eventCount = eventCount;
52 this.xmlEvents = xmlEvents;
53 this.eventInterval = eventInterval;
55 producerThread = new Thread(this);
56 producerThread.start();
61 final Properties kafkaProducerProperties = new Properties();
62 kafkaProducerProperties.put("bootstrap.servers", kafkaServerAddress);
63 kafkaProducerProperties.put("acks", "all");
64 kafkaProducerProperties.put("retries", 0);
65 kafkaProducerProperties.put("batch.size", 16384);
66 kafkaProducerProperties.put("linger.ms", 1);
67 kafkaProducerProperties.put("buffer.memory", 33554432);
68 kafkaProducerProperties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
69 kafkaProducerProperties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
71 final Producer<String, String> producer = new KafkaProducer<String, String>(kafkaProducerProperties);
73 while (producerThread.isAlive() && !stopFlag) {
74 ThreadUtilities.sleep(50);
77 sendEventsToTopic(producer);
78 sendEventsFlag = false;
82 producer.close(1000, TimeUnit.MILLISECONDS);
85 public void sendEvents() {
86 sendEventsFlag = true;
89 private void sendEventsToTopic(final Producer<String, String> producer) {
90 System.out.println(KafkaEventProducer.class.getCanonicalName() + ": sending events to Kafka server at "
91 + kafkaServerAddress + ", event count " + eventCount + ", xmlEvents " + xmlEvents);
93 for (int i = 0; i < eventCount; i++) {
94 System.out.println(KafkaEventProducer.class.getCanonicalName() + ": waiting " + eventInterval
95 + " milliseconds before sending next event");
96 ThreadUtilities.sleep(eventInterval);
98 String eventString = null;
100 eventString = EventGenerator.xmlEvent();
102 eventString = EventGenerator.jsonEvent();
104 producer.send(new ProducerRecord<String, String>(topic, "Event" + i + "Of" + eventCount, eventString));
107 System.out.println(KafkaEventProducer.class.getCanonicalName() + ": sent event " + eventString);
109 System.out.println(KafkaEventProducer.class.getCanonicalName() + ": completed");
112 public long getEventsSentCount() {
113 return eventsSentCount;
116 public void shutdown() {
117 System.out.println(KafkaEventProducer.class.getCanonicalName() + ": stopping");
121 while (producerThread.isAlive()) {
122 ThreadUtilities.sleep(10);
125 System.out.println(KafkaEventProducer.class.getCanonicalName() + ": stopped");
128 public static void main(final String[] args) {
129 if (args.length != 5) {
130 System.err.println("usage KafkaEventProducer topic kafkaServerAddress #events XML|JSON eventInterval");
136 eventCount = Integer.parseInt(args[2]);
137 } catch (final Exception e) {
138 System.err.println("usage KafkaEventProducer topic kafkaServerAddress #events XML|JSON eventInterval");
143 long eventInterval = 0;
145 eventInterval = Long.parseLong(args[4]);
146 } catch (final Exception e) {
147 System.err.println("usage KafkaEventProducer topic kafkaServerAddress #events XML|JSON eventInterval");
152 boolean xmlEvents = false;
153 if (args[3].equalsIgnoreCase("XML")) {
155 } else if (!args[3].equalsIgnoreCase("JSON")) {
156 System.err.println("usage KafkaEventProducer topic kafkaServerAddress #events XML|JSON eventInterval");
160 final KafkaEventProducer producer = new KafkaEventProducer(args[0], args[1], eventCount, xmlEvents,
163 producer.sendEvents();