c243b8b075adb7a864d9ab88457a1fbee9c2662f
[policy/apex-pdp.git] /
1 /*-
2  * ============LICENSE_START=======================================================
3  *  Copyright (C) 2016-2018 Ericsson. All rights reserved.
4  * ================================================================================
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  * 
9  *      http://www.apache.org/licenses/LICENSE-2.0
10  * 
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  * 
17  * SPDX-License-Identifier: Apache-2.0
18  * ============LICENSE_END=========================================================
19  */
20
21 package org.onap.policy.apex.apps.uservice.test.adapt.kafka;
22
23 import java.util.Properties;
24 import java.util.concurrent.TimeUnit;
25
26 import org.apache.kafka.clients.producer.KafkaProducer;
27 import org.apache.kafka.clients.producer.Producer;
28 import org.apache.kafka.clients.producer.ProducerRecord;
29 import org.onap.policy.apex.apps.uservice.test.adapt.events.EventGenerator;
30 import org.onap.policy.apex.core.infrastructure.threading.ThreadUtilities;
31
32 /**
33  * @author Liam Fallon (liam.fallon@ericsson.com)
34  */
35 public class KafkaEventProducer implements Runnable {
36     private final String topic;
37     private final String kafkaServerAddress;
38     private final int eventCount;
39     private final boolean xmlEvents;
40     private final long eventInterval;
41     private long eventsSentCount = 0;
42
43     private final Thread producerThread;
44     private boolean sendEventsFlag = false;
45     private boolean stopFlag = false;
46
47     public KafkaEventProducer(final String topic, final String kafkaServerAddress, final int eventCount,
48                     final boolean xmlEvents, final long eventInterval) {
49         this.topic = topic;
50         this.kafkaServerAddress = kafkaServerAddress;
51         this.eventCount = eventCount;
52         this.xmlEvents = xmlEvents;
53         this.eventInterval = eventInterval;
54
55         producerThread = new Thread(this);
56         producerThread.start();
57     }
58
59     @Override
60     public void run() {
61         final Properties kafkaProducerProperties = new Properties();
62         kafkaProducerProperties.put("bootstrap.servers", kafkaServerAddress);
63         kafkaProducerProperties.put("acks", "all");
64         kafkaProducerProperties.put("retries", 0);
65         kafkaProducerProperties.put("batch.size", 16384);
66         kafkaProducerProperties.put("linger.ms", 1);
67         kafkaProducerProperties.put("buffer.memory", 33554432);
68         kafkaProducerProperties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
69         kafkaProducerProperties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
70
71         final Producer<String, String> producer = new KafkaProducer<String, String>(kafkaProducerProperties);
72
73         while (producerThread.isAlive() && !stopFlag) {
74             ThreadUtilities.sleep(50);
75
76             if (sendEventsFlag) {
77                 sendEventsToTopic(producer);
78                 sendEventsFlag = false;
79             }
80         }
81
82         producer.close(1000, TimeUnit.MILLISECONDS);
83     }
84
85     public void sendEvents() {
86         sendEventsFlag = true;
87     }
88
89     private void sendEventsToTopic(final Producer<String, String> producer) {
90         System.out.println(KafkaEventProducer.class.getCanonicalName() + ": sending events to Kafka server at "
91                         + kafkaServerAddress + ", event count " + eventCount + ", xmlEvents " + xmlEvents);
92
93         for (int i = 0; i < eventCount; i++) {
94             System.out.println(KafkaEventProducer.class.getCanonicalName() + ": waiting " + eventInterval
95                             + " milliseconds before sending next event");
96             ThreadUtilities.sleep(eventInterval);
97
98             String eventString = null;
99             if (xmlEvents) {
100                 eventString = EventGenerator.xmlEvent();
101             } else {
102                 eventString = EventGenerator.jsonEvent();
103             }
104             producer.send(new ProducerRecord<String, String>(topic, "Event" + i + "Of" + eventCount, eventString));
105             producer.flush();
106             eventsSentCount++;
107             System.out.println(KafkaEventProducer.class.getCanonicalName() + ": sent event " + eventString);
108         }
109         System.out.println(KafkaEventProducer.class.getCanonicalName() + ": completed");
110     }
111
112     public long getEventsSentCount() {
113         return eventsSentCount;
114     }
115
116     public void shutdown() {
117         System.out.println(KafkaEventProducer.class.getCanonicalName() + ": stopping");
118
119         stopFlag = true;
120
121         while (producerThread.isAlive()) {
122             ThreadUtilities.sleep(10);
123         }
124
125         System.out.println(KafkaEventProducer.class.getCanonicalName() + ": stopped");
126     }
127
128     public static void main(final String[] args) {
129         if (args.length != 5) {
130             System.err.println("usage KafkaEventProducer topic kafkaServerAddress #events XML|JSON eventInterval");
131             return;
132         }
133
134         int eventCount = 0;
135         try {
136             eventCount = Integer.parseInt(args[2]);
137         } catch (final Exception e) {
138             System.err.println("usage KafkaEventProducer topic kafkaServerAddress #events XML|JSON eventInterval");
139             e.printStackTrace();
140             return;
141         }
142
143         long eventInterval = 0;
144         try {
145             eventInterval = Long.parseLong(args[4]);
146         } catch (final Exception e) {
147             System.err.println("usage KafkaEventProducer topic kafkaServerAddress #events XML|JSON eventInterval");
148             e.printStackTrace();
149             return;
150         }
151
152         boolean xmlEvents = false;
153         if (args[3].equalsIgnoreCase("XML")) {
154             xmlEvents = true;
155         } else if (!args[3].equalsIgnoreCase("JSON")) {
156             System.err.println("usage KafkaEventProducer topic kafkaServerAddress #events XML|JSON eventInterval");
157             return;
158         }
159
160         final KafkaEventProducer producer = new KafkaEventProducer(args[0], args[1], eventCount, xmlEvents,
161                         eventInterval);
162
163         producer.sendEvents();
164         producer.shutdown();
165     }
166 }