From c3386ebc4fd444e810d8739f7ddd78765d36c631 Mon Sep 17 00:00:00 2001 From: efiacor Date: Wed, 29 Mar 2023 12:52:11 +0100 Subject: [PATCH] [KAFKA] Add docker-compose to sample project Signed-off-by: efiacor Change-Id: Iaa3c2ff9b60aa5a691242deec82911a700222d9a Issue-ID: DMAAP-1847 --- .../java/org/onap/dmaap/kafka/OnapKafkaClient.java | 2 +- .../org/onap/dmaap/kafka/OnapKafkaConsumer.java | 2 +- .../java/org/onap/dmaap/kafka/sample/Main.java | 24 +++++++- .../dmaap/kafka/sample/SampleConfiguration.java | 1 - sampleClient/src/main/resources/application.yaml | 13 ++--- .../resources/docker-compose/config.properties | 3 + .../main/resources/docker-compose/kafka.jaas.conf | 13 +++++ .../src/main/resources/docker-compose/runner.sh | 65 ++++++++++++++++++++++ .../docker-compose/scram-docker-compose.yml | 49 ++++++++++++++++ .../docker-compose/zookeeper.sasl.jaas.config | 4 ++ 10 files changed, 163 insertions(+), 13 deletions(-) create mode 100644 sampleClient/src/main/resources/docker-compose/config.properties create mode 100644 sampleClient/src/main/resources/docker-compose/kafka.jaas.conf create mode 100755 sampleClient/src/main/resources/docker-compose/runner.sh create mode 100644 sampleClient/src/main/resources/docker-compose/scram-docker-compose.yml create mode 100644 sampleClient/src/main/resources/docker-compose/zookeeper.sasl.jaas.config diff --git a/kafkaClient/src/main/java/org/onap/dmaap/kafka/OnapKafkaClient.java b/kafkaClient/src/main/java/org/onap/dmaap/kafka/OnapKafkaClient.java index 7986869..71fcac0 100644 --- a/kafkaClient/src/main/java/org/onap/dmaap/kafka/OnapKafkaClient.java +++ b/kafkaClient/src/main/java/org/onap/dmaap/kafka/OnapKafkaClient.java @@ -70,7 +70,7 @@ public class OnapKafkaClient { * Publish data to a given topic * @param topicName The topic to which the message should be published * @param data The data to publish to the topic specified - * @return + * @return The RecordMetedata of the request */ public RecordMetadata publishToTopic(String topicName, String data) { // Should we check the data size and chunk it if necessary? Do we need to? diff --git a/kafkaClient/src/main/java/org/onap/dmaap/kafka/OnapKafkaConsumer.java b/kafkaClient/src/main/java/org/onap/dmaap/kafka/OnapKafkaConsumer.java index e08e229..c6e312d 100644 --- a/kafkaClient/src/main/java/org/onap/dmaap/kafka/OnapKafkaConsumer.java +++ b/kafkaClient/src/main/java/org/onap/dmaap/kafka/OnapKafkaConsumer.java @@ -56,7 +56,7 @@ public class OnapKafkaConsumer { Properties props = new Properties(); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer"); props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer"); - props.put(ConsumerConfig.CLIENT_ID_CONFIG, configuration.getConsumerID() + "-consumer-" + UUID.randomUUID()); + props.put(ConsumerConfig.CLIENT_ID_CONFIG, configuration.getConsumerID()); props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, configuration.getKafkaSecurityProtocolConfig()); props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, configuration.getKafkaBootstrapServers()); props.put(SaslConfigs.SASL_JAAS_CONFIG, configuration.getKafkaSaslJaasConfig()); diff --git a/sampleClient/src/main/java/org/onap/dmaap/kafka/sample/Main.java b/sampleClient/src/main/java/org/onap/dmaap/kafka/sample/Main.java index 37a3097..c80c87f 100644 --- a/sampleClient/src/main/java/org/onap/dmaap/kafka/sample/Main.java +++ b/sampleClient/src/main/java/org/onap/dmaap/kafka/sample/Main.java @@ -20,7 +20,11 @@ package org.onap.dmaap.kafka.sample; +import java.util.List; +import org.apache.kafka.clients.producer.RecordMetadata; import org.onap.dmaap.kafka.OnapKafkaClient; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.CommandLineRunner; import org.springframework.boot.SpringApplication; @@ -29,6 +33,8 @@ import org.springframework.boot.autoconfigure.SpringBootApplication; @SpringBootApplication public class Main implements CommandLineRunner{ + private final Logger log = LoggerFactory.getLogger(OnapKafkaClient.class.getName()); + @Autowired private SampleConfiguration configuration; @@ -37,8 +43,22 @@ public class Main implements CommandLineRunner{ } @Override - public void run(String... args) { + public void run(String... args) throws InterruptedException { OnapKafkaClient handler = new OnapKafkaClient(configuration); - handler.fetchFromTopic("dummy.topic.blah"); + String testTopic = configuration.getConsumerTopics().get(0); + for (int i = 0; i < 5; i++) { + RecordMetadata recordMetadata = handler.publishToTopic(testTopic, "dummy-message-"+i); + if (recordMetadata != null) { + log.info("Topic: {}, Partition: {}, Offset: {}", recordMetadata.topic(), recordMetadata.partition(), recordMetadata.offset()); + } + } + int fetch = 0; + while (true) { + fetch++; + log.info("Fetch {} from topic: {}", fetch, testTopic); + List res = handler.fetchFromTopic(testTopic); + log.info("Messages from fetch {}: " + res, fetch); + Thread.sleep(3000); + } } } \ No newline at end of file diff --git a/sampleClient/src/main/java/org/onap/dmaap/kafka/sample/SampleConfiguration.java b/sampleClient/src/main/java/org/onap/dmaap/kafka/sample/SampleConfiguration.java index 0cb5498..601504a 100644 --- a/sampleClient/src/main/java/org/onap/dmaap/kafka/sample/SampleConfiguration.java +++ b/sampleClient/src/main/java/org/onap/dmaap/kafka/sample/SampleConfiguration.java @@ -44,5 +44,4 @@ public class SampleConfiguration implements IKafkaConfig { private List producerTopics; private String kafkaSaslJaasConfig; - // private String kafkaSaslMechanism; } diff --git a/sampleClient/src/main/resources/application.yaml b/sampleClient/src/main/resources/application.yaml index b8a0f70..c592fe4 100644 --- a/sampleClient/src/main/resources/application.yaml +++ b/sampleClient/src/main/resources/application.yaml @@ -1,11 +1,8 @@ kafka: kafkaBootstrapServers: [localhost:9092] pollingTimeout: 10 - consumerGroup: my-consumer-group - consumerID: my-consumer-id - consumerTopics: [test.mytopic.1, test.mytopic.2] - producerTopics: [test.mytopic.3] - kafkaSaslJaasConfig: ${SASL_JAAS_CONFIG:org.apache.kafka.common.security.scram.ScramLoginModule required username=admin password=admin-secret;} - - #kafkaSaslJaasConfig: ${SASL_JAAS_CONFIG:org.apache.kafka.common.security.plain.PlainLoginModule required username=admin password=admin-secret;} - #kafkaSaslMechanism: ${SASL_MECHANISM:PLAIN} \ No newline at end of file + consumerGroup: test-consumer-group + consumerID: test-id + consumerTopics: [test-topic.1] + producerTopics: [test-topic.1] + kafkaSaslJaasConfig: ${SASL_JAAS_CONFIG:org.apache.kafka.common.security.scram.ScramLoginModule required username="client" password="client-secret";} diff --git a/sampleClient/src/main/resources/docker-compose/config.properties b/sampleClient/src/main/resources/docker-compose/config.properties new file mode 100644 index 0000000..7b8734e --- /dev/null +++ b/sampleClient/src/main/resources/docker-compose/config.properties @@ -0,0 +1,3 @@ +sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="client" password="client-secret"; +security.protocol=SASL_PLAINTEXT +sasl.mechanism=SCRAM-SHA-512 diff --git a/sampleClient/src/main/resources/docker-compose/kafka.jaas.conf b/sampleClient/src/main/resources/docker-compose/kafka.jaas.conf new file mode 100644 index 0000000..1124681 --- /dev/null +++ b/sampleClient/src/main/resources/docker-compose/kafka.jaas.conf @@ -0,0 +1,13 @@ +KafkaServer { + org.apache.kafka.common.security.scram.ScramLoginModule required + username="broker" + password="broker" + user_broker="broker" + user_client="client-secret"; +}; + +Client { + org.apache.zookeeper.server.auth.DigestLoginModule required + username="kafka" + password="kafka"; +}; diff --git a/sampleClient/src/main/resources/docker-compose/runner.sh b/sampleClient/src/main/resources/docker-compose/runner.sh new file mode 100755 index 0000000..2a188b1 --- /dev/null +++ b/sampleClient/src/main/resources/docker-compose/runner.sh @@ -0,0 +1,65 @@ +#!/bin/bash + +function start { + docker compose -f scram-docker-compose.yml up -d + + until [ "$(docker inspect -f {{.State.Running}} broker)" == "true" ]; do + sleep 1; + done; + + echo -e "\n Creating kafka users" + docker exec broker kafka-configs --zookeeper zookeeper:2181 --alter --add-config 'SCRAM-SHA-256=[password=broker],SCRAM-SHA-512=[password=broker]' --entity-type users --entity-name broker + docker exec broker kafka-configs --zookeeper zookeeper:2181 --alter --add-config 'SCRAM-SHA-256=[password=client-secret],SCRAM-SHA-512=[password=client-secret]' --entity-type users --entity-name client + + echo -e "\n Creating test topic" + docker exec broker kafka-topics --create --bootstrap-server broker:9092 --replication-factor 1 --partitions 1 --topic test-topic.1 --command-config config.properties + + echo -e "\n Listing existing topics" + docker exec broker kafka-topics --list --bootstrap-server localhost:9092 --command-config config.properties + + echo -e "\n Adding broker to /etc/hosts" + echo '127.0.0.1 broker' | sudo tee -a /etc/hosts +} + + +function stop { + + docker compose -f scram-docker-compose.yml down + + sudo sed -i.bak '/broker/d' /etc/hosts +} + +function publisher { + docker exec -it broker kafka-console-producer --bootstrap-server localhost:9092 --topic test-topic.1 --producer.config config.properties +} + +showHelp() { +cat << EOF +Usage: ./runner.sh [start|stop] + +start + +stop + +EOF +} + +while true +do +case "$1" in +start) + start + ;; +pub) + publisher + ;; +stop) + stop + ;; +*) + showHelp + shift + break;; +esac +shift +done \ No newline at end of file diff --git a/sampleClient/src/main/resources/docker-compose/scram-docker-compose.yml b/sampleClient/src/main/resources/docker-compose/scram-docker-compose.yml new file mode 100644 index 0000000..562ad97 --- /dev/null +++ b/sampleClient/src/main/resources/docker-compose/scram-docker-compose.yml @@ -0,0 +1,49 @@ +version: '3.5' +services: + zookeeper: + image: confluentinc/cp-zookeeper:7.3.2 + hostname: zookeeper + container_name: zookeeper + restart: always + ports: + - "9999:9999" + volumes: + - ./zookeeper.sasl.jaas.config:/etc/kafka/zookeeper_server_jaas.conf + environment: + ZOOKEEPER_CLIENT_PORT: 2181 + ZOOKEEPER_TICK_TIME: 2000 + KAFKA_JMX_HOSTNAME: localhost + KAFKA_OPTS: -Djava.security.auth.login.config=/etc/kafka/zookeeper_server_jaas.conf + -Dzookeeper.authProvider.1=org.apache.zookeeper.server.auth.SASLAuthenticationProvider + -Dzookeeper.allowSaslFailedClients=false + -Dzookeeper.requireClientAuthScheme=sasl + + broker: + image: confluentinc/cp-server:7.3.2 + hostname: broker + container_name: broker + restart: always + ports: + - "9092:9092" + volumes: + - ./kafka.jaas.conf:/etc/kafka/kafka_server_jaas.conf + - ./config.properties:/home/appuser/config.properties + depends_on: + - zookeeper + environment: + KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181' + KAFKA_LISTENERS: SASL_PLAINTEXT://:9092 + KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: SASL_PLAINTEXT:SASL_PLAINTEXT + KAFKA_ADVERTISED_LISTENERS: SASL_PLAINTEXT://broker:9092 + KAFKA_SASL_ENABLED_MECHANISMS: SCRAM-SHA-512 + KAFKA_SASL_MECHANISM_INTER_BROKER_PROTOCOL: SCRAM-SHA-512 + KAFKA_INTER_BROKER_LISTENER_NAME: SASL_PLAINTEXT + CONFLUENT_METRICS_REPORTER_TOPIC_REPLICAS: 1 + CONFLUENT_METRICS_ENABLE: 'false' + KAFKA_CONFLUENT_LICENSE_TOPIC_REPLICATION_FACTOR: 1 + KAFKA_CONFLUENT_BALANCER_TOPIC_REPLICATION_FACTOR: 1 + KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1 + KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1 + KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1 + KAFKA_OPTS: "-Djava.security.auth.login.config=/etc/kafka/kafka_server_jaas.conf" + diff --git a/sampleClient/src/main/resources/docker-compose/zookeeper.sasl.jaas.config b/sampleClient/src/main/resources/docker-compose/zookeeper.sasl.jaas.config new file mode 100644 index 0000000..9575461 --- /dev/null +++ b/sampleClient/src/main/resources/docker-compose/zookeeper.sasl.jaas.config @@ -0,0 +1,4 @@ +Server { + org.apache.zookeeper.server.auth.DigestLoginModule required + user_kafka="kafka"; +}; -- 2.16.6