Add Kafka Health Check in ACM 99/139899/1
authorFrancescoFioraEst <francesco.fiora@est.tech>
Fri, 8 Nov 2024 13:36:15 +0000 (13:36 +0000)
committerFrancescoFioraEst <francesco.fiora@est.tech>
Mon, 13 Jan 2025 16:43:25 +0000 (16:43 +0000)
Issue-ID: POLICY-5203
Change-Id: Id1705ac74d53cd5f2f8c3d81aae2366baf774fcf
Signed-off-by: FrancescoFioraEst <francesco.fiora@est.tech>
participant/participant-impl/participant-impl-simulator/src/main/resources/config/application.yaml
participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/handler/BrokerStarter.java [new file with mode: 0644]
participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/handler/IntermediaryActivator.java
participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/parameters/ParticipantIntermediaryParameters.java
participant/participant-intermediary/src/test/java/org/onap/policy/clamp/acm/participant/intermediary/handler/BrokerStarterTest.java [new file with mode: 0644]
participant/participant-intermediary/src/test/java/org/onap/policy/clamp/acm/participant/intermediary/handler/IntermediaryActivatorTest.java

index 3f4d794..d87219b 100644 (file)
@@ -20,6 +20,12 @@ participant:
     reportingTimeIntervalMs: 120000
     description: Participant Description
     participantId: 101c62b3-8918-41b9-a747-d21eb79c6c90
+    topicValidation: true
+    clampAdminTopics:
+      servers:
+        - ${topicServer:kafka:9092}
+      topicCommInfrastructure: NOOP
+      fetchTimeout: 15000
     clampAutomationCompositionTopics:
       topicSources:
         - topic: ${participant.intermediaryParameters.topics.operationTopic}
diff --git a/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/handler/BrokerStarter.java b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/handler/BrokerStarter.java
new file mode 100644 (file)
index 0000000..1c862e9
--- /dev/null
@@ -0,0 +1,140 @@
+/*-
+ * ============LICENSE_START=======================================================
+ *  Copyright (C) 2025 Nordix Foundation.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.clamp.acm.participant.intermediary.handler;
+
+import java.util.List;
+import java.util.Timer;
+import java.util.TimerTask;
+import org.onap.policy.clamp.acm.participant.intermediary.parameters.ParticipantParameters;
+import org.onap.policy.common.message.bus.event.Topic;
+import org.onap.policy.common.message.bus.healthcheck.TopicHealthCheck;
+import org.onap.policy.common.message.bus.healthcheck.TopicHealthCheckFactory;
+import org.onap.policy.common.parameters.topic.TopicParameters;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.context.event.ContextClosedEvent;
+import org.springframework.context.event.ContextRefreshedEvent;
+import org.springframework.context.event.EventListener;
+import org.springframework.stereotype.Component;
+
+@Component
+public class BrokerStarter<T> {
+    private static final Logger LOGGER = LoggerFactory.getLogger(BrokerStarter.class);
+    private final IntermediaryActivator activator;
+    private final ParticipantHandler participantHandler;
+    private final TopicHealthCheck topicHealthCheck;
+
+    private final ParticipantParameters parameters;
+    private final List<Publisher> publishers;
+    private final List<Listener<T>> listeners;
+
+    /**
+     * Constructor.
+     *
+     * @param parameters ParticipantParameters
+     * @param activator IntermediaryActivator
+     * @param participantHandler participantHandler
+     */
+    public BrokerStarter(ParticipantParameters parameters,
+            List<Publisher> publishers, List<Listener<T>> listeners, IntermediaryActivator activator,
+            ParticipantHandler participantHandler) {
+        this.parameters = parameters;
+        this.listeners = listeners;
+        this.publishers = publishers;
+        this.activator = activator;
+        this.participantHandler = participantHandler;
+        var topic = parameters.getIntermediaryParameters().getClampAdminTopics();
+        if (topic == null) {
+            topic = new TopicParameters();
+            topic.setTopicCommInfrastructure(Topic.CommInfrastructure.NOOP.name());
+        }
+        this.topicHealthCheck = createTopicHealthCheck(topic);
+    }
+
+    protected TopicHealthCheck createTopicHealthCheck(TopicParameters topic) {
+        return new TopicHealthCheckFactory().getTopicHealthCheck(topic);
+    }
+
+    /**
+     * Handle ContextRefreshEvent.
+     *
+     * @param ctxRefreshedEvent ContextRefreshedEvent
+     */
+    @EventListener
+    public void handleContextRefreshEvent(ContextRefreshedEvent ctxRefreshedEvent) {
+        if (!activator.isAlive()) {
+            runTopicHealthCheck();
+            start();
+        }
+    }
+
+    private void runTopicHealthCheck() {
+        var fetchTimeout = getFetchTimeout();
+        while (!topicHealthCheck.healthCheck(getTopics())) {
+            LOGGER.debug(" Broker not up yet!");
+            try {
+                Thread.sleep(fetchTimeout);
+            } catch (InterruptedException e) {
+                LOGGER.error(e.getMessage());
+                Thread.currentThread().interrupt();
+            }
+        }
+    }
+
+    private List<String> getTopics() {
+        var opTopic = parameters.getIntermediaryParameters().getTopics().getOperationTopic();
+        var syncTopic = parameters.getIntermediaryParameters().getTopics().getSyncTopic();
+        return Boolean.TRUE.equals(parameters.getIntermediaryParameters().getTopicValidation())
+                ? List.of(opTopic, syncTopic) : List.<String>of();
+    }
+
+    private int getFetchTimeout() {
+        int fetchTimeout = parameters.getIntermediaryParameters().getClampAdminTopics() == null
+                ? 0 : parameters.getIntermediaryParameters().getClampAdminTopics().getFetchTimeout();
+        return Math.max(fetchTimeout, 5000);
+    }
+
+    private void start() {
+        activator.config(parameters, publishers, listeners);
+        activator.start();
+        var task = new TimerTask() {
+            @Override
+            public void run() {
+                new Thread(participantHandler::sendParticipantRegister).start();
+            }
+        };
+        new Timer().schedule(task, 5000);
+    }
+
+
+    /**
+     * Handle ContextClosedEvent.
+     *
+     * @param ctxClosedEvent ContextClosedEvent
+     */
+    @EventListener
+    public void handleContextClosedEvent(ContextClosedEvent ctxClosedEvent) {
+        if (activator.isAlive()) {
+            participantHandler.sendParticipantDeregister();
+            activator.stop();
+        }
+    }
+}
index 3886030..cb8df0a 100644 (file)
@@ -1,6 +1,6 @@
 /*-
  * ============LICENSE_START=======================================================
- *  Copyright (C) 2021,2024 Nordix Foundation.
+ *  Copyright (C) 2021,2024-2025 Nordix Foundation.
  *  Modifications Copyright (C) 2021 AT&T Intellectual Property. All rights reserved.
  * ================================================================================
  * Licensed under the Apache License, Version 2.0 (the "License");
@@ -23,9 +23,8 @@ package org.onap.policy.clamp.acm.participant.intermediary.handler;
 
 import java.io.Closeable;
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.List;
-import java.util.Timer;
-import java.util.TimerTask;
 import lombok.Getter;
 import org.onap.policy.clamp.acm.participant.intermediary.parameters.ParticipantParameters;
 import org.onap.policy.clamp.acm.participant.intermediary.parameters.Topics;
@@ -34,9 +33,6 @@ import org.onap.policy.common.message.bus.event.TopicEndpointManager;
 import org.onap.policy.common.message.bus.event.TopicSink;
 import org.onap.policy.common.message.bus.event.TopicSource;
 import org.onap.policy.common.utils.services.ServiceManagerContainer;
-import org.springframework.context.event.ContextClosedEvent;
-import org.springframework.context.event.ContextRefreshedEvent;
-import org.springframework.context.event.EventListener;
 import org.springframework.stereotype.Component;
 
 /**
@@ -48,10 +44,8 @@ public class IntermediaryActivator extends ServiceManagerContainer implements Cl
     private static final String[] MSG_TYPE_NAMES = {"messageType"};
 
     // Topics from which the participant receives and to which the participant sends messages
-    private final List<TopicSink> topicSinks;
-    private final List<TopicSource> topicSources;
-
-    private final ParticipantHandler participantHandler;
+    private final List<TopicSink> topicSinks = new ArrayList<>();
+    private final List<TopicSource> topicSources = new ArrayList<>();
 
     @Getter
     private final MessageTypeDispatcher msgDispatcher;
@@ -59,27 +53,30 @@ public class IntermediaryActivator extends ServiceManagerContainer implements Cl
     @Getter
     private final MessageTypeDispatcher syncMsgDispatcher;
 
+    /**
+     * Constructor.
+     */
+    public IntermediaryActivator() {
+        msgDispatcher = new MessageTypeDispatcher(MSG_TYPE_NAMES);
+        syncMsgDispatcher = new MessageTypeDispatcher(MSG_TYPE_NAMES);
+    }
+
     /**
      * Instantiate the activator for participant.
      *
      * @param parameters the ParticipantParameters
-     * @param participantHandler the ParticipantHandler
      * @param publishers list of Publishers
      * @param listeners list of Listeners
      */
-    public <T> IntermediaryActivator(final ParticipantParameters parameters, ParticipantHandler participantHandler,
-        List<Publisher> publishers, List<Listener<T>> listeners) {
-        this.participantHandler = participantHandler;
+    public <T> void config(ParticipantParameters parameters,
+            List<Publisher> publishers, List<Listener<T>> listeners) {
 
-        topicSinks = TopicEndpointManager.getManager().addTopicSinks(
-            parameters.getIntermediaryParameters().getClampAutomationCompositionTopics().getTopicSinks());
+        topicSinks.addAll(TopicEndpointManager.getManager().addTopicSinks(
+                parameters.getIntermediaryParameters().getClampAutomationCompositionTopics().getTopicSinks()));
 
-        topicSources = TopicEndpointManager.getManager().addTopicSources(
-            parameters.getIntermediaryParameters().getClampAutomationCompositionTopics().getTopicSources());
+        topicSources.addAll(TopicEndpointManager.getManager().addTopicSources(
+            parameters.getIntermediaryParameters().getClampAutomationCompositionTopics().getTopicSources()));
 
-        msgDispatcher = new MessageTypeDispatcher(MSG_TYPE_NAMES);
-
-        syncMsgDispatcher = new MessageTypeDispatcher(MSG_TYPE_NAMES);
 
         // @formatter:off
         addAction("Topic endpoint management",
@@ -108,42 +105,6 @@ public class IntermediaryActivator extends ServiceManagerContainer implements Cl
         // @formatter:on
     }
 
-    /**
-     * Handle ContextRefreshEvent.
-     *
-     * @param ctxRefreshedEvent ContextRefreshedEvent
-     */
-    @EventListener
-    public void handleContextRefreshEvent(ContextRefreshedEvent ctxRefreshedEvent) {
-        if (!isAlive()) {
-            start();
-            var task = new TimerTask() {
-                @Override
-                public void run() {
-                    new Thread(participantHandler::sendParticipantRegister).start();
-                }
-            };
-            new Timer().schedule(task, 5000);
-        }
-    }
-
-    /**
-     * Handle ContextClosedEvent.
-     *
-     * @param ctxClosedEvent ContextClosedEvent
-     */
-    @EventListener
-    public void handleContextClosedEvent(ContextClosedEvent ctxClosedEvent) {
-        if (isAlive()) {
-            sendParticipantDeregister();
-            stop();
-        }
-    }
-
-    private void sendParticipantDeregister() {
-        participantHandler.sendParticipantDeregister();
-    }
-
     /**
      * Registers the dispatcher with the topic source(s).
      */
index dad9c8a..d94dc5d 100644 (file)
@@ -1,6 +1,6 @@
 /*-
  * ============LICENSE_START=======================================================
- *  Copyright (C) 2021-2024 Nordix Foundation.
+ *  Copyright (C) 2021-2024-2025 Nordix Foundation.
  * ================================================================================
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -30,6 +30,7 @@ import lombok.Getter;
 import lombok.Setter;
 import org.onap.policy.clamp.models.acm.concepts.ParticipantSupportedElementType;
 import org.onap.policy.common.parameters.topic.TopicParameterGroup;
+import org.onap.policy.common.parameters.topic.TopicParameters;
 import org.onap.policy.common.parameters.validation.ParameterGroupConstraint;
 
 /**
@@ -64,4 +65,7 @@ public class ParticipantIntermediaryParameters {
     @Valid
     private Topics topics = new Topics();
 
+    private Boolean topicValidation = false;
+
+    private TopicParameters clampAdminTopics;
 }
diff --git a/participant/participant-intermediary/src/test/java/org/onap/policy/clamp/acm/participant/intermediary/handler/BrokerStarterTest.java b/participant/participant-intermediary/src/test/java/org/onap/policy/clamp/acm/participant/intermediary/handler/BrokerStarterTest.java
new file mode 100644 (file)
index 0000000..093ae9e
--- /dev/null
@@ -0,0 +1,113 @@
+/*-
+ * ============LICENSE_START=======================================================
+ *  Copyright (C) 2025 Nordix Foundation.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.clamp.acm.participant.intermediary.handler;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.util.List;
+import org.junit.jupiter.api.Test;
+import org.onap.policy.clamp.acm.participant.intermediary.comm.ParticipantStatusReqListener;
+import org.onap.policy.clamp.acm.participant.intermediary.main.parameters.CommonTestData;
+import org.onap.policy.common.message.bus.event.Topic;
+import org.onap.policy.common.message.bus.healthcheck.TopicHealthCheck;
+import org.onap.policy.common.parameters.topic.TopicParameters;
+import org.springframework.context.event.ContextClosedEvent;
+import org.springframework.context.event.ContextRefreshedEvent;
+
+class BrokerStarterTest {
+
+    @Test
+    void testWithClampAdminTopicsNull() {
+        var parameters = CommonTestData.getParticipantParameters();
+        parameters.getIntermediaryParameters().setClampAdminTopics(null);
+        var publishers = List.of(mock(Publisher.class));
+        var listeners = List.of(mock(ParticipantStatusReqListener.class));
+        var activator = mock(IntermediaryActivator.class);
+        var participantHandler = mock(ParticipantHandler.class);
+        var brokerStarter = new BrokerStarter(parameters, publishers, listeners, activator, participantHandler);
+        when(activator.isAlive()).thenReturn(false);
+
+        brokerStarter.handleContextRefreshEvent(mock(ContextRefreshedEvent.class));
+        verify(activator).start();
+
+        brokerStarter.handleContextClosedEvent(mock(ContextClosedEvent.class));
+        verify(participantHandler, times(0)).sendParticipantDeregister();
+        verify(activator, times(0)).stop();
+    }
+
+    @Test
+    void testAlreadyAlive() {
+        var parameters = CommonTestData.getParticipantParameters();
+        var topic = new TopicParameters();
+        topic.setTopicCommInfrastructure(Topic.CommInfrastructure.NOOP.name());
+        parameters.getIntermediaryParameters().setClampAdminTopics(topic);
+        var publishers = List.of(mock(Publisher.class));
+        var listeners = List.of(mock(ParticipantStatusReqListener.class));
+        var activator = mock(IntermediaryActivator.class);
+        var participantHandler = mock(ParticipantHandler.class);
+        var brokerStarter = new BrokerStarter(parameters, publishers, listeners, activator, participantHandler);
+
+        when(activator.isAlive()).thenReturn(true);
+        brokerStarter.handleContextRefreshEvent(mock(ContextRefreshedEvent.class));
+        verify(activator, times(0)).start();
+
+        brokerStarter.handleContextClosedEvent(mock(ContextClosedEvent.class));
+        verify(activator).stop();
+        verify(participantHandler).sendParticipantDeregister();
+    }
+
+    private static class DummyTopicHealthCheck implements TopicHealthCheck {
+
+        int count = 0;
+
+        // first call is false, next will be true
+        @Override
+        public boolean healthCheck(List<String> list) {
+            return (count++) > 0;
+        }
+    }
+
+    @Test
+    void testWithClampAdminTopics() {
+        var parameters = CommonTestData.getParticipantParameters();
+        var topic = new TopicParameters();
+        topic.setTopicCommInfrastructure(Topic.CommInfrastructure.NOOP.name());
+        parameters.getIntermediaryParameters().setClampAdminTopics(topic);
+        var publishers = List.of(mock(Publisher.class));
+        var listeners = List.of(mock(ParticipantStatusReqListener.class));
+        var activator = mock(IntermediaryActivator.class);
+        var participantHandler = mock(ParticipantHandler.class);
+        var topicHealthCheck = new DummyTopicHealthCheck();
+        var brokerStarter = new BrokerStarter(parameters, publishers, listeners, activator, participantHandler) {
+            @Override
+            protected TopicHealthCheck createTopicHealthCheck(TopicParameters topic) {
+                return topicHealthCheck;
+            }
+        };
+
+        when(activator.isAlive()).thenReturn(false);
+        brokerStarter.handleContextRefreshEvent(mock(ContextRefreshedEvent.class));
+        verify(activator).start();
+    }
+}
index 89bafa1..016f46c 100644 (file)
@@ -1,6 +1,6 @@
 /*-
  * ============LICENSE_START=======================================================
- *  Copyright (C) 2021-2024 Nordix Foundation.
+ *  Copyright (C) 2021-2024-2025 Nordix Foundation.
  * ================================================================================
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -21,7 +21,6 @@
 package org.onap.policy.clamp.acm.participant.intermediary.handler;
 
 import static org.assertj.core.api.Assertions.assertThatIllegalStateException;
-import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.mockito.ArgumentMatchers.any;
@@ -39,8 +38,6 @@ import org.onap.policy.clamp.models.acm.messages.kafka.participant.ParticipantSt
 import org.onap.policy.common.utils.coder.Coder;
 import org.onap.policy.common.utils.coder.StandardCoder;
 import org.onap.policy.common.utils.coder.StandardCoderObject;
-import org.springframework.context.event.ContextClosedEvent;
-import org.springframework.context.event.ContextRefreshedEvent;
 
 class IntermediaryActivatorTest {
     private static final Coder CODER = new StandardCoder();
@@ -68,8 +65,8 @@ class IntermediaryActivatorTest {
 
         List<Listener<ParticipantStatusReq>> listeners = List.of(listenerFirst, listenerSecond);
 
-        var handler = mock(ParticipantHandler.class);
-        try (var activator = new IntermediaryActivator(parameters, handler, publishers, listeners)) {
+        try (var activator = new IntermediaryActivator()) {
+            activator.config(parameters, publishers, listeners);
 
             assertFalse(activator.isAlive());
             activator.start();
@@ -95,9 +92,6 @@ class IntermediaryActivatorTest {
             // repeat stop - should throw an exception
             assertThatIllegalStateException().isThrownBy(activator::stop);
             assertFalse(activator.isAlive());
-
-            assertDoesNotThrow(() -> activator.handleContextRefreshEvent(mock(ContextRefreshedEvent.class)));
-            assertDoesNotThrow(() -> activator.handleContextClosedEvent(mock(ContextClosedEvent.class)));
         }
     }
 }