From bf4217b4386b3c91a9e082d424012aecbf8f2eab Mon Sep 17 00:00:00 2001 From: "adheli.tavares" Date: Thu, 14 Aug 2025 13:42:39 +0100 Subject: [PATCH] Check each topic for Kafka health - as TopicManager does not shut down if kafka disconnects from application, there is a need to check each topic individually. Issue-ID: POLICY-5445 Change-Id: Ib9d194ce561debb8d9675ff0f971ab8b6eac7700 Signed-off-by: adheli.tavares --- .../intermediary/handler/BrokerStarter.java | 10 +++- .../config/health/KafkaHealthIndicator.java | 68 ++++++++++++++++++++-- .../runtime/main/rest/ActuatorControllerTest.java | 18 +++++- 3 files changed, 88 insertions(+), 8 deletions(-) diff --git a/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/handler/BrokerStarter.java b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/handler/BrokerStarter.java index 1c862e925..808460ee1 100644 --- a/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/handler/BrokerStarter.java +++ b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/handler/BrokerStarter.java @@ -1,6 +1,6 @@ /*- * ============LICENSE_START======================================================= - * Copyright (C) 2025 Nordix Foundation. + * Copyright (C) 2025 OpenInfra Foundation Europe. All rights reserved. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -88,10 +88,16 @@ public class BrokerStarter { private void runTopicHealthCheck() { var fetchTimeout = getFetchTimeout(); + var retries = 10; // TODO - make this configurable with max number of retries or timeout while (!topicHealthCheck.healthCheck(getTopics())) { LOGGER.debug(" Broker not up yet!"); try { Thread.sleep(fetchTimeout); + retries--; + if (retries == 0) { + LOGGER.error("Broker not up after {} retries", retries); + break; + } } catch (InterruptedException e) { LOGGER.error(e.getMessage()); Thread.currentThread().interrupt(); @@ -103,7 +109,7 @@ public class BrokerStarter { var opTopic = parameters.getIntermediaryParameters().getTopics().getOperationTopic(); var syncTopic = parameters.getIntermediaryParameters().getTopics().getSyncTopic(); return Boolean.TRUE.equals(parameters.getIntermediaryParameters().getTopicValidation()) - ? List.of(opTopic, syncTopic) : List.of(); + ? List.of(opTopic, syncTopic) : List.of(); } private int getFetchTimeout() { diff --git a/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/config/health/KafkaHealthIndicator.java b/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/config/health/KafkaHealthIndicator.java index baeac5671..ef5f986a9 100644 --- a/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/config/health/KafkaHealthIndicator.java +++ b/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/config/health/KafkaHealthIndicator.java @@ -19,8 +19,15 @@ package org.onap.policy.clamp.acm.runtime.config.health; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.concurrent.atomic.AtomicBoolean; +import org.onap.policy.clamp.acm.runtime.main.parameters.AcRuntimeParameterGroup; import org.onap.policy.common.message.bus.event.TopicEndpoint; import org.onap.policy.common.message.bus.event.TopicEndpointManager; +import org.onap.policy.common.message.bus.healthcheck.TopicHealthCheckFactory; +import org.onap.policy.common.parameters.topic.TopicParameters; +import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.actuate.health.Health; import org.springframework.boot.actuate.health.HealthIndicator; import org.springframework.stereotype.Component; @@ -28,22 +35,75 @@ import org.springframework.stereotype.Component; @Component public class KafkaHealthIndicator implements HealthIndicator { + private static final String TOPICS = "topics"; + private final AcRuntimeParameterGroup parameterGroup; + private final TopicEndpoint topicEndpoint = TopicEndpointManager.getManager(); + @Autowired + public KafkaHealthIndicator(AcRuntimeParameterGroup parameterGroup) { + this.parameterGroup = parameterGroup; + } + @Override public Health getHealth(boolean includeDetails) { - return HealthIndicator.super.getHealth(includeDetails); + return withDetails(includeDetails); } @Override public Health health() { - var healthBuilder = new Health.Builder(); + return withDetails(false); + } + + private Health withDetails(boolean includeDetails) { + final var healthBuilder = new Health.Builder(); + healthBuilder.up(); if (topicEndpoint.isAlive()) { - healthBuilder.withDetail("topicEndpoint", topicEndpoint); - healthBuilder.up(); + checkTopicsStatus(healthBuilder, includeDetails); } else { healthBuilder.down(); } return healthBuilder.build(); } + + /** + * Evaluates the health status of Kafka topics (both sinks and sources) and updates the provided + * {@link Health.Builder} with the results. If the `includeDetails` flag is set to true, detailed + * information about topic sinks, topic sources, and the topic endpoint is added to the health details. + * + * @param healthBuilder the builder object to which the health status and details will be added + * @param includeDetails flag indicating whether detailed topic information should be included + */ + private void checkTopicsStatus(final Health.Builder healthBuilder, boolean includeDetails) { + AtomicBoolean outOfService = new AtomicBoolean(false); + var topics = new ArrayList(); + topics.addAll(parameterGroup.getTopicParameterGroup().getTopicSinks()); + topics.addAll(parameterGroup.getTopicParameterGroup().getTopicSources()); + + var topicDetails = new HashMap(); + + if (!topics.isEmpty()) { + var topicParams = topics.get(0); + var topicsHealthCheck = new TopicHealthCheckFactory().getTopicHealthCheck(topicParams); + + var topicNames = new ArrayList(); + topics.forEach(t -> topicNames.add(t.getTopic())); + + if (topicsHealthCheck.healthCheck(topicNames)) { + topicDetails.put(TOPICS, Health.up().withDetail(TOPICS, topics).build()); + } else { + topicDetails.put(TOPICS, Health.outOfService().withDetail(TOPICS, topics).build()); + outOfService.set(true); + } + } + + if (includeDetails) { + healthBuilder.withDetail("topicEndpoint", topicEndpoint); + healthBuilder.withDetails(topicDetails); + } + + if (outOfService.get()) { + healthBuilder.outOfService(); + } + } } diff --git a/runtime-acm/src/test/java/org/onap/policy/clamp/acm/runtime/main/rest/ActuatorControllerTest.java b/runtime-acm/src/test/java/org/onap/policy/clamp/acm/runtime/main/rest/ActuatorControllerTest.java index ce3d8dc80..a3448c5e3 100644 --- a/runtime-acm/src/test/java/org/onap/policy/clamp/acm/runtime/main/rest/ActuatorControllerTest.java +++ b/runtime-acm/src/test/java/org/onap/policy/clamp/acm/runtime/main/rest/ActuatorControllerTest.java @@ -65,14 +65,28 @@ class ActuatorControllerTest { TopicEndpoint topicEndpoint = TopicEndpointManager.getManager(); topicEndpoint.stop(); - webClient.get().uri("/health").accept(APPLICATION_JSON) .exchange().expectStatus().is5xxServerError() .expectBody().jsonPath("$.status.code").isEqualTo("DOWN"); - topicEndpoint.start(); } + @Test + void testHealthIndicator() { + TopicEndpoint topicEndpoint = TopicEndpointManager.getManager(); + topicEndpoint.getNoopTopicSource("policy-acruntime-participant").stop(); + webClient.get().uri("/health").accept(APPLICATION_JSON) + .exchange().expectStatus().is5xxServerError() + .expectBody().jsonPath("$.status.code").isEqualTo("OUT_OF_SERVICE"); + topicEndpoint.getNoopTopicSource("policy-acruntime-participant").start(); + + topicEndpoint.getNoopTopicSink("acm-ppnt-sync").stop(); + webClient.get().uri("/health").accept(APPLICATION_JSON) + .exchange().expectStatus().is5xxServerError() + .expectBody().jsonPath("$.status.code").isEqualTo("OUT_OF_SERVICE"); + topicEndpoint.getNoopTopicSink("acm-ppnt-sync").start(); + } + @Test void testGetMetrics() { webClient.get().uri("/metrics").accept(APPLICATION_JSON) -- 2.16.6