2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright (C) 2019 Nokia. All rights reserved.
6 * ================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 * ============LICENSE_END=========================================================
21 package org.onap.netconfsimulator.kafka.listener;
23 import org.springframework.beans.factory.annotation.Autowired;
24 import org.springframework.kafka.core.ConsumerFactory;
26 import org.springframework.kafka.listener.ContainerProperties;
27 import org.springframework.kafka.listener.KafkaMessageListenerContainer;
28 import org.springframework.kafka.listener.MessageListener;
31 import org.springframework.kafka.support.TopicPartitionInitialOffset;
33 import java.time.Instant;
35 public class KafkaListenerHandler {
37 private static final int PARTITION = 0;
38 private static final long NUMBER_OF_HISTORICAL_MESSAGES_TO_SHOW = -10L;
39 private static final boolean RELATIVE_TO_CURRENT = false;
40 private ConsumerFactory<String, String> consumerFactory;
44 public KafkaListenerHandler(ConsumerFactory<String, String> consumerFactory) {
45 this.consumerFactory = consumerFactory;
49 public KafkaListenerEntry createKafkaListener(MessageListener messageListener, String topicName) {
50 String clientId = Long.toString(Instant.now().getEpochSecond());
51 ContainerProperties containerProperties = new ContainerProperties(topicName);
52 containerProperties.setGroupId(clientId);
53 KafkaMessageListenerContainer<String, String> listenerContainer = createListenerContainer(containerProperties,
56 listenerContainer.setupMessageListener(messageListener);
57 return new KafkaListenerEntry(clientId, listenerContainer);
61 KafkaMessageListenerContainer<String, String> createListenerContainer(ContainerProperties containerProperties,
63 TopicPartitionInitialOffset config = new TopicPartitionInitialOffset(topicName, PARTITION,
64 NUMBER_OF_HISTORICAL_MESSAGES_TO_SHOW, RELATIVE_TO_CURRENT);
65 return new KafkaMessageListenerContainer<>(consumerFactory, containerProperties, config);