/*-
* ============LICENSE_START=======================================================
- * Copyright (C) 2025 Nordix Foundation.
+ * Copyright (C) 2025 OpenInfra Foundation Europe. All rights reserved.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
private void runTopicHealthCheck() {
var fetchTimeout = getFetchTimeout();
+ var retries = 10; // TODO - make this configurable with max number of retries or timeout
while (!topicHealthCheck.healthCheck(getTopics())) {
LOGGER.debug(" Broker not up yet!");
try {
Thread.sleep(fetchTimeout);
+ retries--;
+ if (retries == 0) {
+ LOGGER.error("Broker not up after {} retries", retries);
+ break;
+ }
} catch (InterruptedException e) {
LOGGER.error(e.getMessage());
Thread.currentThread().interrupt();
var opTopic = parameters.getIntermediaryParameters().getTopics().getOperationTopic();
var syncTopic = parameters.getIntermediaryParameters().getTopics().getSyncTopic();
return Boolean.TRUE.equals(parameters.getIntermediaryParameters().getTopicValidation())
- ? List.of(opTopic, syncTopic) : List.<String>of();
+ ? List.of(opTopic, syncTopic) : List.of();
}
private int getFetchTimeout() {
package org.onap.policy.clamp.acm.runtime.config.health;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.onap.policy.clamp.acm.runtime.main.parameters.AcRuntimeParameterGroup;
import org.onap.policy.common.message.bus.event.TopicEndpoint;
import org.onap.policy.common.message.bus.event.TopicEndpointManager;
+import org.onap.policy.common.message.bus.healthcheck.TopicHealthCheckFactory;
+import org.onap.policy.common.parameters.topic.TopicParameters;
+import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.actuate.health.Health;
import org.springframework.boot.actuate.health.HealthIndicator;
import org.springframework.stereotype.Component;
@Component
public class KafkaHealthIndicator implements HealthIndicator {
+ private static final String TOPICS = "topics";
+ private final AcRuntimeParameterGroup parameterGroup;
+
private final TopicEndpoint topicEndpoint = TopicEndpointManager.getManager();
+ @Autowired
+ public KafkaHealthIndicator(AcRuntimeParameterGroup parameterGroup) {
+ this.parameterGroup = parameterGroup;
+ }
+
@Override
public Health getHealth(boolean includeDetails) {
- return HealthIndicator.super.getHealth(includeDetails);
+ return withDetails(includeDetails);
}
@Override
public Health health() {
- var healthBuilder = new Health.Builder();
+ return withDetails(false);
+ }
+
+ private Health withDetails(boolean includeDetails) {
+ final var healthBuilder = new Health.Builder();
+ healthBuilder.up();
if (topicEndpoint.isAlive()) {
- healthBuilder.withDetail("topicEndpoint", topicEndpoint);
- healthBuilder.up();
+ checkTopicsStatus(healthBuilder, includeDetails);
} else {
healthBuilder.down();
}
return healthBuilder.build();
}
+
+ /**
+ * Evaluates the health status of Kafka topics (both sinks and sources) and updates the provided
+ * {@link Health.Builder} with the results. If the `includeDetails` flag is set to true, detailed
+ * information about topic sinks, topic sources, and the topic endpoint is added to the health details.
+ *
+ * @param healthBuilder the builder object to which the health status and details will be added
+ * @param includeDetails flag indicating whether detailed topic information should be included
+ */
+ private void checkTopicsStatus(final Health.Builder healthBuilder, boolean includeDetails) {
+ AtomicBoolean outOfService = new AtomicBoolean(false);
+ var topics = new ArrayList<TopicParameters>();
+ topics.addAll(parameterGroup.getTopicParameterGroup().getTopicSinks());
+ topics.addAll(parameterGroup.getTopicParameterGroup().getTopicSources());
+
+ var topicDetails = new HashMap<String, Object>();
+
+ if (!topics.isEmpty()) {
+ var topicParams = topics.get(0);
+ var topicsHealthCheck = new TopicHealthCheckFactory().getTopicHealthCheck(topicParams);
+
+ var topicNames = new ArrayList<String>();
+ topics.forEach(t -> topicNames.add(t.getTopic()));
+
+ if (topicsHealthCheck.healthCheck(topicNames)) {
+ topicDetails.put(TOPICS, Health.up().withDetail(TOPICS, topics).build());
+ } else {
+ topicDetails.put(TOPICS, Health.outOfService().withDetail(TOPICS, topics).build());
+ outOfService.set(true);
+ }
+ }
+
+ if (includeDetails) {
+ healthBuilder.withDetail("topicEndpoint", topicEndpoint);
+ healthBuilder.withDetails(topicDetails);
+ }
+
+ if (outOfService.get()) {
+ healthBuilder.outOfService();
+ }
+ }
}
TopicEndpoint topicEndpoint = TopicEndpointManager.getManager();
topicEndpoint.stop();
-
webClient.get().uri("/health").accept(APPLICATION_JSON)
.exchange().expectStatus().is5xxServerError()
.expectBody().jsonPath("$.status.code").isEqualTo("DOWN");
-
topicEndpoint.start();
}
+ @Test
+ void testHealthIndicator() {
+ TopicEndpoint topicEndpoint = TopicEndpointManager.getManager();
+ topicEndpoint.getNoopTopicSource("policy-acruntime-participant").stop();
+ webClient.get().uri("/health").accept(APPLICATION_JSON)
+ .exchange().expectStatus().is5xxServerError()
+ .expectBody().jsonPath("$.status.code").isEqualTo("OUT_OF_SERVICE");
+ topicEndpoint.getNoopTopicSource("policy-acruntime-participant").start();
+
+ topicEndpoint.getNoopTopicSink("acm-ppnt-sync").stop();
+ webClient.get().uri("/health").accept(APPLICATION_JSON)
+ .exchange().expectStatus().is5xxServerError()
+ .expectBody().jsonPath("$.status.code").isEqualTo("OUT_OF_SERVICE");
+ topicEndpoint.getNoopTopicSink("acm-ppnt-sync").start();
+ }
+
@Test
void testGetMetrics() {
webClient.get().uri("/metrics").accept(APPLICATION_JSON)