Add Kafka Health Check in common 86/139886/4
authorFrancescoFioraEst <francesco.fiora@est.tech>
Fri, 8 Nov 2024 12:30:58 +0000 (12:30 +0000)
committerFrancesco Fiora <francesco.fiora@est.tech>
Mon, 13 Jan 2025 11:26:14 +0000 (11:26 +0000)
Issue-ID: POLICY-5146
Change-Id: I2dac6ec81e84cc7e95838cf2ea6aad339d0b3a85
Signed-off-by: FrancescoFioraEst <francesco.fiora@est.tech>
message-bus/src/main/java/org/onap/policy/common/message/bus/healthcheck/TopicHealthCheck.java [new file with mode: 0644]
message-bus/src/main/java/org/onap/policy/common/message/bus/healthcheck/TopicHealthCheckFactory.java [new file with mode: 0644]
message-bus/src/main/java/org/onap/policy/common/message/bus/healthcheck/kafka/KafkaHealthCheck.java [new file with mode: 0644]
message-bus/src/main/java/org/onap/policy/common/message/bus/healthcheck/noop/NoopHealthCheck.java [new file with mode: 0644]
message-bus/src/test/java/org/onap/policy/common/message/bus/healthcheck/TopicHealthCheckFactoryTest.java [new file with mode: 0644]
message-bus/src/test/java/org/onap/policy/common/message/bus/healthcheck/kafka/KafkaHealthCheckTest.java [new file with mode: 0644]
message-bus/src/test/java/org/onap/policy/common/message/bus/healthcheck/noop/NoopHealthCheckTest.java [new file with mode: 0644]

diff --git a/message-bus/src/main/java/org/onap/policy/common/message/bus/healthcheck/TopicHealthCheck.java b/message-bus/src/main/java/org/onap/policy/common/message/bus/healthcheck/TopicHealthCheck.java
new file mode 100644 (file)
index 0000000..f371eda
--- /dev/null
@@ -0,0 +1,25 @@
+/*
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2025 Nordix Foundation.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.common.message.bus.healthcheck;
+
+import java.util.List;
+
+public interface TopicHealthCheck {
+    boolean healthCheck(List<String> topics);
+}
diff --git a/message-bus/src/main/java/org/onap/policy/common/message/bus/healthcheck/TopicHealthCheckFactory.java b/message-bus/src/main/java/org/onap/policy/common/message/bus/healthcheck/TopicHealthCheckFactory.java
new file mode 100644 (file)
index 0000000..d779050
--- /dev/null
@@ -0,0 +1,41 @@
+/*
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2025 Nordix Foundation.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.common.message.bus.healthcheck;
+
+import org.onap.policy.common.message.bus.event.Topic;
+import org.onap.policy.common.message.bus.healthcheck.kafka.KafkaHealthCheck;
+import org.onap.policy.common.message.bus.healthcheck.noop.NoopHealthCheck;
+import org.onap.policy.common.parameters.topic.TopicParameters;
+
+public class TopicHealthCheckFactory {
+
+    /**
+     * Get Topic HealthCheck.
+     *
+     * @param param TopicParameters
+     * @return TopicHealthCheck
+     */
+    public TopicHealthCheck getTopicHealthCheck(TopicParameters param) {
+        return switch (Topic.CommInfrastructure.valueOf(param.getTopicCommInfrastructure().toUpperCase())) {
+            case KAFKA -> new KafkaHealthCheck(param);
+            case NOOP ->  new NoopHealthCheck();
+            default -> null;
+        };
+    }
+}
diff --git a/message-bus/src/main/java/org/onap/policy/common/message/bus/healthcheck/kafka/KafkaHealthCheck.java b/message-bus/src/main/java/org/onap/policy/common/message/bus/healthcheck/kafka/KafkaHealthCheck.java
new file mode 100644 (file)
index 0000000..ef8a0f7
--- /dev/null
@@ -0,0 +1,109 @@
+/*
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2025 Nordix Foundation.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.common.message.bus.healthcheck.kafka;
+
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.ExecutionException;
+import java.util.stream.Collectors;
+import org.apache.kafka.clients.admin.AdminClient;
+import org.apache.kafka.clients.admin.AdminClientConfig;
+import org.apache.kafka.common.KafkaException;
+import org.onap.policy.common.message.bus.healthcheck.TopicHealthCheck;
+import org.onap.policy.common.parameters.topic.TopicParameters;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class KafkaHealthCheck implements TopicHealthCheck {
+
+    private static final Logger logger = LoggerFactory.getLogger(KafkaHealthCheck.class);
+    private final TopicParameters parameters;
+
+    public KafkaHealthCheck(TopicParameters parameters) {
+        this.parameters = parameters;
+    }
+
+    /**
+     * Check that Kafka is OnLine and topics are available.
+     *
+     * @return true if Kafka is OnLine
+     */
+    public boolean healthCheck(List<String> topics) {
+        if (parameters.getServers() == null || parameters.getServers().isEmpty()) {
+            logger.warn("Kafka Address not defined!");
+            return true;
+        }
+        try (var client = createAdminClient()) {
+            if (!checkConnection(client)) {
+                logger.warn("Kafka not UP yet!");
+                return false;
+            }
+            if (topics.isEmpty()) {
+                logger.warn("Kafka is UP");
+                return true;
+            }
+
+            return checkTopics(client, topics);
+        } catch (KafkaException | ExecutionException e) {
+            logger.error(e.getMessage());
+            return false;
+        } catch (InterruptedException e) {
+            logger.error(e.getMessage());
+            Thread.currentThread().interrupt();
+            return false;
+        }
+    }
+
+    private boolean checkConnection(AdminClient client) throws ExecutionException, InterruptedException {
+        var nodes = client.describeCluster().nodes().get();
+        if (nodes == null || nodes.isEmpty()) {
+            return false;
+        }
+        nodes.forEach(node -> logger.debug("nodeId {}", node.id()));
+        return true;
+    }
+
+    private boolean checkTopics(AdminClient client, List<String> topics)
+            throws ExecutionException, InterruptedException {
+        var listTopics = client.listTopics().names().get();
+        if (listTopics == null || listTopics.isEmpty()) {
+            logger.warn("Kafka topics not available!");
+            return false;
+        }
+        var setTopics = listTopics.stream().map(String::toLowerCase).collect(Collectors.toSet());
+        for (var topic : topics) {
+            if (!setTopics.contains(topic.toLowerCase())) {
+                logger.warn("Kafka topic {} not available!", topic);
+                return false;
+            }
+        }
+        logger.info("Kafka is UP and topics available!");
+        return true;
+    }
+
+    protected AdminClient createAdminClient() {
+        var kafkaProps = new Properties();
+        kafkaProps.setProperty(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, parameters.getServers().get(0));
+
+        if (parameters.isAdditionalPropsValid()) {
+            kafkaProps.putAll(parameters.getAdditionalProps());
+        }
+        return AdminClient.create(kafkaProps);
+    }
+}
diff --git a/message-bus/src/main/java/org/onap/policy/common/message/bus/healthcheck/noop/NoopHealthCheck.java b/message-bus/src/main/java/org/onap/policy/common/message/bus/healthcheck/noop/NoopHealthCheck.java
new file mode 100644 (file)
index 0000000..684f7f4
--- /dev/null
@@ -0,0 +1,30 @@
+/*
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2025 Nordix Foundation.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.common.message.bus.healthcheck.noop;
+
+import java.util.List;
+import org.onap.policy.common.message.bus.healthcheck.TopicHealthCheck;
+
+public class NoopHealthCheck implements TopicHealthCheck {
+
+    @Override
+    public boolean healthCheck(List<String> topics) {
+        return true;
+    }
+}
diff --git a/message-bus/src/test/java/org/onap/policy/common/message/bus/healthcheck/TopicHealthCheckFactoryTest.java b/message-bus/src/test/java/org/onap/policy/common/message/bus/healthcheck/TopicHealthCheckFactoryTest.java
new file mode 100644 (file)
index 0000000..b71731f
--- /dev/null
@@ -0,0 +1,44 @@
+/*
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2025 Nordix Foundation.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.common.message.bus.healthcheck;
+
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
+
+import org.junit.jupiter.api.Test;
+import org.onap.policy.common.message.bus.event.Topic;
+import org.onap.policy.common.parameters.topic.TopicParameters;
+
+class TopicHealthCheckFactoryTest {
+
+    @Test
+    void testGetTopicHealthCheck() {
+        var topicHealthCheckFactory = new TopicHealthCheckFactory();
+        var param = new TopicParameters();
+        param.setTopicCommInfrastructure(Topic.CommInfrastructure.NOOP.name());
+        var topicHealthCheck = topicHealthCheckFactory.getTopicHealthCheck(param);
+        assertNotNull(topicHealthCheck);
+        param.setTopicCommInfrastructure(Topic.CommInfrastructure.KAFKA.name());
+        topicHealthCheck = topicHealthCheckFactory.getTopicHealthCheck(param);
+        assertNotNull(topicHealthCheck);
+        param.setTopicCommInfrastructure(Topic.CommInfrastructure.REST.name());
+        topicHealthCheck = topicHealthCheckFactory.getTopicHealthCheck(param);
+        assertNull(topicHealthCheck);
+    }
+}
diff --git a/message-bus/src/test/java/org/onap/policy/common/message/bus/healthcheck/kafka/KafkaHealthCheckTest.java b/message-bus/src/test/java/org/onap/policy/common/message/bus/healthcheck/kafka/KafkaHealthCheckTest.java
new file mode 100644 (file)
index 0000000..3b65f73
--- /dev/null
@@ -0,0 +1,122 @@
+/*
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2025 Nordix Foundation.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.common.message.bus.healthcheck.kafka;
+
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import org.apache.kafka.clients.admin.AdminClient;
+import org.apache.kafka.clients.admin.DescribeClusterResult;
+import org.apache.kafka.clients.admin.ListTopicsResult;
+import org.apache.kafka.common.KafkaFuture;
+import org.apache.kafka.common.Node;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+import org.onap.policy.common.message.bus.healthcheck.TopicHealthCheck;
+import org.onap.policy.common.parameters.topic.TopicParameters;
+
+class KafkaHealthCheckTest {
+
+    @Test
+    void testAdminClient() {
+        var param = new TopicParameters();
+        param.setServers(List.of("localhost"));
+        var healthCheck = new KafkaHealthCheck(param);
+        var result = healthCheck.healthCheck(List.of());
+        assertFalse(result);
+
+        param.setAdditionalProps(Map.of("key", "value"));
+        result = healthCheck.healthCheck(List.of());
+        assertFalse(result);
+    }
+
+    @Test
+    void testMockAdminClientWithError() throws ExecutionException, InterruptedException {
+        var param = new TopicParameters();
+        param.setServers(List.of("localhost"));
+        var adminClient = mock(AdminClient.class);
+        KafkaFuture<Collection<Node>> kafkaFuture = mock(KafkaFuture.class);
+        var describeCluster = mock(DescribeClusterResult.class);
+        when(describeCluster.nodes()).thenReturn(kafkaFuture);
+        when(adminClient.describeCluster()).thenReturn(describeCluster);
+        when(kafkaFuture.get()).thenThrow(new InterruptedException());
+        var healthCheck = createKafkaHealthCheck(adminClient, param);
+        var result = healthCheck.healthCheck(List.of());
+        Assertions.assertFalse(result);
+    }
+
+    @Test
+    void testMockAdminClient() {
+        var param = new TopicParameters();
+        var adminClient = mock(AdminClient.class);
+        // no server address
+        var healthCheck = createKafkaHealthCheck(adminClient, param);
+        var result = healthCheck.healthCheck(List.of());
+        Assertions.assertTrue(result);
+
+        param.setServers(List.of());
+        result = healthCheck.healthCheck(List.of());
+        Assertions.assertTrue(result);
+
+        // no node Kafka
+        param.setServers(List.of("localhost"));
+        healthCheck = createKafkaHealthCheck(adminClient, param);
+        var describeCluster = mock(DescribeClusterResult.class);
+        when(describeCluster.nodes()).thenReturn(KafkaFuture.completedFuture(null));
+        when(adminClient.describeCluster()).thenReturn(describeCluster);
+        result = healthCheck.healthCheck(List.of());
+        Assertions.assertFalse(result);
+
+        // Kafka is UP
+        var node = new Node(1, "localhost", 9092);
+        when(describeCluster.nodes()).thenReturn(KafkaFuture.completedFuture(List.of(node)));
+        result = healthCheck.healthCheck(List.of());
+        Assertions.assertTrue(result);
+
+        // Kafka topics not available
+        var listTopics = mock(ListTopicsResult.class);
+        when(adminClient.listTopics()).thenReturn(listTopics);
+        when(listTopics.names()).thenReturn(KafkaFuture.completedFuture(Set.of()));
+        result = healthCheck.healthCheck(List.of("topic"));
+        Assertions.assertFalse(result);
+
+        when(listTopics.names()).thenReturn(KafkaFuture.completedFuture(Set.of("topic")));
+        result = healthCheck.healthCheck(List.of("wrongTopic"));
+        Assertions.assertFalse(result);
+
+        // Kafka topics available
+        result = healthCheck.healthCheck(List.of("topic"));
+        Assertions.assertTrue(result);
+    }
+
+    private TopicHealthCheck createKafkaHealthCheck(AdminClient adminClient, TopicParameters param) {
+        return new KafkaHealthCheck(param) {
+            @Override
+            protected AdminClient createAdminClient() {
+                return adminClient;
+            }
+        };
+    }
+}
diff --git a/message-bus/src/test/java/org/onap/policy/common/message/bus/healthcheck/noop/NoopHealthCheckTest.java b/message-bus/src/test/java/org/onap/policy/common/message/bus/healthcheck/noop/NoopHealthCheckTest.java
new file mode 100644 (file)
index 0000000..2eb3604
--- /dev/null
@@ -0,0 +1,34 @@
+/*
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2025 Nordix Foundation.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.common.message.bus.healthcheck.noop;
+
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.util.List;
+import org.junit.jupiter.api.Test;
+
+class NoopHealthCheckTest {
+
+    @Test
+    void testBuild() {
+        var healthCheck = new NoopHealthCheck();
+        var result = healthCheck.healthCheck(List.of());
+        assertTrue(result);
+    }
+}