[KAFKA] Add docker-compose to sample project 87/133987/1 master
authorefiacor <fiachra.corcoran@est.tech>
Wed, 29 Mar 2023 11:52:11 +0000 (12:52 +0100)
committerefiacor <fiachra.corcoran@est.tech>
Mon, 3 Apr 2023 08:48:00 +0000 (09:48 +0100)
Signed-off-by: efiacor <fiachra.corcoran@est.tech>
Change-Id: Iaa3c2ff9b60aa5a691242deec82911a700222d9a
Issue-ID: DMAAP-1847

kafkaClient/src/main/java/org/onap/dmaap/kafka/OnapKafkaClient.java
kafkaClient/src/main/java/org/onap/dmaap/kafka/OnapKafkaConsumer.java
sampleClient/src/main/java/org/onap/dmaap/kafka/sample/Main.java
sampleClient/src/main/java/org/onap/dmaap/kafka/sample/SampleConfiguration.java
sampleClient/src/main/resources/application.yaml
sampleClient/src/main/resources/docker-compose/config.properties [new file with mode: 0644]
sampleClient/src/main/resources/docker-compose/kafka.jaas.conf [new file with mode: 0644]
sampleClient/src/main/resources/docker-compose/runner.sh [new file with mode: 0755]
sampleClient/src/main/resources/docker-compose/scram-docker-compose.yml [new file with mode: 0644]
sampleClient/src/main/resources/docker-compose/zookeeper.sasl.jaas.config [new file with mode: 0644]

index 7986869..71fcac0 100644 (file)
@@ -70,7 +70,7 @@ public class OnapKafkaClient {
      * Publish data to a given topic
      *  @param topicName The topic to which the message should be published
      * @param data      The data to publish to the topic specified
-     * @return
+     * @return The RecordMetedata of the request
      */
     public RecordMetadata publishToTopic(String topicName, String data) {
         // Should we check the data size and chunk it if necessary? Do we need to?
index e08e229..c6e312d 100644 (file)
@@ -56,7 +56,7 @@ public class OnapKafkaConsumer {
         Properties props = new Properties();
         props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
         props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,  "org.apache.kafka.common.serialization.StringDeserializer");
-        props.put(ConsumerConfig.CLIENT_ID_CONFIG, configuration.getConsumerID() + "-consumer-" + UUID.randomUUID());
+        props.put(ConsumerConfig.CLIENT_ID_CONFIG, configuration.getConsumerID());
         props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, configuration.getKafkaSecurityProtocolConfig());
         props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, configuration.getKafkaBootstrapServers());
         props.put(SaslConfigs.SASL_JAAS_CONFIG, configuration.getKafkaSaslJaasConfig());
index 37a3097..c80c87f 100644 (file)
 
 package org.onap.dmaap.kafka.sample;
 
+import java.util.List;
+import org.apache.kafka.clients.producer.RecordMetadata;
 import org.onap.dmaap.kafka.OnapKafkaClient;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.boot.CommandLineRunner;
 import org.springframework.boot.SpringApplication;
@@ -29,6 +33,8 @@ import org.springframework.boot.autoconfigure.SpringBootApplication;
 @SpringBootApplication
 public class Main implements CommandLineRunner{
 
+    private final Logger log = LoggerFactory.getLogger(OnapKafkaClient.class.getName());
+
     @Autowired
     private SampleConfiguration configuration;
 
@@ -37,8 +43,22 @@ public class Main implements CommandLineRunner{
     }
 
     @Override
-    public void run(String... args) {
+    public void run(String... args) throws InterruptedException {
         OnapKafkaClient handler = new OnapKafkaClient(configuration);
-        handler.fetchFromTopic("dummy.topic.blah");
+        String testTopic = configuration.getConsumerTopics().get(0);
+        for (int i = 0; i < 5; i++) {
+            RecordMetadata recordMetadata = handler.publishToTopic(testTopic, "dummy-message-"+i);
+            if (recordMetadata != null) {
+                log.info("Topic: {}, Partition: {}, Offset: {}", recordMetadata.topic(), recordMetadata.partition(), recordMetadata.offset());
+            }
+        }
+        int fetch = 0;
+        while (true) {
+            fetch++;
+            log.info("Fetch {} from topic: {}", fetch, testTopic);
+            List<String> res = handler.fetchFromTopic(testTopic);
+            log.info("Messages from fetch {}: " + res, fetch);
+            Thread.sleep(3000);
+        }
     }
 }
\ No newline at end of file
index 0cb5498..601504a 100644 (file)
@@ -44,5 +44,4 @@ public class SampleConfiguration implements IKafkaConfig {
 
     private List<String> producerTopics;
     private String kafkaSaslJaasConfig;
-    // private String kafkaSaslMechanism;
 }
index b8a0f70..c592fe4 100644 (file)
@@ -1,11 +1,8 @@
 kafka:
   kafkaBootstrapServers: [localhost:9092]
   pollingTimeout: 10
-  consumerGroup: my-consumer-group
-  consumerID: my-consumer-id
-  consumerTopics: [test.mytopic.1, test.mytopic.2]
-  producerTopics: [test.mytopic.3]
-  kafkaSaslJaasConfig: ${SASL_JAAS_CONFIG:org.apache.kafka.common.security.scram.ScramLoginModule required username=admin password=admin-secret;}
-
-  #kafkaSaslJaasConfig: ${SASL_JAAS_CONFIG:org.apache.kafka.common.security.plain.PlainLoginModule required username=admin password=admin-secret;}
-  #kafkaSaslMechanism: ${SASL_MECHANISM:PLAIN}
\ No newline at end of file
+  consumerGroup: test-consumer-group
+  consumerID: test-id
+  consumerTopics: [test-topic.1]
+  producerTopics: [test-topic.1]
+  kafkaSaslJaasConfig: ${SASL_JAAS_CONFIG:org.apache.kafka.common.security.scram.ScramLoginModule required username="client" password="client-secret";}
diff --git a/sampleClient/src/main/resources/docker-compose/config.properties b/sampleClient/src/main/resources/docker-compose/config.properties
new file mode 100644 (file)
index 0000000..7b8734e
--- /dev/null
@@ -0,0 +1,3 @@
+sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="client" password="client-secret";
+security.protocol=SASL_PLAINTEXT
+sasl.mechanism=SCRAM-SHA-512
diff --git a/sampleClient/src/main/resources/docker-compose/kafka.jaas.conf b/sampleClient/src/main/resources/docker-compose/kafka.jaas.conf
new file mode 100644 (file)
index 0000000..1124681
--- /dev/null
@@ -0,0 +1,13 @@
+KafkaServer {
+   org.apache.kafka.common.security.scram.ScramLoginModule required
+   username="broker"
+   password="broker"
+   user_broker="broker"
+   user_client="client-secret";
+};
+
+Client {
+   org.apache.zookeeper.server.auth.DigestLoginModule required
+   username="kafka"
+   password="kafka";
+};
diff --git a/sampleClient/src/main/resources/docker-compose/runner.sh b/sampleClient/src/main/resources/docker-compose/runner.sh
new file mode 100755 (executable)
index 0000000..2a188b1
--- /dev/null
@@ -0,0 +1,65 @@
+#!/bin/bash
+
+function start {
+  docker compose -f scram-docker-compose.yml up -d
+
+  until [ "$(docker inspect -f {{.State.Running}} broker)" == "true" ]; do
+      sleep 1;
+  done;
+
+  echo -e "\n Creating kafka users"
+  docker exec broker kafka-configs --zookeeper zookeeper:2181 --alter --add-config 'SCRAM-SHA-256=[password=broker],SCRAM-SHA-512=[password=broker]' --entity-type users --entity-name broker
+  docker exec broker kafka-configs --zookeeper zookeeper:2181 --alter --add-config 'SCRAM-SHA-256=[password=client-secret],SCRAM-SHA-512=[password=client-secret]' --entity-type users --entity-name client
+
+  echo -e "\n Creating test topic"
+  docker exec broker kafka-topics --create --bootstrap-server broker:9092 --replication-factor 1 --partitions 1 --topic test-topic.1 --command-config config.properties
+
+  echo -e "\n Listing existing topics"
+  docker exec broker kafka-topics --list --bootstrap-server localhost:9092 --command-config config.properties
+
+  echo -e "\n Adding broker to /etc/hosts"
+  echo '127.0.0.1 broker' | sudo tee -a /etc/hosts
+}
+
+
+function stop {
+
+  docker compose -f scram-docker-compose.yml down
+
+  sudo sed -i.bak '/broker/d' /etc/hosts
+}
+
+function publisher {
+    docker exec -it broker kafka-console-producer --bootstrap-server localhost:9092 --topic test-topic.1 --producer.config config.properties
+}
+
+showHelp() {
+cat << EOF
+Usage: ./runner.sh [start|stop]
+
+start
+
+stop
+
+EOF
+}
+
+while true
+do
+case "$1" in
+start)
+    start
+    ;;
+pub)
+    publisher
+    ;;
+stop)
+    stop
+    ;;
+*)
+    showHelp
+    shift
+    break;;
+esac
+shift
+done
\ No newline at end of file
diff --git a/sampleClient/src/main/resources/docker-compose/scram-docker-compose.yml b/sampleClient/src/main/resources/docker-compose/scram-docker-compose.yml
new file mode 100644 (file)
index 0000000..562ad97
--- /dev/null
@@ -0,0 +1,49 @@
+version: '3.5'
+services:
+  zookeeper:
+    image: confluentinc/cp-zookeeper:7.3.2
+    hostname: zookeeper
+    container_name: zookeeper
+    restart: always
+    ports:
+      - "9999:9999"
+    volumes:
+      - ./zookeeper.sasl.jaas.config:/etc/kafka/zookeeper_server_jaas.conf
+    environment:
+      ZOOKEEPER_CLIENT_PORT: 2181
+      ZOOKEEPER_TICK_TIME: 2000
+      KAFKA_JMX_HOSTNAME: localhost
+      KAFKA_OPTS: -Djava.security.auth.login.config=/etc/kafka/zookeeper_server_jaas.conf
+        -Dzookeeper.authProvider.1=org.apache.zookeeper.server.auth.SASLAuthenticationProvider
+        -Dzookeeper.allowSaslFailedClients=false
+        -Dzookeeper.requireClientAuthScheme=sasl
+
+  broker:
+    image: confluentinc/cp-server:7.3.2
+    hostname: broker
+    container_name: broker
+    restart: always
+    ports:
+      - "9092:9092"
+    volumes:
+      - ./kafka.jaas.conf:/etc/kafka/kafka_server_jaas.conf
+      - ./config.properties:/home/appuser/config.properties
+    depends_on:
+      - zookeeper
+    environment:
+      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
+      KAFKA_LISTENERS: SASL_PLAINTEXT://:9092
+      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: SASL_PLAINTEXT:SASL_PLAINTEXT
+      KAFKA_ADVERTISED_LISTENERS: SASL_PLAINTEXT://broker:9092
+      KAFKA_SASL_ENABLED_MECHANISMS: SCRAM-SHA-512
+      KAFKA_SASL_MECHANISM_INTER_BROKER_PROTOCOL: SCRAM-SHA-512
+      KAFKA_INTER_BROKER_LISTENER_NAME: SASL_PLAINTEXT
+      CONFLUENT_METRICS_REPORTER_TOPIC_REPLICAS: 1
+      CONFLUENT_METRICS_ENABLE: 'false'
+      KAFKA_CONFLUENT_LICENSE_TOPIC_REPLICATION_FACTOR: 1
+      KAFKA_CONFLUENT_BALANCER_TOPIC_REPLICATION_FACTOR: 1
+      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
+      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
+      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
+      KAFKA_OPTS: "-Djava.security.auth.login.config=/etc/kafka/kafka_server_jaas.conf"
+
diff --git a/sampleClient/src/main/resources/docker-compose/zookeeper.sasl.jaas.config b/sampleClient/src/main/resources/docker-compose/zookeeper.sasl.jaas.config
new file mode 100644 (file)
index 0000000..9575461
--- /dev/null
@@ -0,0 +1,4 @@
+Server {
+   org.apache.zookeeper.server.auth.DigestLoginModule required
+   user_kafka="kafka";
+};