639b450df21ae98a815f2e842d414d4722515651
[policy/apex-pdp.git] /
1 /*-
2  * ============LICENSE_START=======================================================
3  *  Copyright (C) 2016-2018 Ericsson. All rights reserved.
4  * ================================================================================
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  * 
9  *      http://www.apache.org/licenses/LICENSE-2.0
10  * 
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  * 
17  * SPDX-License-Identifier: Apache-2.0
18  * ============LICENSE_END=========================================================
19  */
20
21 package org.onap.policy.apex.apps.uservice.test.adapt.kafka;
22
23 import java.util.Properties;
24 import java.util.concurrent.TimeUnit;
25
26 import org.apache.kafka.clients.producer.KafkaProducer;
27 import org.apache.kafka.clients.producer.Producer;
28 import org.apache.kafka.clients.producer.ProducerRecord;
29 import org.onap.policy.apex.apps.uservice.test.adapt.events.EventGenerator;
30 import org.onap.policy.apex.core.infrastructure.threading.ThreadUtilities;
31
32 /**
33  * @author Liam Fallon (liam.fallon@ericsson.com)
34  */
35 public class KafkaEventProducer implements Runnable {
36     private final String topic;
37     private final String kafkaServerAddress;
38     private final int eventCount;
39     private final boolean xmlEvents;
40     private final long eventInterval;
41     private long eventsSentCount = 0;
42
43     private Producer<String, String> producer;
44
45     private final Thread producerThread;
46     private boolean sendEventsFlag = false;
47     private boolean stopFlag = false;
48
49     public KafkaEventProducer(final String topic, final String kafkaServerAddress, final int eventCount,
50             final boolean xmlEvents, final long eventInterval) {
51         this.topic = topic;
52         this.kafkaServerAddress = kafkaServerAddress;
53         this.eventCount = eventCount;
54         this.xmlEvents = xmlEvents;
55         this.eventInterval = eventInterval;
56
57         producerThread = new Thread(this);
58         producerThread.start();
59     }
60
61     @Override
62     public void run() {
63         final Properties kafkaProducerProperties = new Properties();
64         kafkaProducerProperties.put("bootstrap.servers", kafkaServerAddress);
65         kafkaProducerProperties.put("acks", "all");
66         kafkaProducerProperties.put("retries", 0);
67         kafkaProducerProperties.put("batch.size", 16384);
68         kafkaProducerProperties.put("linger.ms", 1);
69         kafkaProducerProperties.put("buffer.memory", 33554432);
70         kafkaProducerProperties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
71         kafkaProducerProperties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
72
73         producer = new KafkaProducer<String, String>(kafkaProducerProperties);
74
75         while (producerThread.isAlive() && !stopFlag) {
76             ThreadUtilities.sleep(50);
77
78             if (sendEventsFlag) {
79                 sendEventsToTopic();
80                 sendEventsFlag = false;
81             }
82         }
83
84         producer.close(1000, TimeUnit.MILLISECONDS);
85     }
86
87     public void sendEvents() {
88         sendEventsFlag = true;
89     }
90
91     private void sendEventsToTopic() {
92         System.out.println(KafkaEventProducer.class.getCanonicalName() + ": sending events to Kafka server at "
93                 + kafkaServerAddress + ", event count " + eventCount + ", xmlEvents " + xmlEvents);
94
95         for (int i = 0; i < eventCount; i++) {
96             System.out.println(KafkaEventProducer.class.getCanonicalName() + ": waiting " + eventInterval
97                     + " milliseconds before sending next event");
98             ThreadUtilities.sleep(eventInterval);
99
100             String eventString = null;
101             if (xmlEvents) {
102                 eventString = EventGenerator.xmlEvent();
103             } else {
104                 eventString = EventGenerator.jsonEvent();
105             }
106             producer.send(new ProducerRecord<String, String>(topic, "Event" + i + "Of" + eventCount, eventString));
107             producer.flush();
108             eventsSentCount++;
109             System.out.println(KafkaEventProducer.class.getCanonicalName() + ": sent event " + eventString);
110         }
111         System.out.println(KafkaEventProducer.class.getCanonicalName() + ": completed");
112     }
113
114     public long getEventsSentCount() {
115         return eventsSentCount;
116     }
117
118     public void shutdown() {
119         System.out.println(KafkaEventProducer.class.getCanonicalName() + ": stopping");
120
121         stopFlag = true;
122
123         while (producerThread.isAlive()) {
124             ThreadUtilities.sleep(10);
125         }
126
127         System.out.println(KafkaEventProducer.class.getCanonicalName() + ": stopped");
128     }
129
130     public static void main(final String[] args) {
131         if (args.length != 5) {
132             System.err.println("usage KafkaEventProducer topic kafkaServerAddress #events XML|JSON eventInterval");
133             return;
134         }
135
136         int eventCount = 0;
137         try {
138             eventCount = Integer.parseInt(args[2]);
139         } catch (final Exception e) {
140             System.err.println("usage KafkaEventProducer topic kafkaServerAddress #events XML|JSON eventInterval");
141             e.printStackTrace();
142             return;
143         }
144
145         long eventInterval = 0;
146         try {
147             eventInterval = Long.parseLong(args[4]);
148         } catch (final Exception e) {
149             System.err.println("usage KafkaEventProducer topic kafkaServerAddress #events XML|JSON eventInterval");
150             e.printStackTrace();
151             return;
152         }
153
154         boolean xmlEvents = false;
155         if (args[3].equalsIgnoreCase("XML")) {
156             xmlEvents = true;
157         } else if (!args[3].equalsIgnoreCase("JSON")) {
158             System.err.println("usage KafkaEventProducer topic kafkaServerAddress #events XML|JSON eventInterval");
159             return;
160         }
161
162         final KafkaEventProducer producer =
163                 new KafkaEventProducer(args[0], args[1], eventCount, xmlEvents, eventInterval);
164
165         producer.sendEvents();
166         producer.shutdown();
167     }
168 }