2 * ============LICENSE_START=======================================================
3 * Copyright (C) 2022 Nordix Foundation.
4 * ================================================================================
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
9 * http://www.apache.org/licenses/LICENSE-2.0
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 * ============LICENSE_END=========================================================
19 package org.onap.policy.common.endpoints.event.comm.bus.internal;
21 import org.onap.policy.common.endpoints.event.comm.Topic;
22 import org.onap.policy.common.endpoints.event.comm.bus.KafkaTopicSink;
23 import org.slf4j.Logger;
24 import org.slf4j.LoggerFactory;
27 * This implementation publishes events for the associated KAFKA topic, inline with the calling
30 public class InlineKafkaTopicSink extends InlineBusTopicSink implements KafkaTopicSink {
35 private static Logger logger = LoggerFactory.getLogger(InlineKafkaTopicSink.class);
38 * Argument-based KAFKA Topic Writer instantiation. BusTopicParams contains below mentioned
41 * <p>servers list of KAFKA servers available for publishing
42 * topic the topic to publish to
43 * partitionId the partition key (optional, autogenerated if not provided)
44 * useHttps does connection use HTTPS?
45 * @param busTopicParams contains attributes needed
46 * @throws IllegalArgumentException if invalid arguments are detected
48 public InlineKafkaTopicSink(BusTopicParams busTopicParams) {
49 super(busTopicParams);
53 * Instantiation of internal resources.
58 this.publisher = new BusPublisher.KafkaPublisherWrapper(BusTopicParams.builder()
59 .servers(this.servers)
60 .topic(this.effectiveTopic)
61 .useHttps(this.useHttps)
63 logger.info("{}: KAFKA SINK created", this);
67 public String toString() {
68 return "InlineKafkaTopicSink [getTopicCommInfrastructure()=" + getTopicCommInfrastructure() + ", toString()="
69 + super.toString() + "]";
73 public CommInfrastructure getTopicCommInfrastructure() {
74 return Topic.CommInfrastructure.KAFKA;