e0f5c23c4c2e4ddf7bc5730f3e01f358ea74f548
[policy/apex-pdp.git] /
1 /*-
2  * ============LICENSE_START=======================================================
3  *  Copyright (C) 2016-2018 Ericsson. All rights reserved.
4  * ================================================================================
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *      http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  * SPDX-License-Identifier: Apache-2.0
18  * ============LICENSE_END=========================================================
19  */
20
21 package org.onap.policy.apex.testsuites.integration.uservice.adapt.kafka;
22
23 import com.salesforce.kafka.test.junit4.SharedKafkaTestResource;
24
25 import java.time.Duration;
26
27 import org.apache.kafka.clients.producer.Producer;
28 import org.apache.kafka.clients.producer.ProducerRecord;
29 import org.apache.kafka.common.serialization.StringSerializer;
30 import org.onap.policy.apex.core.infrastructure.threading.ThreadUtilities;
31 import org.onap.policy.apex.testsuites.integration.uservice.adapt.events.EventGenerator;
32 import org.slf4j.Logger;
33 import org.slf4j.LoggerFactory;
34
35 /**
36  * The Class KafkaEventProducer.
37  *
38  * @author Liam Fallon (liam.fallon@ericsson.com)
39  */
40 public class KafkaEventProducer implements Runnable {
41     // Get a reference to the logger
42     private static final Logger LOGGER = LoggerFactory.getLogger(KafkaEventProducer.class);
43
44     private final String topic;
45     private final SharedKafkaTestResource sharedKafkaTestResource;
46     private final int eventCount;
47     private final boolean xmlEvents;
48     private final long eventInterval;
49     private long eventsSentCount = 0;
50
51     private final Thread producerThread;
52     private boolean sendEventsFlag = false;
53     private boolean stopFlag = false;
54
55     /**
56      * Instantiates a new kafka event producer.
57      *
58      * @param topic the topic
59      * @param sharedKafkaTestResource the kafka server address
60      * @param eventCount the event count
61      * @param xmlEvents the xml events
62      * @param eventInterval the event interval
63      */
64     public KafkaEventProducer(final String topic, final SharedKafkaTestResource sharedKafkaTestResource,
65                     final int eventCount, final boolean xmlEvents, final long eventInterval) {
66         this.topic = topic;
67         this.sharedKafkaTestResource = sharedKafkaTestResource;
68         this.eventCount = eventCount;
69         this.xmlEvents = xmlEvents;
70         this.eventInterval = eventInterval;
71
72         producerThread = new Thread(this);
73         producerThread.start();
74     }
75
76     /**
77      * {@inheritDoc}.
78      */
79     @Override
80     public void run() {
81         final Producer<String, String> producer = sharedKafkaTestResource.getKafkaTestUtils()
82                         .getKafkaProducer(StringSerializer.class, StringSerializer.class);
83
84         while (producerThread.isAlive() && !stopFlag) {
85             ThreadUtilities.sleep(50);
86
87             if (sendEventsFlag) {
88                 sendEventsToTopic(producer);
89                 sendEventsFlag = false;
90             }
91         }
92
93         producer.close(Duration.ofMillis(1000));
94     }
95
96     /**
97      * Send events.
98      */
99     public void sendEvents() {
100         sendEventsFlag = true;
101     }
102
103     /**
104      * Send events to topic.
105      *
106      * @param producer the producer
107      */
108     private void sendEventsToTopic(final Producer<String, String> producer) {
109         LOGGER.debug("{} : sending events to Kafka server, event count {}, xmlEvents {}",
110                         KafkaEventProducer.class.getCanonicalName(), eventCount, xmlEvents);
111
112         for (int i = 0; i < eventCount; i++) {
113             LOGGER.debug("{} : waiting {} milliseconds before sending next event",
114                             KafkaEventProducer.class.getCanonicalName(), eventInterval);
115             ThreadUtilities.sleep(eventInterval);
116
117             String eventString = null;
118             if (xmlEvents) {
119                 eventString = EventGenerator.xmlEvent();
120             } else {
121                 eventString = EventGenerator.jsonEvent();
122             }
123             producer.send(new ProducerRecord<String, String>(topic, "Event" + i + "Of" + eventCount, eventString));
124             producer.flush();
125             eventsSentCount++;
126             LOGGER.debug("****** Sent event No. {} ******", eventsSentCount);
127         }
128         LOGGER.debug("{}: completed", KafkaEventProducer.class.getCanonicalName());
129     }
130
131     /**
132      * Gets the events sent count.
133      *
134      * @return the events sent count
135      */
136     public long getEventsSentCount() {
137         return eventsSentCount;
138     }
139
140     /**
141      * Shutdown.
142      */
143     public void shutdown() {
144         LOGGER.debug("{} : stopping", KafkaEventProducer.class.getCanonicalName());
145
146         stopFlag = true;
147
148         while (producerThread.isAlive()) {
149             ThreadUtilities.sleep(10);
150         }
151
152         LOGGER.debug("{} : stopped", KafkaEventProducer.class.getCanonicalName());
153     }
154 }