}
var kafkaTopicSource = makeSource(busTopicParams);
+
kafkaTopicSources.put(busTopicParams.getTopic(), kafkaTopicSource);
return kafkaTopicSource;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.common.TopicPartition;
import org.onap.dmaap.mr.client.MRClientFactory;
import org.onap.dmaap.mr.client.impl.MRConsumerImpl;
import org.onap.dmaap.mr.client.impl.MRConsumerImpl.MRConsumerImplBuilder;
*/
private static Logger logger = LoggerFactory.getLogger(KafkaConsumerWrapper.class);
+ private static final String KEY_DESERIALIZER = "org.apache.kafka.common.serialization.StringDeserializer";
+
/**
* Kafka consumer.
*/
- private KafkaConsumer<String, String> consumer;
+ protected KafkaConsumer<String, String> consumer;
+ protected Properties kafkaProps;
/**
* Kafka Consumer Wrapper.
* @throws GeneralSecurityException - Security exception
* @throws MalformedURLException - Malformed URL exception
*/
- public KafkaConsumerWrapper(BusTopicParams busTopicParams) {
+ public KafkaConsumerWrapper(BusTopicParams busTopicParams) throws MalformedURLException {
super(busTopicParams);
+
+ if (busTopicParams.isTopicInvalid()) {
+ throw new IllegalArgumentException("No topic for Kafka");
+ }
+
+ //Setup Properties for consumer
+ kafkaProps = new Properties();
+ kafkaProps.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
+ busTopicParams.getServers().get(0));
+
+ if (busTopicParams.isAdditionalPropsValid()) {
+ for (Map.Entry<String, String> entry : busTopicParams.getAdditionalProps().entrySet()) {
+ kafkaProps.put(entry.getKey(), entry.getValue());
+ }
+ }
+
+ if (kafkaProps.get(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG) == null) {
+ kafkaProps.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, KEY_DESERIALIZER);
+ }
+ if (kafkaProps.get(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG) == null) {
+ kafkaProps.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KEY_DESERIALIZER);
+ }
+ if (kafkaProps.get(ConsumerConfig.GROUP_ID_CONFIG) == null) {
+ kafkaProps.setProperty(ConsumerConfig.GROUP_ID_CONFIG, busTopicParams.getConsumerGroup());
+ }
+ consumer = new KafkaConsumer<>(kafkaProps);
+ //Subscribe to the topic
+ consumer.subscribe(Arrays.asList(busTopicParams.getTopic()));
}
@Override
public Iterable<String> fetch() throws IOException {
- // TODO: Not implemented yet
- return new ArrayList<>();
+ ConsumerRecords<String, String> records = this.consumer.poll(Duration.ofMillis(fetchTimeout));
+ if (records == null || records.count() <= 0) {
+ return Collections.emptyList();
+ }
+ List<String> messages = new ArrayList<>(records.count());
+ try {
+ for (TopicPartition partition : records.partitions()) {
+ List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
+ for (ConsumerRecord<String, String> record : partitionRecords) {
+ messages.add(record.value());
+ }
+ long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset();
+ consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(lastOffset + 1)));
+ }
+ } catch (Exception e) {
+ logger.error("{}: cannot fetch because of {}", this, e.getMessage());
+ sleepAfterFetchFailure();
+ throw e;
+ }
+ return messages;
}
@Override
public void close() {
super.close();
this.consumer.close();
+ logger.info("Kafka Consumer exited {}", this);
}
@Override
import java.util.List;
import java.util.Map;
import java.util.Properties;
-import java.util.Random;
+import java.util.UUID;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang3.StringUtils;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
-import org.apache.kafka.common.record.CompressionType;
import org.onap.dmaap.mr.client.impl.MRSimplerBatchPublisher;
import org.onap.dmaap.mr.client.response.MRPublisherResponse;
import org.onap.dmaap.mr.test.clients.ProtocolTypeConstants;
public static class KafkaPublisherWrapper implements BusPublisher {
private static Logger logger = LoggerFactory.getLogger(KafkaPublisherWrapper.class);
+ private static final String KEY_SERIALIZER = "org.apache.kafka.common.serialization.StringSerializer";
+
+ private String topic;
/**
- * The actual Kafka publisher.
+ * Kafka publisher.
*/
- private final KafkaProducer producer;
+ private Producer<String, String> producer;
+ protected Properties kafkaProps;
/**
- * Constructor.
+ * Kafka Publisher Wrapper.
*
* @param busTopicParams topic parameters
*/
- public KafkaPublisherWrapper(BusTopicParams busTopicParams) {
- // TODO Setting of topic parameters is not implemented yet.
- //Setup Properties for Kafka Producer
- Properties kafkaProps = new Properties();
- this.producer = new KafkaProducer(kafkaProps);
+ protected KafkaPublisherWrapper(BusTopicParams busTopicParams) {
+
+ if (busTopicParams.isTopicInvalid()) {
+ throw new IllegalArgumentException("No topic for Kafka");
+ }
+
+ this.topic = busTopicParams.getTopic();
+
+ //Setup Properties for consumer
+ kafkaProps = new Properties();
+ kafkaProps.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, busTopicParams.getServers().get(0));
+ if (busTopicParams.isAdditionalPropsValid()) {
+ for (Map.Entry<String, String> entry : busTopicParams.getAdditionalProps().entrySet()) {
+ kafkaProps.put(entry.getKey(), entry.getValue());
+ }
+ }
+ if (kafkaProps.get(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG) == null) {
+ kafkaProps.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, KEY_SERIALIZER);
+ }
+ if (kafkaProps.get(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG) == null) {
+ kafkaProps.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KEY_SERIALIZER);
+ }
+
+ producer = new KafkaProducer<>(kafkaProps);
}
@Override
if (message == null) {
throw new IllegalArgumentException("No message provided");
}
- // TODO Sending messages is not implemented yet
+
+ try {
+ //Create the record
+ ProducerRecord<String, String> record = new ProducerRecord<String, String>(topic,
+ UUID.randomUUID().toString(), message);
+
+ this.producer.send(record);
+ producer.flush();
+ } catch (Exception e) {
+ logger.warn("{}: SEND of {} cannot be performed because of {}", this, message, e.getMessage(), e);
+ return false;
+ }
return true;
}
public void close() {
logger.info("{}: CLOSE", this);
- try (this.producer) {
+ try {
this.producer.close();
} catch (Exception e) {
logger.warn("{}: CLOSE FAILED because of {}", this, e.getMessage(), e);
package org.onap.policy.common.endpoints.event.comm.bus.internal;
+import java.util.Map;
import org.onap.policy.common.endpoints.event.comm.Topic;
import org.onap.policy.common.endpoints.event.comm.bus.KafkaTopicSink;
import org.slf4j.Logger;
*/
private static Logger logger = LoggerFactory.getLogger(InlineKafkaTopicSink.class);
+ protected Map<String, String> additionalProps = null;
+
/**
* Argument-based KAFKA Topic Writer instantiation. BusTopicParams contains below mentioned
* attributes.
*/
public InlineKafkaTopicSink(BusTopicParams busTopicParams) {
super(busTopicParams);
+ this.additionalProps = busTopicParams.getAdditionalProps();
}
/**
.servers(this.servers)
.topic(this.effectiveTopic)
.useHttps(this.useHttps)
+ .additionalProps(this.additionalProps)
.build());
logger.info("{}: KAFKA SINK created", this);
}
package org.onap.policy.common.endpoints.event.comm.bus.internal;
+import java.net.MalformedURLException;
+import java.util.Map;
import org.onap.policy.common.endpoints.event.comm.Topic;
import org.onap.policy.common.endpoints.event.comm.bus.KafkaTopicSource;
*/
public class SingleThreadedKafkaTopicSource extends SingleThreadedBusTopicSource implements KafkaTopicSource {
+ protected Map<String, String> additionalProps = null;
+
/**
* Constructor.
*
*/
public SingleThreadedKafkaTopicSource(BusTopicParams busTopicParams) {
super(busTopicParams);
- this.init();
+ this.additionalProps = busTopicParams.getAdditionalProps();
+ try {
+ this.init();
+ } catch (Exception e) {
+ throw new IllegalArgumentException("ERROR during init in kafka-source: cannot create topic " + topic, e);
+ }
}
/**
* Initialize the Cambria client.
*/
@Override
- public void init() {
- this.consumer = new BusConsumer.KafkaConsumerWrapper(BusTopicParams.builder()
+ public void init() throws MalformedURLException {
+ BusTopicParams.TopicParamsBuilder builder = BusTopicParams.builder()
.servers(this.servers)
.topic(this.effectiveTopic)
- .useHttps(this.useHttps)
- .build());
+ .fetchTimeout(this.fetchTimeout)
+ .consumerGroup(this.consumerGroup)
+ .useHttps(this.useHttps);
+
+ this.consumer = new BusConsumer.KafkaConsumerWrapper(builder
+ .additionalProps(this.additionalProps)
+ .build());
}
@Override
public static final String DME2_SUBCONTEXT_PATH_PROPERTY = "SubContextPath";
public static final String DME2_SESSION_STICKINESS_REQUIRED_PROPERTY = "sessionstickinessrequired";
-
/* Topic Sink Values */
/**
import com.google.re2j.Pattern;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import lombok.AccessLevel;
import lombok.NoArgsConstructor;
+import org.apache.commons.lang3.StringUtils;
import org.onap.policy.common.endpoints.event.comm.bus.internal.BusTopicParams;
import org.onap.policy.common.endpoints.event.comm.bus.internal.BusTopicParams.TopicParamsBuilder;
import org.onap.policy.common.endpoints.properties.PolicyEndPointProperties;
public static TopicParamsBuilder makeBuilder(PropertyUtils props, String topic, String servers) {
final List<String> serverList = new ArrayList<>(Arrays.asList(COMMA_SPACE_PAT.split(servers)));
- //TODO More Kafka properties to be added
+
return BusTopicParams.builder()
.servers(serverList)
.topic(topic)
public class KafkaTopicPropertyBuilder extends TopicPropertyBuilder {
- public static final String SERVER = "my-server";
+ public static final String SERVER = "localhost:9092";
public static final String TOPIC2 = "my-topic-2";
@Getter
--- /dev/null
+/*
+ * ============LICENSE_START=======================================================
+ * policy-endpoints
+ * ================================================================================
+ * Copyright (C) 2022 Nordix Foundation.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.common.endpoints.event.comm.bus;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.onap.policy.common.endpoints.properties.PolicyEndPointProperties.PROPERTY_KAFKA_SINK_TOPICS;
+import static org.onap.policy.common.endpoints.properties.PolicyEndPointProperties.PROPERTY_TOPIC_EFFECTIVE_TOPIC_SUFFIX;
+
+import java.util.Arrays;
+import java.util.Deque;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Properties;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.onap.policy.common.endpoints.event.comm.bus.internal.BusTopicParams;
+
+public class KafkaTopicSinkFactoryTest extends KafkaTopicFactoryTestBase<KafkaTopicSink> {
+
+ private SinkFactory factory;
+ public static final String KAFKA_SERVER = "localhost:9092";
+
+ /**
+ * Creates the object to be tested.
+ */
+ @Before
+ @Override
+ public void setUp() {
+ super.setUp();
+
+ factory = new SinkFactory();
+ }
+
+ @After
+ public void tearDown() {
+ factory.destroy();
+ }
+
+ @Test
+ @Override
+ public void testBuildBusTopicParams() {
+ super.testBuildBusTopicParams();
+ super.testBuildBusTopicParams_Ex();
+ }
+
+ @Test
+ @Override
+ public void testBuildListOfStringString() {
+ super.testBuildListOfStringString();
+
+ // check parameters that were used
+ BusTopicParams params = getLastParams();
+ assertEquals(false, params.isAllowSelfSignedCerts());
+ }
+
+ @Test
+ @Override
+ public void testBuildProperties() {
+ List<KafkaTopicSink> topics = buildTopics(makePropBuilder().makeTopic(MY_TOPIC).build());
+ assertEquals(1, topics.size());
+ assertEquals(MY_TOPIC, topics.get(0).getTopic());
+ assertEquals(MY_EFFECTIVE_TOPIC, topics.get(0).getEffectiveTopic());
+
+ BusTopicParams params = getLastParams();
+ assertEquals(true, params.isManaged());
+ assertEquals(false, params.isUseHttps());
+ assertEquals(Arrays.asList(KAFKA_SERVER), params.getServers());
+ assertEquals(MY_TOPIC, params.getTopic());
+ assertEquals(MY_EFFECTIVE_TOPIC, params.getEffectiveTopic());
+ assertEquals(MY_PARTITION, params.getPartitionId());
+
+ List<KafkaTopicSink> topics2 = buildTopics(makePropBuilder().makeTopic(TOPIC3)
+ .removeTopicProperty(PROPERTY_TOPIC_EFFECTIVE_TOPIC_SUFFIX).build());
+ assertEquals(1, topics2.size());
+ assertEquals(TOPIC3, topics2.get(0).getTopic());
+ assertEquals(topics2.get(0).getTopic(), topics2.get(0).getEffectiveTopic());
+
+ initFactory();
+
+ assertEquals(1, buildTopics(makePropBuilder().makeTopic(MY_TOPIC).build()).size());
+ }
+
+ @Test
+ @Override
+ public void testDestroyString_testGet_testInventory() {
+ super.testDestroyString_testGet_testInventory();
+ super.testDestroyString_Ex();
+ }
+
+ @Test
+ @Override
+ public void testDestroy() {
+ super.testDestroy();
+ }
+
+ @Test
+ public void testGet() {
+ super.testGet_Ex();
+ }
+
+ @Test
+ public void testToString() {
+ assertTrue(factory.toString().startsWith("IndexedKafkaTopicSinkFactory ["));
+ }
+
+ @Override
+ protected void initFactory() {
+ if (factory != null) {
+ factory.destroy();
+ }
+
+ factory = new SinkFactory();
+ }
+
+ @Override
+ protected List<KafkaTopicSink> buildTopics(Properties properties) {
+ return factory.build(properties);
+ }
+
+ @Override
+ protected KafkaTopicSink buildTopic(BusTopicParams params) {
+ return factory.build(params);
+ }
+
+ @Override
+ protected KafkaTopicSink buildTopic(List<String> servers, String topic) {
+ return factory.build(servers, topic);
+ }
+
+ @Override
+ protected void destroyFactory() {
+ factory.destroy();
+ }
+
+ @Override
+ protected void destroyTopic(String topic) {
+ factory.destroy(topic);
+ }
+
+ @Override
+ protected List<KafkaTopicSink> getInventory() {
+ return factory.inventory();
+ }
+
+ @Override
+ protected KafkaTopicSink getTopic(String topic) {
+ return factory.get(topic);
+ }
+
+ @Override
+ protected BusTopicParams getLastParams() {
+ return factory.params.getLast();
+ }
+
+ @Override
+ protected TopicPropertyBuilder makePropBuilder() {
+ return new KafkaTopicPropertyBuilder(PROPERTY_KAFKA_SINK_TOPICS);
+ }
+
+ /**
+ * Factory that records the parameters of all of the sinks it creates.
+ */
+ private static class SinkFactory extends IndexedKafkaTopicSinkFactory {
+ private Deque<BusTopicParams> params = new LinkedList<>();
+
+ @Override
+ protected KafkaTopicSink makeSink(BusTopicParams busTopicParams) {
+ params.add(busTopicParams);
+ return super.makeSink(busTopicParams);
+ }
+ }
+}
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.onap.policy.common.endpoints.properties.PolicyEndPointProperties.PROPERTY_KAFKA_SOURCE_TOPICS;
+import static org.onap.policy.common.endpoints.properties.PolicyEndPointProperties.PROPERTY_TOPIC_EFFECTIVE_TOPIC_SUFFIX;
+import java.util.Arrays;
import java.util.Deque;
import java.util.LinkedList;
import java.util.List;
private SourceFactory factory;
+ public static final String KAFKA_SERVER = "localhost:9092";
+
/**
* Creates the object to be tested.
*/
factory.destroy();
}
- @Test
- @Override
- public void testBuildBusTopicParams() {
- super.testBuildBusTopicParams_Ex();
- }
-
@Test
@Override
public void testBuildProperties() {
List<KafkaTopicSource> topics = buildTopics(makePropBuilder().makeTopic(MY_TOPIC).build());
assertEquals(1, topics.size());
assertEquals(MY_TOPIC, topics.get(0).getTopic());
+ assertEquals(MY_EFFECTIVE_TOPIC, topics.get(0).getEffectiveTopic());
+
+ BusTopicParams params = getLastParams();
+ assertEquals(true, params.isManaged());
+ assertEquals(false, params.isUseHttps());
+ assertEquals(Arrays.asList(KAFKA_SERVER), params.getServers());
+ assertEquals(MY_TOPIC, params.getTopic());
+ assertEquals(MY_EFFECTIVE_TOPIC, params.getEffectiveTopic());
}
@Test
@Override
public void testDestroyString_testGet_testInventory() {
+ super.testDestroyString_testGet_testInventory();
super.testDestroyString_Ex();
}
+ @Test
+ @Override
+ public void testDestroy() {
+ super.testDestroy();
+ }
+
@Test
public void testGet() {
super.testGet_Ex();
public static final String ROUTE_PROP = "routeOffer";
public static final String MY_ROUTE = "my-route";
+ public static final String MY_SERIALIZER = "org.apache.kafka.common.serialization.StringSerializer";
+ public static final int KAFKA_PORT = 9092;
/**
* Message used within exceptions that are expected.
*/
protected List<String> servers;
+ /**
+ * Servers to be added to the parameter builder.
+ */
+ protected List<String> kafkaServers;
+
/**
* Parameter builder used to build topic parameters.
*/
addProps.put("my-key-B", "my-value-B");
servers = Arrays.asList("svra", "svrb");
+ kafkaServers = Arrays.asList("localhost:9092", "10.1.2.3:9092");
builder = makeBuilder();
}
/**
* Makes a fully populated parameter builder.
- *
+ *
* @return a new parameter builder
*/
public TopicParamsBuilder makeBuilder() {
.fetchLimit(MY_FETCH_LIMIT).fetchTimeout(MY_FETCH_TIMEOUT).hostname(MY_HOST).latitude(MY_LAT)
.longitude(MY_LONG).managed(true).partitionId(MY_PARTITION).partner(MY_PARTNER)
.password(MY_PASS).port(MY_PORT).servers(servers).topic(MY_TOPIC)
- .effectiveTopic(MY_EFFECTIVE_TOPIC).useHttps(true).userName(MY_USERNAME);
+ .effectiveTopic(MY_EFFECTIVE_TOPIC).useHttps(true).userName(MY_USERNAME)
+ .serializationProvider(MY_SERIALIZER);
+ }
+
+ /**
+ * Makes a fully populated parameter builder.
+ *
+ * @return a new parameter builder
+ */
+ public TopicParamsBuilder makeKafkaBuilder() {
+ addProps.clear();
+ String jaas = "org.apache.kafka.common.security.plain.PlainLoginModule "
+ + "required username=abc password=abc serviceName=kafka;";
+ addProps.put("sasl.jaas.config", jaas);
+ addProps.put("sasl.mechanism", "SCRAM-SHA-512");
+ addProps.put("security.protocol", "SASL_PLAINTEXT");
+
+ return makeKafkaBuilder(addProps, kafkaServers);
+ }
+
+ /**
+ * Makes a fully populated parameter builder.
+ *
+ * @param addProps additional properties to be added to the builder
+ * @param servers servers to be added to the builder
+ * @return a new parameter builder
+ */
+ public TopicParamsBuilder makeKafkaBuilder(Map<String, String> addProps, List<String> servers) {
+
+ return BusTopicParams.builder().additionalProps(addProps).basePath(MY_BASE_PATH).clientName(MY_CLIENT_NAME)
+ .consumerGroup(MY_CONS_GROUP).consumerInstance(MY_CONS_INST).environment(MY_ENV)
+ .hostname(MY_HOST).partitionId(MY_PARTITION).partner(MY_PARTNER)
+ .port(KAFKA_PORT).servers(servers).topic(MY_TOPIC)
+ .effectiveTopic(MY_EFFECTIVE_TOPIC).useHttps(false);
}
}
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
+import java.util.Properties;
import java.util.concurrent.CountDownLatch;
import org.apache.commons.collections4.IteratorUtils;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.junit.Before;
import org.junit.Test;
import org.onap.dmaap.mr.client.impl.MRConsumerImpl;
@Test
public void testKafkaConsumerWrapper() throws Exception {
// verify that different wrappers can be built
- assertThatCode(() -> new KafkaConsumerWrapper(makeBuilder().build())).doesNotThrowAnyException();
+ assertThatCode(() -> new KafkaConsumerWrapper(makeKafkaBuilder().build())).doesNotThrowAnyException();
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void testKafkaConsumerWrapper_InvalidTopic() throws Exception {
+ new KafkaConsumerWrapper(makeBuilder().topic(null).build());
+ }
+
+ @Test(expected = java.lang.IllegalStateException.class)
+ public void testKafkaConsumerWrapperFetch() throws Exception {
+
+ //Setup Properties for consumer
+ Properties kafkaProps = new Properties();
+ kafkaProps.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
+ kafkaProps.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "test");
+ kafkaProps.setProperty("enable.auto.commit", "true");
+ kafkaProps.setProperty("auto.commit.interval.ms", "1000");
+ kafkaProps.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
+ "org.apache.kafka.common.serialization.StringDeserializer");
+ kafkaProps.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
+ "org.apache.kafka.common.serialization.StringDeserializer");
+ kafkaProps.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+ kafkaProps.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
+
+ KafkaConsumerWrapper kafka = new KafkaConsumerWrapper(makeKafkaBuilder().build());
+ KafkaConsumer<String, String> consumer = new KafkaConsumer<>(kafkaProps);
+ kafka.consumer = consumer;
+
+ assertFalse(kafka.fetch().iterator().hasNext());
+ consumer.close();
+ }
+
+ @Test
+ public void testKafkaConsumerWrapperClose() throws Exception {
+ assertThatCode(() -> new KafkaConsumerWrapper(makeKafkaBuilder().build()).close()).doesNotThrowAnyException();
}
@Test
public void testKafkaConsumerWrapperToString() throws Exception {
- assertNotNull(new KafkaConsumerWrapper(makeBuilder().build()) {}.toString());
+ assertNotNull(new KafkaConsumerWrapper(makeKafkaBuilder().build()) {}.toString());
}
private static class FetchingBusConsumerImpl extends FetchingBusConsumer {
--- /dev/null
+/*
+ * ============LICENSE_START=======================================================
+ * policy-endpoints
+ * ================================================================================
+ * Copyright (C) 2022 Nordix Foundation.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.common.endpoints.event.comm.bus.internal;
+
+import static org.assertj.core.api.Assertions.assertThatCode;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure;
+import org.onap.policy.common.endpoints.event.comm.bus.TopicTestBase;
+import org.onap.policy.common.utils.gson.GsonTestUtils;
+
+public class InlineKafkaTopicSinkTest extends TopicTestBase {
+ private InlineKafkaTopicSink sink;
+
+ /**
+ * Creates the object to be tested.
+ */
+ @Before
+ @Override
+ public void setUp() {
+ super.setUp();
+
+ sink = new InlineKafkaTopicSink(makeKafkaBuilder().build());
+ }
+
+ @After
+ public void tearDown() {
+ sink.shutdown();
+ }
+
+ @Test
+ public void testToString() {
+ assertTrue(sink.toString().startsWith("InlineKafkaTopicSink ["));
+ }
+
+ @Test
+ public void testInit() {
+ // nothing null
+ sink = new InlineKafkaTopicSink(makeKafkaBuilder().build());
+ sink.init();
+ assertThatCode(() -> sink.shutdown()).doesNotThrowAnyException();
+ }
+
+ @Test
+ public void testGetTopicCommInfrastructure() {
+ assertEquals(CommInfrastructure.KAFKA, sink.getTopicCommInfrastructure());
+ }
+
+}
public void setUp() {
super.setUp();
- source = new SingleThreadedKafkaTopicSource(makeBuilder().build());
+ source = new SingleThreadedKafkaTopicSource(makeKafkaBuilder().build());
}
@After
source.shutdown();
}
+ public void testSerialize() {
+ assertThatCode(() -> new GsonTestUtils().compareGson(source, SingleThreadedKafkaTopicSourceTest.class))
+ .doesNotThrowAnyException();
+ }
+
@Test
public void testToString() {
assertTrue(source.toString().startsWith("SingleThreadedKafkaTopicSource ["));
+ source.shutdown();
}
@Test
{
- "servers" : [ "svra", "svrb" ],
- "topic" : "my-topic",
- "effectiveTopic" : "my-effective-topic",
- "recentEvents" : [ ],
- "alive" : false,
- "locked" : false,
- "useHttps" : true,
- "topicCommInfrastructure" : "KAFKA",
- "partitionKey" : "my-partition"
+ "servers": [
+ "svra",
+ "svrb"
+ ],
+ "topic": "my-topic",
+ "effectiveTopic": "my-effective-topic",
+ "recentEvents": [],
+ "alive": false,
+ "locked": false,
+ "useHttps": false,
+ "topicCommInfrastructure": "KAFKA",
+ "partitionKey": "my-partition",
+ "additionalProps": {
+ "security.protocol": "SASL_PLAINTEXT",
+ "sasl.mechanism": "SCRAM-SHA-512",
+ "sasl.jaas.config": "org.apache.kafka.common.security.plain.PlainLoginModule required username=abc password=abc serviceName=kafka;"
+ }
}
{
- "servers" : [ "svra", "svrb" ],
- "topic" : "my-topic",
- "effectiveTopic" : "my-effective-topic",
- "recentEvents" : [ ],
- "alive" : false,
- "locked" : false,
- "useHttps" : true,
- "topicCommInfrastructure" : "KAFKA"
+ "servers": [
+ "localhost:9092",
+ "10.1.2.3:9092"
+ ],
+ "topic": "my-topic",
+ "effectiveTopic": "my-effective-topic",
+ "recentEvents": [],
+ "alive": false,
+ "locked": false,
+ "useHttps": false,
+ "topicCommInfrastructure": "KAFKA",
+ "additionalProps": {
+ "security.protocol": "SASL_PLAINTEXT",
+ "sasl.mechanism": "SCRAM-SHA-512",
+ "sasl.jaas.config": "org.apache.kafka.common.security.plain.PlainLoginModule required username=abc password=abc serviceName=kafka;"
+ }
}