/*
 * ============LICENSE_START=======================================================
 * Copyright (C) 2023-2024 Nordix Foundation
 * ================================================================================
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *        http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 * SPDX-License-Identifier: Apache-2.0
 * ============LICENSE_END=========================================================
 */
21 package org.onap.cps.ncmp.api.impl.config.kafka;
import io.cloudevents.CloudEvent;
import io.opentelemetry.instrumentation.kafkaclients.v2_6.TracingConsumerInterceptor;
import io.opentelemetry.instrumentation.kafkaclients.v2_6.TracingProducerInterceptor;
import java.time.Duration;
import java.util.Map;
import lombok.RequiredArgsConstructor;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.boot.ssl.SslBundles;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.support.serializer.JsonDeserializer;
import org.springframework.kafka.support.serializer.JsonSerializer;
48 * kafka Configuration for legacy and cloud events.
50 * @param <T> valid legacy event to be published over the wire.
54 @RequiredArgsConstructor
55 public class KafkaConfig<T> {
57 private final KafkaProperties kafkaProperties;
59 @Value("${cps.tracing.enabled:false}")
60 private boolean tracingEnabled;
62 private static final SslBundles NO_SSL = null;
65 * This sets the strategy for creating legacy Kafka producer instance from kafka properties defined into
66 * application.yml and replaces value-serializer by JsonSerializer.
68 * @return legacy event producer instance.
71 public ProducerFactory<String, T> legacyEventProducerFactory() {
72 final Map<String, Object> producerConfigProperties = kafkaProperties.buildProducerProperties(NO_SSL);
73 producerConfigProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
75 producerConfigProperties.put(
76 ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, TracingProducerInterceptor.class.getName());
78 return new DefaultKafkaProducerFactory<>(producerConfigProperties);
82 * The ConsumerFactory implementation is to produce new legacy instance for provided kafka properties defined
83 * into application.yml and replaces deserializer-value by JsonDeserializer.
85 * @return an instance of legacy consumer factory.
88 public ConsumerFactory<String, T> legacyEventConsumerFactory() {
89 final Map<String, Object> consumerConfigProperties = kafkaProperties.buildConsumerProperties(NO_SSL);
90 consumerConfigProperties.put("spring.deserializer.value.delegate.class", JsonDeserializer.class);
92 consumerConfigProperties.put(
93 ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, TracingConsumerInterceptor.class.getName());
95 return new DefaultKafkaConsumerFactory<>(consumerConfigProperties);
99 * A legacy Kafka event template for executing high-level operations. The legacy producer factory ensure this.
101 * @return an instance of legacy Kafka template.
105 public KafkaTemplate<String, T> legacyEventKafkaTemplate() {
106 final KafkaTemplate<String, T> kafkaTemplate = new KafkaTemplate<>(legacyEventProducerFactory());
107 kafkaTemplate.setConsumerFactory(legacyEventConsumerFactory());
108 if (tracingEnabled) {
109 kafkaTemplate.setObservationEnabled(true);
111 return kafkaTemplate;
115 * A legacy concurrent kafka listener container factory.
117 * @return instance of Concurrent kafka listener factory
120 public ConcurrentKafkaListenerContainerFactory<String, T> legacyEventConcurrentKafkaListenerContainerFactory() {
121 final ConcurrentKafkaListenerContainerFactory<String, T> containerFactory =
122 new ConcurrentKafkaListenerContainerFactory<>();
123 containerFactory.setConsumerFactory(legacyEventConsumerFactory());
124 containerFactory.getContainerProperties().setAuthExceptionRetryInterval(Duration.ofSeconds(10));
125 if (tracingEnabled) {
126 containerFactory.getContainerProperties().setObservationEnabled(true);
128 return containerFactory;
132 * This sets the strategy for creating cloud Kafka producer instance from kafka properties defined into
133 * application.yml with CloudEventSerializer.
135 * @return cloud event producer instance.
138 public ProducerFactory<String, CloudEvent> cloudEventProducerFactory() {
139 final Map<String, Object> producerConfigProperties = kafkaProperties.buildProducerProperties(NO_SSL);
140 if (tracingEnabled) {
141 producerConfigProperties.put(
142 ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, TracingProducerInterceptor.class.getName());
144 return new DefaultKafkaProducerFactory<>(producerConfigProperties);
148 * The ConsumerFactory implementation to produce new legacy instance for provided kafka properties defined
149 * into application.yml having CloudEventDeserializer as deserializer-value.
151 * @return an instance of cloud consumer factory.
154 public ConsumerFactory<String, CloudEvent> cloudEventConsumerFactory() {
155 final Map<String, Object> consumerConfigProperties = kafkaProperties.buildConsumerProperties(NO_SSL);
156 if (tracingEnabled) {
157 consumerConfigProperties.put(
158 ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, TracingConsumerInterceptor.class.getName());
160 return new DefaultKafkaConsumerFactory<>(consumerConfigProperties);
165 * A cloud Kafka event template for executing high-level operations. The cloud producer factory ensure this.
167 * @return an instance of cloud Kafka template.
170 public KafkaTemplate<String, CloudEvent> cloudEventKafkaTemplate() {
171 final KafkaTemplate<String, CloudEvent> kafkaTemplate =
172 new KafkaTemplate<>(cloudEventProducerFactory());
173 kafkaTemplate.setConsumerFactory(cloudEventConsumerFactory());
174 if (tracingEnabled) {
175 kafkaTemplate.setObservationEnabled(true);
177 return kafkaTemplate;
181 * A Concurrent CloudEvent kafka listener container factory.
183 * @return instance of Concurrent kafka listener factory
186 public ConcurrentKafkaListenerContainerFactory<String, CloudEvent>
187 cloudEventConcurrentKafkaListenerContainerFactory() {
188 final ConcurrentKafkaListenerContainerFactory<String, CloudEvent> containerFactory =
189 new ConcurrentKafkaListenerContainerFactory<>();
190 containerFactory.setConsumerFactory(cloudEventConsumerFactory());
191 containerFactory.getContainerProperties().setAuthExceptionRetryInterval(Duration.ofSeconds(10));
192 if (tracingEnabled) {
193 containerFactory.getContainerProperties().setObservationEnabled(true);
195 return containerFactory;