Add kafka docker container for policy CSITs 47/136847/1
authorrameshiyer27 <ramesh.murugan.iyer@est.tech>
Thu, 14 Dec 2023 14:17:35 +0000 (14:17 +0000)
committerrameshiyer27 <ramesh.murugan.iyer@est.tech>
Thu, 14 Dec 2023 14:17:35 +0000 (14:17 +0000)
Replaced dmaap  with kafka in CLAMP docker tests.

Issue-ID: POLICY-4201
Signed-off-by: zrrmmua <ramesh.murugan.iyer@est.tech>
Change-Id: I4d05e24d3ececf2253ebc39785882be00bf9eaf4

13 files changed:
compose/config/clamp/A1pmsParticipantParameters.yaml
compose/config/clamp/AcRuntimeParameters.yaml
compose/config/clamp/HttpParticipantParameters.yaml
compose/config/clamp/KserveParticipantParameters.yaml
compose/config/clamp/KubernetesParticipantParameters.yaml
compose/config/clamp/PolicyParticipantParameters.yaml
compose/config/clamp/SimulatorParticipantParameters.yaml
compose/docker-compose.yml
csit/resources/scripts/kafka_consumer.py [new file with mode: 0755]
csit/resources/scripts/kafka_producer.py [new file with mode: 0755]
csit/resources/scripts/make_topics.py [new file with mode: 0755]
csit/resources/scripts/prepare-robot-env.sh
csit/resources/tests/common-library.robot

index f905484..44902d8 100755 (executable)
@@ -28,16 +28,16 @@ participant:
     participantId: 101c62b3-8918-41b9-a747-d21eb79c6c00
     clampAutomationCompositionTopics:
       topicSources:
-        - topic: POLICY-ACRUNTIME-PARTICIPANT
+        - topic: policy-acruntime-participant
           servers:
-            - ${topicServer:message-router}
-          topicCommInfrastructure: dmaap
+            - ${topicServer:kafka:9092}
+          topicCommInfrastructure: kafka
           fetchTimeout: 15000
       topicSinks:
-        - topic: POLICY-ACRUNTIME-PARTICIPANT
+        - topic: policy-acruntime-participant
           servers:
-            - ${topicServer:message-router}
-          topicCommInfrastructure: dmaap
+            - ${topicServer:kafka:9092}
+          topicCommInfrastructure: kafka
 
 
 management:
index dc9a984..b800e4b 100644 (file)
@@ -46,18 +46,18 @@ runtime:
   topicParameterGroup:
     topicSources:
       -
-        topic: POLICY-ACRUNTIME-PARTICIPANT
+        topic: policy-acruntime-participant
         servers:
-          - ${topicServer:message-router}
-        topicCommInfrastructure: dmaap
+          - ${topicServer:kafka:9092}
+        topicCommInfrastructure: kafka
         fetchTimeout: 15000
         useHttps: false
     topicSinks:
       -
-        topic: POLICY-ACRUNTIME-PARTICIPANT
+        topic: policy-acruntime-participant
         servers:
-          - ${topicServer:message-router}
-        topicCommInfrastructure: dmaap
+          - ${topicServer:kafka:9092}
+        topicCommInfrastructure: kafka
         useHttps: false
   acmParameters:
     toscaElementName: org.onap.policy.clamp.acm.AutomationCompositionElement
index 878eed7..a26a81d 100644 (file)
@@ -17,17 +17,17 @@ participant:
     participantId: 101c62b3-8918-41b9-a747-d21eb79c6c01
     clampAutomationCompositionTopics:
       topicSources:
-        - topic: POLICY-ACRUNTIME-PARTICIPANT
+        - topic: policy-acruntime-participant
           servers:
-            - ${topicServer:message-router}
-          topicCommInfrastructure: dmaap
+            - ${topicServer:kafka:9092}
+          topicCommInfrastructure: kafka
           fetchTimeout: 15000
           useHttps: false
       topicSinks:
-        - topic: POLICY-ACRUNTIME-PARTICIPANT
+        - topic: policy-acruntime-participant
           servers:
-            - ${topicServer:message-router}
-          topicCommInfrastructure: dmaap
+            - ${topicServer:kafka:9092}
+          topicCommInfrastructure: kafka
           useHttps: false
     participantSupportedElementTypes:
       - typeName: org.onap.policy.clamp.acm.HttpAutomationCompositionElement
index d83a48f..fe55f54 100755 (executable)
@@ -27,16 +27,16 @@ participant:
     participantId: 101c62b3-8918-41b9-a747-d21eb79c6c04\r
     clampAutomationCompositionTopics:\r
       topicSources:\r
-        - topic: POLICY-ACRUNTIME-PARTICIPANT\r
+        - topic: policy-acruntime-participant\r
           servers:\r
-            - ${topicServer:message-router}\r
-          topicCommInfrastructure: dmaap\r
+            - ${topicServer:kafka:9092}\r
+          topicCommInfrastructure: kafka\r
           fetchTimeout: 15000\r
       topicSinks:\r
-        - topic: POLICY-ACRUNTIME-PARTICIPANT\r
+        - topic: policy-acruntime-participant\r
           servers:\r
-            - ${topicServer:message-router}\r
-          topicCommInfrastructure: dmaap\r
+            - ${topicServer:kafka:9092}\r
+          topicCommInfrastructure: kafka\r
     participantSupportedElementTypes:\r
       -\r
         typeName: org.onap.policy.clamp.acm.KserveAutomationCompositionElement\r
index 3449a5d..449e961 100644 (file)
@@ -21,18 +21,18 @@ participant:
     clampAutomationCompositionTopics:
       topicSources:
         -
-          topic: POLICY-ACRUNTIME-PARTICIPANT
+          topic: policy-acruntime-participant
           servers:
-            - ${topicServer:message-router}
-          topicCommInfrastructure: dmaap
+            - ${topicServer:kafka:9092}
+          topicCommInfrastructure: kafka
           fetchTimeout: 15000
           useHttps: false
       topicSinks:
         -
-          topic: POLICY-ACRUNTIME-PARTICIPANT
+          topic: policy-acruntime-participant
           servers:
-            - ${topicServer:message-router}
-          topicCommInfrastructure: dmaap
+            - ${topicServer:kafka:9092}
+          topicCommInfrastructure: kafka
           useHttps: false
     participantSupportedElementTypes:
       - typeName: org.onap.policy.clamp.acm.K8SMicroserviceAutomationCompositionElement
index 443a0ca..307e0b6 100644 (file)
@@ -34,18 +34,18 @@ participant:
     clampAutomationCompositionTopics:
       topicSources:
         -
-          topic: POLICY-ACRUNTIME-PARTICIPANT
+          topic: policy-acruntime-participant
           servers:
-            - ${topicServer:message-router}
-          topicCommInfrastructure: dmaap
+            - ${topicServer:kafka:9092}
+          topicCommInfrastructure: kafka
           fetchTimeout: 15000
           useHttps: false
       topicSinks:
         -
-          topic: POLICY-ACRUNTIME-PARTICIPANT
+          topic: policy-acruntime-participant
           servers:
-            - ${topicServer:message-router}
-          topicCommInfrastructure: dmaap
+            - ${topicServer:kafka:9092}
+          topicCommInfrastructure: kafka
           useHttps: false
     participantSupportedElementTypes:
       - typeName: org.onap.policy.clamp.acm.PolicyAutomationCompositionElement
index 6ec594f..16234e1 100644 (file)
@@ -19,17 +19,17 @@ participant:
     participantId: ${participantId:101c62b3-8918-41b9-a747-d21eb79c6c90}
     clampAutomationCompositionTopics:
       topicSources:
-        - topic: POLICY-ACRUNTIME-PARTICIPANT
+        - topic: policy-acruntime-participant
           servers:
-            - ${topicServer:message-router}
-          topicCommInfrastructure: dmaap
+            - ${topicServer:kafka:9092}
+          topicCommInfrastructure: kafka
           fetchTimeout: 15000
           useHttps: false
       topicSinks:
-        - topic: POLICY-ACRUNTIME-PARTICIPANT
+        - topic: policy-acruntime-participant
           servers:
-            - ${topicServer:message-router}
-          topicCommInfrastructure: dmaap
+            - ${topicServer:kafka:9092}
+          topicCommInfrastructure: kafka
           useHttps: false
     participantSupportedElementTypes:
       -
index f57dac8..3814f21 100644 (file)
@@ -241,7 +241,7 @@ services:
       container_name: policy-clamp-runtime-acm
       depends_on:
        - mariadb
-       - simulator
+       - kafka
        - policy-clamp-ac-http-ppnt
        - policy-clamp-ac-k8s-ppnt
        - policy-clamp-ac-pf-ppnt
@@ -258,7 +258,7 @@ services:
       command: [
         '-c', './acm-runtime.sh',
         'mariadb', '3306',
-        'message-router', '3904',
+        'kafka', '9092',
         'policy-clamp-ac-http-ppnt', '6969',
         'policy-clamp-ac-k8s-ppnt', '6969',
         'policy-clamp-ac-pf-ppnt', '6969',
@@ -269,7 +269,7 @@ services:
       image: ${CONTAINER_LOCATION}onap/policy-clamp-ac-http-ppnt:${POLICY_CLAMP_VERSION}
       container_name: policy-clamp-ac-http-ppnt
       depends_on:
-       - simulator
+       - kafka
       hostname: policy-clamp-ac-http-ppnt
       ports:
        - "30290:6969"
@@ -280,13 +280,13 @@ services:
       entrypoint: /opt/app/policy/bin/wait_for_port.sh
       command: [
         '-c', './http-participant.sh',
-        'message-router', '3904'
+        'kafka', '9092'
         ]
    policy-clamp-ac-k8s-ppnt:
       image: ${CONTAINER_LOCATION}onap/policy-clamp-ac-k8s-ppnt:${POLICY_CLAMP_VERSION}
       container_name: policy-clamp-ac-k8s-ppnt
       depends_on:
-       - simulator
+       - kafka
       hostname: policy-clamp-ac-k8s-ppnt
       ports:
        - "30295:6969"
@@ -297,13 +297,13 @@ services:
       entrypoint: /opt/app/policy/bin/wait_for_port.sh
       command: [
         '-c', './kubernetes-participant.sh',
-        'message-router', '3904'
+        'kafka', '9092'
         ]
    policy-clamp-ac-pf-ppnt:
       image: ${CONTAINER_LOCATION}onap/policy-clamp-ac-pf-ppnt:${POLICY_CLAMP_VERSION}
       container_name: policy-clamp-ac-pf-ppnt
       depends_on:
-       - simulator
+       - kafka
        - api
       hostname: policy-clamp-ac-pf-ppnt
       ports:
@@ -315,14 +315,14 @@ services:
       entrypoint: /opt/app/policy/bin/wait_for_port.sh
       command: [
         '-c', './policy-participant.sh',
-        'message-router', '3904',
+        'kafka', '9092',
         'api', '6969'
         ]
    policy-clamp-ac-a1pms-ppnt:
       image: ${CONTAINER_LOCATION}onap/policy-clamp-ac-a1pms-ppnt:${POLICY_CLAMP_VERSION}
       container_name: policy-clamp-ac-a1pms-ppnt
       depends_on:
-       - simulator
+       - kafka
       hostname: policy-clamp-ac-a1pms-ppnt
       ports:
        - "30296:6969"
@@ -333,13 +333,13 @@ services:
       entrypoint: /opt/app/policy/bin/wait_for_port.sh
       command: [
         '-c', './a1pms-participant.sh',
-        'message-router', '3904'
+        'kafka', '9092'
         ]
    policy-clamp-ac-kserve-ppnt:
       image: ${CONTAINER_LOCATION}onap/policy-clamp-ac-kserve-ppnt:${POLICY_CLAMP_VERSION}
       container_name: policy-clamp-ac-kserve-ppnt
       depends_on:
-       - simulator
+       - kafka
       hostname: policy-clamp-ac-kserve-ppnt
       ports:
        - "30297:6969"
@@ -350,13 +350,13 @@ services:
       entrypoint: /opt/app/policy/bin/wait_for_port.sh
       command: [
         '-c', './kserve-participant.sh',
-        'message-router', '3904'
+        'kafka', '9092'
         ]
    policy-clamp-ac-sim-ppnt:
       image: ${CONTAINER_LOCATION}onap/policy-clamp-ac-sim-ppnt:${POLICY_CLAMP_VERSION}
       container_name: policy-clamp-ac-sim-ppnt
       depends_on:
-        - simulator
+        - kafka
       hostname: policy-clamp-ac-sim-ppnt
       ports:
         - ${SIM_PARTICIPANT_PORT}:6969
@@ -367,7 +367,7 @@ services:
       entrypoint: /opt/app/policy/bin/wait_for_port.sh
       command: [
         '-c', './sim-participant.sh',
-        'message-router', '3904'
+        'kafka', '9092'
       ]
    prometheus:
       image: nexus3.onap.org:10001/prom/prometheus:latest
@@ -389,3 +389,27 @@ services:
        - ./metrics/dashboard.yaml:/etc/grafana/provisioning/dashboards/dashboard.yaml
        - ./metrics/datasource.yaml:/etc/grafana/provisioning/datasources/datasource.yaml
        - ./metrics/dashboards:/var/lib/grafana/dashboards
+
+   zookeeper:
+     image: confluentinc/cp-zookeeper:latest
+     environment:
+       ZOOKEEPER_CLIENT_PORT: 2181
+       ZOOKEEPER_TICK_TIME: 2000
+     ports:
+       - 2181:2181
+
+   kafka:
+     image: confluentinc/cp-kafka:latest
+     container_name: kafka
+     depends_on:
+       - zookeeper
+     ports:
+       - 29092:29092
+       - 9092:9092
+     environment:
+       KAFKA_BROKER_ID: 1
+       KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
+       KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092,PLAINTEXT_HOST://localhost:29092
+       KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
+       KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
+       KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
\ No newline at end of file
diff --git a/csit/resources/scripts/kafka_consumer.py b/csit/resources/scripts/kafka_consumer.py
new file mode 100755 (executable)
index 0000000..80b6167
--- /dev/null
@@ -0,0 +1,71 @@
+#!/usr/bin/env python3
+#
+# ============LICENSE_START====================================================
+#  Copyright (C) 2023 Nordix Foundation.
+# =============================================================================
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+# SPDX-License-Identifier: Apache-2.0
+# ============LICENSE_END======================================================
+
+# Python utility to fetch kafka topic and look for required messages.
+# Accepts the arguments {topic_name} and {list of expected values} and {timeout} to verify the kafka topic.
+
+
+from confluent_kafka import Consumer, KafkaException
+import sys
+import time
+
+def consume_kafka_topic(topic, expected_values, timeout):
+    config = {
+            'bootstrap.servers': 'localhost:29092',
+            'group.id': 'testgrp',
+            'auto.offset.reset': 'earliest'
+    }
+    consumer = Consumer(config)
+    consumer.subscribe([topic])
+    try:
+        start_time = time.time()
+        while time.time() - start_time < timeout:
+                msg = consumer.poll(1.0)
+                if msg is None:
+                    continue
+                if msg.error():
+                    if msg.error().code() == KafkaException._PARTITION_EOF:
+                        sys.stderr.write(f"Reached end of topic {msg.topic()} / partition {msg.partition()}\n")
+                        print('ERROR')
+                        sys.exit(404)
+                    else:
+                        # Error
+                        raise KafkaException(msg.error())
+                else:
+                    # Message received
+                    message = msg.value().decode('utf-8')
+                    if verify_msg(expected_values, message):
+                        print(message)
+                        sys.exit(200)
+    finally:
+        consumer.close()
+
+def verify_msg(expected_values, message):
+    for item in expected_values:
+        if item not in message:
+            return False
+    return True
+
+
+if __name__ == '__main__':
+    topic_name = sys.argv[1]
+    timeout = sys.argv[2]  # timeout in seconds for verifying the kafka topic
+    expected_values = sys.argv[3:]
+    consume_kafka_topic(topic_name, expected_values, timeout)
diff --git a/csit/resources/scripts/kafka_producer.py b/csit/resources/scripts/kafka_producer.py
new file mode 100755 (executable)
index 0000000..ff12987
--- /dev/null
@@ -0,0 +1,41 @@
+#!/usr/bin/env python3
+#
+# ============LICENSE_START====================================================
+#  Copyright (C) 2023 Nordix Foundation.
+# =============================================================================
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+# SPDX-License-Identifier: Apache-2.0
+# ============LICENSE_END======================================================
+
+# Python utility to produce a message on a kafka topic
+# Accepts the arguments {topic_name} and {message}
+
+from confluent_kafka import Producer
+import sys
+
+def post_to_kafka(topic, message):
+    conf = {'bootstrap.servers': 'localhost:29092'}
+
+    producer = Producer(conf)
+    try:
+        producer.produce(topic, value=message.encode('utf-8'))
+        producer.flush()
+        print('Message posted to Kafka topic: {}'.format(topic))
+    except Exception as e:
+        print('Failed to post message: {}'.format(str(e)))
+    finally:
+        producer.flush()
+
+if __name__ == '__main__':
+    post_to_kafka(sys.argv[1], sys.argv[2])
diff --git a/csit/resources/scripts/make_topics.py b/csit/resources/scripts/make_topics.py
new file mode 100755 (executable)
index 0000000..daee434
--- /dev/null
@@ -0,0 +1,41 @@
+#!/usr/bin/env python3
+#
+# ============LICENSE_START====================================================
+#  Copyright (C) 2023 Nordix Foundation.
+# =============================================================================
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+# SPDX-License-Identifier: Apache-2.0
+# ============LICENSE_END======================================================
+
+# Python utility to create a new kafka topic
+# Accepts the argument {topic_name}
+
+from confluent_kafka.admin import AdminClient, NewTopic
+import sys
+
+def create_topic(bootstrap_servers, topic_name, num_partitions=2, replication_factor=2):
+    admin_client = AdminClient({'bootstrap.servers': bootstrap_servers})
+
+    # Define the topic configuration
+    topic = NewTopic(topic_name, num_partitions=num_partitions, replication_factor=replication_factor)
+
+    # Create the topic
+    admin_client.create_topics([topic])
+
+
+if __name__ == '__main__':
+    topic_name = sys.argv[1]
+    bootstrap_servers = 'localhost:29092'
+
+    create_topic(bootstrap_servers, topic_name)
index 2281d23..2b77380 100755 (executable)
@@ -44,6 +44,10 @@ mkdir -p "${ROBOT_VENV}"/src/onap
 rm -rf "${ROBOT_VENV}"/src/onap/testsuite
 python3 -m pip install -qq --upgrade --extra-index-url="https://nexus3.onap.org/repository/PyPi.staging/simple" 'robotframework-onap==0.6.0.*' --pre
 
+# install confluent-kafka
+echo "Installing python confluent-kafka library"
+python3 -m pip install -qq confluent-kafka
+
 echo "Uninstall docker-py and reinstall docker."
 python3 -m pip uninstall -y -qq docker
 python3 -m pip install -U -qq docker
index 8c27917..f5db8e0 100644 (file)
@@ -147,3 +147,13 @@ CheckTopic
     Status Should Be    OK    ${resp}
     Should Contain    ${resp.text}    ${expected_status}
     [Return]    ${resp.text}
+
+CheckKafkaTopic
+    [Arguments]    ${topic}    ${expected_status}
+    ${resp}=    Run Process    ${CURDIR}/kafka_consumer.py    ${topic}    30    ${expected_status}
+    Log to console  Received response from kafka ${resp.stdout}
+    Should Contain    ${resp.text}    ${expected_status}
+
+GetKafkaTopic
+    [Arguments]    ${topic}
+    ${resp}=    Run Process    ${CURDIR}/make_topics.py    ${topic}
\ No newline at end of file