From: rameshiyer27 Date: Thu, 14 Dec 2023 14:17:35 +0000 (+0000) Subject: Add kafka docker container for policy CSITs X-Git-Tag: 3.1.1~5^2 X-Git-Url: https://gerrit.onap.org/r/gitweb?a=commitdiff_plain;h=f2609a349565f74237024f2f546e03ea5c772cb5;p=policy%2Fdocker.git Add kafka docker container for policy CSITs Replaced dmaap with kafka in CLAMP docker tests. Issue-ID: POLICY-4201 Signed-off-by: zrrmmua Change-Id: I4d05e24d3ececf2253ebc39785882be00bf9eaf4 --- diff --git a/compose/config/clamp/A1pmsParticipantParameters.yaml b/compose/config/clamp/A1pmsParticipantParameters.yaml index f905484a..44902d89 100755 --- a/compose/config/clamp/A1pmsParticipantParameters.yaml +++ b/compose/config/clamp/A1pmsParticipantParameters.yaml @@ -28,16 +28,16 @@ participant: participantId: 101c62b3-8918-41b9-a747-d21eb79c6c00 clampAutomationCompositionTopics: topicSources: - - topic: POLICY-ACRUNTIME-PARTICIPANT + - topic: policy-acruntime-participant servers: - - ${topicServer:message-router} - topicCommInfrastructure: dmaap + - ${topicServer:kafka:9092} + topicCommInfrastructure: kafka fetchTimeout: 15000 topicSinks: - - topic: POLICY-ACRUNTIME-PARTICIPANT + - topic: policy-acruntime-participant servers: - - ${topicServer:message-router} - topicCommInfrastructure: dmaap + - ${topicServer:kafka:9092} + topicCommInfrastructure: kafka management: diff --git a/compose/config/clamp/AcRuntimeParameters.yaml b/compose/config/clamp/AcRuntimeParameters.yaml index dc9a9846..b800e4b0 100644 --- a/compose/config/clamp/AcRuntimeParameters.yaml +++ b/compose/config/clamp/AcRuntimeParameters.yaml @@ -46,18 +46,18 @@ runtime: topicParameterGroup: topicSources: - - topic: POLICY-ACRUNTIME-PARTICIPANT + topic: policy-acruntime-participant servers: - - ${topicServer:message-router} - topicCommInfrastructure: dmaap + - ${topicServer:kafka:9092} + topicCommInfrastructure: kafka fetchTimeout: 15000 useHttps: false topicSinks: - - topic: POLICY-ACRUNTIME-PARTICIPANT + topic: policy-acruntime-participant servers: - - ${topicServer:message-router} - topicCommInfrastructure: dmaap + - ${topicServer:kafka:9092} + topicCommInfrastructure: kafka useHttps: false acmParameters: toscaElementName: org.onap.policy.clamp.acm.AutomationCompositionElement diff --git a/compose/config/clamp/HttpParticipantParameters.yaml b/compose/config/clamp/HttpParticipantParameters.yaml index 878eed75..a26a81d0 100644 --- a/compose/config/clamp/HttpParticipantParameters.yaml +++ b/compose/config/clamp/HttpParticipantParameters.yaml @@ -17,17 +17,17 @@ participant: participantId: 101c62b3-8918-41b9-a747-d21eb79c6c01 clampAutomationCompositionTopics: topicSources: - - topic: POLICY-ACRUNTIME-PARTICIPANT + - topic: policy-acruntime-participant servers: - - ${topicServer:message-router} - topicCommInfrastructure: dmaap + - ${topicServer:kafka:9092} + topicCommInfrastructure: kafka fetchTimeout: 15000 useHttps: false topicSinks: - - topic: POLICY-ACRUNTIME-PARTICIPANT + - topic: policy-acruntime-participant servers: - - ${topicServer:message-router} - topicCommInfrastructure: dmaap + - ${topicServer:kafka:9092} + topicCommInfrastructure: kafka useHttps: false participantSupportedElementTypes: - typeName: org.onap.policy.clamp.acm.HttpAutomationCompositionElement diff --git a/compose/config/clamp/KserveParticipantParameters.yaml b/compose/config/clamp/KserveParticipantParameters.yaml index d83a48f0..fe55f543 100755 --- a/compose/config/clamp/KserveParticipantParameters.yaml +++ b/compose/config/clamp/KserveParticipantParameters.yaml @@ -27,16 +27,16 @@ participant: participantId: 101c62b3-8918-41b9-a747-d21eb79c6c04 clampAutomationCompositionTopics: topicSources: - - topic: POLICY-ACRUNTIME-PARTICIPANT + - topic: policy-acruntime-participant servers: - - ${topicServer:message-router} - topicCommInfrastructure: dmaap + - ${topicServer:kafka:9092} + topicCommInfrastructure: kafka fetchTimeout: 15000 topicSinks: - - topic: POLICY-ACRUNTIME-PARTICIPANT + - topic: policy-acruntime-participant servers: - - ${topicServer:message-router} - topicCommInfrastructure: dmaap + - ${topicServer:kafka:9092} + topicCommInfrastructure: kafka participantSupportedElementTypes: - typeName: org.onap.policy.clamp.acm.KserveAutomationCompositionElement diff --git a/compose/config/clamp/KubernetesParticipantParameters.yaml b/compose/config/clamp/KubernetesParticipantParameters.yaml index 3449a5d1..449e9617 100644 --- a/compose/config/clamp/KubernetesParticipantParameters.yaml +++ b/compose/config/clamp/KubernetesParticipantParameters.yaml @@ -21,18 +21,18 @@ participant: clampAutomationCompositionTopics: topicSources: - - topic: POLICY-ACRUNTIME-PARTICIPANT + topic: policy-acruntime-participant servers: - - ${topicServer:message-router} - topicCommInfrastructure: dmaap + - ${topicServer:kafka:9092} + topicCommInfrastructure: kafka fetchTimeout: 15000 useHttps: false topicSinks: - - topic: POLICY-ACRUNTIME-PARTICIPANT + topic: policy-acruntime-participant servers: - - ${topicServer:message-router} - topicCommInfrastructure: dmaap + - ${topicServer:kafka:9092} + topicCommInfrastructure: kafka useHttps: false participantSupportedElementTypes: - typeName: org.onap.policy.clamp.acm.K8SMicroserviceAutomationCompositionElement diff --git a/compose/config/clamp/PolicyParticipantParameters.yaml b/compose/config/clamp/PolicyParticipantParameters.yaml index 443a0cae..307e0b60 100644 --- a/compose/config/clamp/PolicyParticipantParameters.yaml +++ b/compose/config/clamp/PolicyParticipantParameters.yaml @@ -34,18 +34,18 @@ participant: clampAutomationCompositionTopics: topicSources: - - topic: POLICY-ACRUNTIME-PARTICIPANT + topic: policy-acruntime-participant servers: - - ${topicServer:message-router} - topicCommInfrastructure: dmaap + - ${topicServer:kafka:9092} + topicCommInfrastructure: kafka fetchTimeout: 15000 useHttps: false topicSinks: - - topic: POLICY-ACRUNTIME-PARTICIPANT + topic: policy-acruntime-participant servers: - - ${topicServer:message-router} - topicCommInfrastructure: dmaap + - ${topicServer:kafka:9092} + topicCommInfrastructure: kafka useHttps: false participantSupportedElementTypes: - typeName: org.onap.policy.clamp.acm.PolicyAutomationCompositionElement diff --git a/compose/config/clamp/SimulatorParticipantParameters.yaml b/compose/config/clamp/SimulatorParticipantParameters.yaml index 6ec594f2..16234e17 100644 --- a/compose/config/clamp/SimulatorParticipantParameters.yaml +++ b/compose/config/clamp/SimulatorParticipantParameters.yaml @@ -19,17 +19,17 @@ participant: participantId: ${participantId:101c62b3-8918-41b9-a747-d21eb79c6c90} clampAutomationCompositionTopics: topicSources: - - topic: POLICY-ACRUNTIME-PARTICIPANT + - topic: policy-acruntime-participant servers: - - ${topicServer:message-router} - topicCommInfrastructure: dmaap + - ${topicServer:kafka:9092} + topicCommInfrastructure: kafka fetchTimeout: 15000 useHttps: false topicSinks: - - topic: POLICY-ACRUNTIME-PARTICIPANT + - topic: policy-acruntime-participant servers: - - ${topicServer:message-router} - topicCommInfrastructure: dmaap + - ${topicServer:kafka:9092} + topicCommInfrastructure: kafka useHttps: false participantSupportedElementTypes: - diff --git a/compose/docker-compose.yml b/compose/docker-compose.yml index f57dac80..3814f215 100644 --- a/compose/docker-compose.yml +++ b/compose/docker-compose.yml @@ -241,7 +241,7 @@ services: container_name: policy-clamp-runtime-acm depends_on: - mariadb - - simulator + - kafka - policy-clamp-ac-http-ppnt - policy-clamp-ac-k8s-ppnt - policy-clamp-ac-pf-ppnt @@ -258,7 +258,7 @@ services: command: [ '-c', './acm-runtime.sh', 'mariadb', '3306', - 'message-router', '3904', + 'kafka', '9092', 'policy-clamp-ac-http-ppnt', '6969', 'policy-clamp-ac-k8s-ppnt', '6969', 'policy-clamp-ac-pf-ppnt', '6969', @@ -269,7 +269,7 @@ services: image: ${CONTAINER_LOCATION}onap/policy-clamp-ac-http-ppnt:${POLICY_CLAMP_VERSION} container_name: policy-clamp-ac-http-ppnt depends_on: - - simulator + - kafka hostname: policy-clamp-ac-http-ppnt ports: - "30290:6969" @@ -280,13 +280,13 @@ services: entrypoint: /opt/app/policy/bin/wait_for_port.sh command: [ '-c', './http-participant.sh', - 'message-router', '3904' + 'kafka', '9092' ] policy-clamp-ac-k8s-ppnt: image: ${CONTAINER_LOCATION}onap/policy-clamp-ac-k8s-ppnt:${POLICY_CLAMP_VERSION} container_name: policy-clamp-ac-k8s-ppnt depends_on: - - simulator + - kafka hostname: policy-clamp-ac-k8s-ppnt ports: - "30295:6969" @@ -297,13 +297,13 @@ services: entrypoint: /opt/app/policy/bin/wait_for_port.sh command: [ '-c', './kubernetes-participant.sh', - 'message-router', '3904' + 'kafka', '9092' ] policy-clamp-ac-pf-ppnt: image: ${CONTAINER_LOCATION}onap/policy-clamp-ac-pf-ppnt:${POLICY_CLAMP_VERSION} container_name: policy-clamp-ac-pf-ppnt depends_on: - - simulator + - kafka - api hostname: policy-clamp-ac-pf-ppnt ports: @@ -315,14 +315,14 @@ services: entrypoint: /opt/app/policy/bin/wait_for_port.sh command: [ '-c', './policy-participant.sh', - 'message-router', '3904', + 'kafka', '9092', 'api', '6969' ] policy-clamp-ac-a1pms-ppnt: image: ${CONTAINER_LOCATION}onap/policy-clamp-ac-a1pms-ppnt:${POLICY_CLAMP_VERSION} container_name: policy-clamp-ac-a1pms-ppnt depends_on: - - simulator + - kafka hostname: policy-clamp-ac-a1pms-ppnt ports: - "30296:6969" @@ -333,13 +333,13 @@ services: entrypoint: /opt/app/policy/bin/wait_for_port.sh command: [ '-c', './a1pms-participant.sh', - 'message-router', '3904' + 'kafka', '9092' ] policy-clamp-ac-kserve-ppnt: image: ${CONTAINER_LOCATION}onap/policy-clamp-ac-kserve-ppnt:${POLICY_CLAMP_VERSION} container_name: policy-clamp-ac-kserve-ppnt depends_on: - - simulator + - kafka hostname: policy-clamp-ac-kserve-ppnt ports: - "30297:6969" @@ -350,13 +350,13 @@ services: entrypoint: /opt/app/policy/bin/wait_for_port.sh command: [ '-c', './kserve-participant.sh', - 'message-router', '3904' + 'kafka', '9092' ] policy-clamp-ac-sim-ppnt: image: ${CONTAINER_LOCATION}onap/policy-clamp-ac-sim-ppnt:${POLICY_CLAMP_VERSION} container_name: policy-clamp-ac-sim-ppnt depends_on: - - simulator + - kafka hostname: policy-clamp-ac-sim-ppnt ports: - ${SIM_PARTICIPANT_PORT}:6969 @@ -367,7 +367,7 @@ services: entrypoint: /opt/app/policy/bin/wait_for_port.sh command: [ '-c', './sim-participant.sh', - 'message-router', '3904' + 'kafka', '9092' ] prometheus: image: nexus3.onap.org:10001/prom/prometheus:latest @@ -389,3 +389,27 @@ services: - ./metrics/dashboard.yaml:/etc/grafana/provisioning/dashboards/dashboard.yaml - ./metrics/datasource.yaml:/etc/grafana/provisioning/datasources/datasource.yaml - ./metrics/dashboards:/var/lib/grafana/dashboards + + zookeeper: + image: confluentinc/cp-zookeeper:latest + environment: + ZOOKEEPER_CLIENT_PORT: 2181 + ZOOKEEPER_TICK_TIME: 2000 + ports: + - 2181:2181 + + kafka: + image: confluentinc/cp-kafka:latest + container_name: kafka + depends_on: + - zookeeper + ports: + - 29092:29092 + - 9092:9092 + environment: + KAFKA_BROKER_ID: 1 + KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181 + KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092,PLAINTEXT_HOST://localhost:29092 + KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT + KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT + KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1 \ No newline at end of file diff --git a/csit/resources/scripts/kafka_consumer.py b/csit/resources/scripts/kafka_consumer.py new file mode 100755 index 00000000..80b6167a --- /dev/null +++ b/csit/resources/scripts/kafka_consumer.py @@ -0,0 +1,71 @@ +#!/usr/bin/env python3 +# +# ============LICENSE_START==================================================== +# Copyright (C) 2023 Nordix Foundation. +# ============================================================================= +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# SPDX-License-Identifier: Apache-2.0 +# ============LICENSE_END====================================================== + +# Python utility to fetch kafka topic and look for required messages. +# Accepts the arguments {topic_name} and {list of expected values} and {timeout} to verify the kafka topic. + + +from confluent_kafka import Consumer, KafkaException +import sys +import time + +def consume_kafka_topic(topic, expected_values, timeout): + config = { + 'bootstrap.servers': 'localhost:29092', + 'group.id': 'testgrp', + 'auto.offset.reset': 'earliest' + } + consumer = Consumer(config) + consumer.subscribe([topic]) + try: + start_time = time.time() + while time.time() - start_time < timeout: + msg = consumer.poll(1.0) + if msg is None: + continue + if msg.error(): + if msg.error().code() == KafkaException._PARTITION_EOF: + sys.stderr.write(f"Reached end of topic {msg.topic()} / partition {msg.partition()}\n") + print('ERROR') + sys.exit(404) + else: + # Error + raise KafkaException(msg.error()) + else: + # Message received + message = msg.value().decode('utf-8') + if verify_msg(expected_values, message): + print(message) + sys.exit(200) + finally: + consumer.close() + +def verify_msg(expected_values, message): + for item in expected_values: + if item not in message: + return False + return True + + +if __name__ == '__main__': + topic_name = sys.argv[1] + timeout = sys.argv[2] # timeout in seconds for verifying the kafka topic + expected_values = sys.argv[3:] + consume_kafka_topic(topic_name, expected_values, timeout) diff --git a/csit/resources/scripts/kafka_producer.py b/csit/resources/scripts/kafka_producer.py new file mode 100755 index 00000000..ff129872 --- /dev/null +++ b/csit/resources/scripts/kafka_producer.py @@ -0,0 +1,41 @@ +#!/usr/bin/env python3 +# +# ============LICENSE_START==================================================== +# Copyright (C) 2023 Nordix Foundation. +# ============================================================================= +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# SPDX-License-Identifier: Apache-2.0 +# ============LICENSE_END====================================================== + +# Python utility to produce a message on a kafka topic +# Accepts the arguments {topic_name} and {message} + +from confluent_kafka import Producer +import sys + +def post_to_kafka(topic, message): + conf = {'bootstrap.servers': 'localhost:29092'} + + producer = Producer(conf) + try: + producer.produce(topic, value=message.encode('utf-8')) + producer.flush() + print('Message posted to Kafka topic: {}'.format(topic)) + except Exception as e: + print('Failed to post message: {}'.format(str(e))) + finally: + producer.flush() + +if __name__ == '__main__': + post_to_kafka(sys.argv[1], sys.argv[2]) diff --git a/csit/resources/scripts/make_topics.py b/csit/resources/scripts/make_topics.py new file mode 100755 index 00000000..daee4341 --- /dev/null +++ b/csit/resources/scripts/make_topics.py @@ -0,0 +1,41 @@ +#!/usr/bin/env python3 +# +# ============LICENSE_START==================================================== +# Copyright (C) 2023 Nordix Foundation. +# ============================================================================= +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# SPDX-License-Identifier: Apache-2.0 +# ============LICENSE_END====================================================== + +# Python utility to create a new kafka topic +# Accepts the argument {topic_name} + +from confluent_kafka.admin import AdminClient, NewTopic +import sys + +def create_topic(bootstrap_servers, topic_name, num_partitions=2, replication_factor=2): + admin_client = AdminClient({'bootstrap.servers': bootstrap_servers}) + + # Define the topic configuration + topic = NewTopic(topic_name, num_partitions=num_partitions, replication_factor=replication_factor) + + # Create the topic + admin_client.create_topics([topic]) + + +if __name__ == '__main__': + topic_name = sys.argv[1] + bootstrap_servers = 'localhost:29092' + + create_topic(bootstrap_servers, topic_name) diff --git a/csit/resources/scripts/prepare-robot-env.sh b/csit/resources/scripts/prepare-robot-env.sh index 2281d235..2b773802 100755 --- a/csit/resources/scripts/prepare-robot-env.sh +++ b/csit/resources/scripts/prepare-robot-env.sh @@ -44,6 +44,10 @@ mkdir -p "${ROBOT_VENV}"/src/onap rm -rf "${ROBOT_VENV}"/src/onap/testsuite python3 -m pip install -qq --upgrade --extra-index-url="https://nexus3.onap.org/repository/PyPi.staging/simple" 'robotframework-onap==0.6.0.*' --pre +# install confluent-kafka +echo "Installing python confluent-kafka library" +python3 -m pip install -qq confluent-kafka + echo "Uninstall docker-py and reinstall docker." python3 -m pip uninstall -y -qq docker python3 -m pip install -U -qq docker diff --git a/csit/resources/tests/common-library.robot b/csit/resources/tests/common-library.robot index 8c279176..f5db8e0e 100644 --- a/csit/resources/tests/common-library.robot +++ b/csit/resources/tests/common-library.robot @@ -147,3 +147,13 @@ CheckTopic Status Should Be OK ${resp} Should Contain ${resp.text} ${expected_status} [Return] ${resp.text} + +CheckKafkaTopic + [Arguments] ${topic} ${expected_status} + ${resp}= Run Process ${CURDIR}/kafka_consumer.py ${topic} 30 ${expected_status} + Log to console Received response from kafka ${resp.stdout} + Should Contain ${resp.text} ${expected_status} + +GetKafkaTopic + [Arguments] ${topic} + ${resp}= Run Process ${CURDIR}/make_topics.py ${topic} \ No newline at end of file