4b6a62e284ab2843583797e3fde5cdc90f3c4b44
[policy/apex-pdp.git] /
1 /*-
2  * ============LICENSE_START=======================================================
3  *  Copyright (C) 2016-2018 Ericsson. All rights reserved.
4  * ================================================================================
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  * 
9  *      http://www.apache.org/licenses/LICENSE-2.0
10  * 
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  * 
17  * SPDX-License-Identifier: Apache-2.0
18  * ============LICENSE_END=========================================================
19  */
20
21 package org.onap.policy.apex.testsuites.integration.uservice.adapt.kafka;
22
23 import java.util.Arrays;
24 import java.util.Properties;
25
26 import org.apache.kafka.clients.consumer.ConsumerRecord;
27 import org.apache.kafka.clients.consumer.ConsumerRecords;
28 import org.apache.kafka.clients.consumer.KafkaConsumer;
29 import org.onap.policy.apex.core.infrastructure.messaging.MessagingException;
30 import org.onap.policy.apex.core.infrastructure.threading.ThreadUtilities;
31
32 /**
33  * The Class KafkaEventSubscriber.
34  *
35  * @author Liam Fallon (liam.fallon@ericsson.com)
36  */
37 public class KafkaEventSubscriber implements Runnable {
38     private final String topic;
39     private final String kafkaServerAddress;
40     private long eventsReceivedCount = 0;
41
42     KafkaConsumer<String, String> consumer;
43
44     Thread subscriberThread;
45
46     /**
47      * Instantiates a new kafka event subscriber.
48      *
49      * @param topic the topic
50      * @param kafkaServerAddress the kafka server address
51      * @throws MessagingException the messaging exception
52      */
53     public KafkaEventSubscriber(final String topic, final String kafkaServerAddress) throws MessagingException {
54         this.topic = topic;
55         this.kafkaServerAddress = kafkaServerAddress;
56
57         final Properties props = new Properties();
58         props.put("bootstrap.servers", kafkaServerAddress);
59         props.put("group.id", "test");
60         props.put("enable.auto.commit", "true");
61         props.put("auto.commit.interval.ms", "1000");
62         props.put("session.timeout.ms", "30000");
63         props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
64         props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
65
66         consumer = new KafkaConsumer<String, String>(props);
67         consumer.subscribe(Arrays.asList(topic));
68
69         subscriberThread = new Thread(this);
70         subscriberThread.start();
71     }
72
73     /* (non-Javadoc)
74      * @see java.lang.Runnable#run()
75      */
76     @Override
77     public void run() {
78         System.out.println(KafkaEventSubscriber.class.getCanonicalName() + ": receiving events from Kafka server at "
79                 + kafkaServerAddress + " on topic " + topic);
80
81         while (subscriberThread.isAlive() && !subscriberThread.isInterrupted()) {
82             try {
83                 final ConsumerRecords<String, String> records = consumer.poll(100);
84                 for (final ConsumerRecord<String, String> record : records) {
85                     System.out.println("******");
86                     System.out.println("offset=" + record.offset());
87                     System.out.println("key=" + record.key());
88                     System.out.println("name=" + record.value());
89                     eventsReceivedCount++;
90                 }
91             } catch (final Exception e) {
92                 // Thread interrupted
93                 break;
94             }
95         }
96
97         System.out.println(KafkaEventSubscriber.class.getCanonicalName() + ": event reception completed");
98     }
99
100     /**
101      * Gets the events received count.
102      *
103      * @return the events received count
104      */
105     public long getEventsReceivedCount() {
106         return eventsReceivedCount;
107     }
108
109     /**
110      * Shutdown.
111      */
112     public void shutdown() {
113         subscriberThread.interrupt();
114
115         while (subscriberThread.isAlive()) {
116             ThreadUtilities.sleep(10);
117         }
118
119         consumer.close();
120         System.out.println(KafkaEventSubscriber.class.getCanonicalName() + ": stopped");
121     }
122
123
124     /**
125      * The main method.
126      *
127      * @param args the arguments
128      * @throws MessagingException the messaging exception
129      */
130     public static void main(final String[] args) throws MessagingException {
131         if (args.length != 2) {
132             System.err.println("usage KafkaEventSubscriber topic kafkaServerAddress");
133             return;
134         }
135         new KafkaEventSubscriber(args[0], args[1]);
136     }
137 }