Check each topic for Kafka health 33/141833/2
authoradheli.tavares <adheli.tavares@est.tech>
Thu, 14 Aug 2025 12:42:39 +0000 (13:42 +0100)
committeradheli.tavares <adheli.tavares@est.tech>
Thu, 14 Aug 2025 13:08:57 +0000 (14:08 +0100)
- as TopicManager does not shut down if kafka disconnects from
application, there is a need to check each topic individually.

Issue-ID: POLICY-5445
Change-Id: Ib9d194ce561debb8d9675ff0f971ab8b6eac7700
Signed-off-by: adheli.tavares <adheli.tavares@est.tech>
participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/handler/BrokerStarter.java
runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/config/health/KafkaHealthIndicator.java
runtime-acm/src/test/java/org/onap/policy/clamp/acm/runtime/main/rest/ActuatorControllerTest.java

index 1c862e9..808460e 100644 (file)
@@ -1,6 +1,6 @@
 /*-
  * ============LICENSE_START=======================================================
- *  Copyright (C) 2025 Nordix Foundation.
+ *  Copyright (C) 2025 OpenInfra Foundation Europe. All rights reserved.
  * ================================================================================
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -88,10 +88,16 @@ public class BrokerStarter<T> {
 
     private void runTopicHealthCheck() {
         var fetchTimeout = getFetchTimeout();
+        var retries = 10; // TODO - make this configurable with max number of retries or timeout
         while (!topicHealthCheck.healthCheck(getTopics())) {
             LOGGER.debug(" Broker not up yet!");
             try {
                 Thread.sleep(fetchTimeout);
+                retries--;
+                if (retries == 0) {
+                    LOGGER.error("Broker not up after {} retries", retries);
+                    break;
+                }
             } catch (InterruptedException e) {
                 LOGGER.error(e.getMessage());
                 Thread.currentThread().interrupt();
@@ -103,7 +109,7 @@ public class BrokerStarter<T> {
         var opTopic = parameters.getIntermediaryParameters().getTopics().getOperationTopic();
         var syncTopic = parameters.getIntermediaryParameters().getTopics().getSyncTopic();
         return Boolean.TRUE.equals(parameters.getIntermediaryParameters().getTopicValidation())
-                ? List.of(opTopic, syncTopic) : List.<String>of();
+                ? List.of(opTopic, syncTopic) : List.of();
     }
 
     private int getFetchTimeout() {
index baeac56..ef5f986 100644 (file)
 
 package org.onap.policy.clamp.acm.runtime.config.health;
 
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.onap.policy.clamp.acm.runtime.main.parameters.AcRuntimeParameterGroup;
 import org.onap.policy.common.message.bus.event.TopicEndpoint;
 import org.onap.policy.common.message.bus.event.TopicEndpointManager;
+import org.onap.policy.common.message.bus.healthcheck.TopicHealthCheckFactory;
+import org.onap.policy.common.parameters.topic.TopicParameters;
+import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.boot.actuate.health.Health;
 import org.springframework.boot.actuate.health.HealthIndicator;
 import org.springframework.stereotype.Component;
@@ -28,22 +35,75 @@ import org.springframework.stereotype.Component;
 @Component
 public class KafkaHealthIndicator implements HealthIndicator {
 
+    private static final String TOPICS = "topics";
+    private final AcRuntimeParameterGroup parameterGroup;
+
     private final TopicEndpoint topicEndpoint = TopicEndpointManager.getManager();
 
+    @Autowired
+    public KafkaHealthIndicator(AcRuntimeParameterGroup parameterGroup) {
+        this.parameterGroup = parameterGroup;
+    }
+
     @Override
     public Health getHealth(boolean includeDetails) {
-        return HealthIndicator.super.getHealth(includeDetails);
+        return withDetails(includeDetails);
     }
 
     @Override
     public Health health() {
-        var healthBuilder = new Health.Builder();
+        return withDetails(false);
+    }
+
+    private Health withDetails(boolean includeDetails) {
+        final var healthBuilder = new Health.Builder();
+        healthBuilder.up();
         if (topicEndpoint.isAlive()) {
-            healthBuilder.withDetail("topicEndpoint", topicEndpoint);
-            healthBuilder.up();
+            checkTopicsStatus(healthBuilder, includeDetails);
         } else {
             healthBuilder.down();
         }
         return healthBuilder.build();
     }
+
+    /**
+     * Evaluates the health status of Kafka topics (both sinks and sources) and updates the provided
+     * {@link Health.Builder} with the results. If the `includeDetails` flag is set to true, detailed
+     * information about topic sinks, topic sources, and the topic endpoint is added to the health details.
+     *
+     * @param healthBuilder the builder object to which the health status and details will be added
+     * @param includeDetails flag indicating whether detailed topic information should be included
+     */
+    private void checkTopicsStatus(final Health.Builder healthBuilder, boolean includeDetails) {
+        AtomicBoolean outOfService = new AtomicBoolean(false);
+        var topics = new ArrayList<TopicParameters>();
+        topics.addAll(parameterGroup.getTopicParameterGroup().getTopicSinks());
+        topics.addAll(parameterGroup.getTopicParameterGroup().getTopicSources());
+
+        var topicDetails = new HashMap<String, Object>();
+
+        if (!topics.isEmpty()) {
+            var topicParams = topics.get(0);
+            var topicsHealthCheck = new TopicHealthCheckFactory().getTopicHealthCheck(topicParams);
+
+            var topicNames = new ArrayList<String>();
+            topics.forEach(t -> topicNames.add(t.getTopic()));
+
+            if (topicsHealthCheck.healthCheck(topicNames)) {
+                topicDetails.put(TOPICS, Health.up().withDetail(TOPICS, topics).build());
+            } else {
+                topicDetails.put(TOPICS, Health.outOfService().withDetail(TOPICS, topics).build());
+                outOfService.set(true);
+            }
+        }
+
+        if (includeDetails) {
+            healthBuilder.withDetail("topicEndpoint", topicEndpoint);
+            healthBuilder.withDetails(topicDetails);
+        }
+
+        if (outOfService.get()) {
+            healthBuilder.outOfService();
+        }
+    }
 }
index ce3d8dc..a3448c5 100644 (file)
@@ -65,14 +65,28 @@ class ActuatorControllerTest {
 
         TopicEndpoint topicEndpoint = TopicEndpointManager.getManager();
         topicEndpoint.stop();
-
         webClient.get().uri("/health").accept(APPLICATION_JSON)
             .exchange().expectStatus().is5xxServerError()
             .expectBody().jsonPath("$.status.code").isEqualTo("DOWN");
-
         topicEndpoint.start();
     }
 
+    @Test
+    void testHealthIndicator() {
+        TopicEndpoint topicEndpoint = TopicEndpointManager.getManager();
+        topicEndpoint.getNoopTopicSource("policy-acruntime-participant").stop();
+        webClient.get().uri("/health").accept(APPLICATION_JSON)
+            .exchange().expectStatus().is5xxServerError()
+            .expectBody().jsonPath("$.status.code").isEqualTo("OUT_OF_SERVICE");
+        topicEndpoint.getNoopTopicSource("policy-acruntime-participant").start();
+
+        topicEndpoint.getNoopTopicSink("acm-ppnt-sync").stop();
+        webClient.get().uri("/health").accept(APPLICATION_JSON)
+            .exchange().expectStatus().is5xxServerError()
+            .expectBody().jsonPath("$.status.code").isEqualTo("OUT_OF_SERVICE");
+        topicEndpoint.getNoopTopicSink("acm-ppnt-sync").start();
+    }
+
     @Test
     void testGetMetrics() {
         webClient.get().uri("/metrics").accept(APPLICATION_JSON)