604315d5f02ab664f34fe15e4a684848974969f5
[integration.git] /
1 /*-
2  * ============LICENSE_START=======================================================
3  * Simulator
4  * ================================================================================
5  * Copyright (C) 2019 Nokia. All rights reserved.
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  * 
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  * 
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=========================================================
19  */
20
21 package org.onap.netconfsimulator.kafka.listener;
22
23 import org.springframework.beans.factory.annotation.Autowired;
24 import org.springframework.kafka.core.ConsumerFactory;
25
26 import org.springframework.kafka.listener.ContainerProperties;
27 import org.springframework.kafka.listener.KafkaMessageListenerContainer;
28 import org.springframework.kafka.listener.MessageListener;
29
30
31 import org.springframework.kafka.support.TopicPartitionInitialOffset;
32
33 import java.time.Instant;
34
35 public class KafkaListenerHandler {
36
37     private static final int PARTITION = 0;
38     private static final long NUMBER_OF_HISTORICAL_MESSAGES_TO_SHOW = -10L;
39     private static final boolean RELATIVE_TO_CURRENT = false;
40     private ConsumerFactory<String, String> consumerFactory;
41
42
43     @Autowired
44     public KafkaListenerHandler(ConsumerFactory<String, String> consumerFactory) {
45         this.consumerFactory = consumerFactory;
46     }
47
48
49     public KafkaListenerEntry createKafkaListener(MessageListener messageListener, String topicName) {
50         String clientId = Long.toString(Instant.now().getEpochSecond());
51         ContainerProperties containerProperties = new ContainerProperties(topicName);
52         containerProperties.setGroupId(clientId);
53         KafkaMessageListenerContainer<String, String> listenerContainer = createListenerContainer(containerProperties,
54             topicName);
55
56         listenerContainer.setupMessageListener(messageListener);
57         return new KafkaListenerEntry(clientId, listenerContainer);
58     }
59
60
61     KafkaMessageListenerContainer<String, String> createListenerContainer(ContainerProperties containerProperties,
62         String topicName) {
63         TopicPartitionInitialOffset config = new TopicPartitionInitialOffset(topicName, PARTITION,
64             NUMBER_OF_HISTORICAL_MESSAGES_TO_SHOW, RELATIVE_TO_CURRENT);
65         return new KafkaMessageListenerContainer<>(consumerFactory, containerProperties, config);
66     }
67 }