Fix participant registration issue 09/138109/1
authorFrancescoFioraEst <francesco.fiora@est.tech>
Thu, 30 May 2024 08:24:44 +0000 (09:24 +0100)
committerFrancesco Fiora <francesco.fiora@est.tech>
Fri, 31 May 2024 08:11:13 +0000 (08:11 +0000)
Issue-ID: POLICY-5039
Change-Id: Ic83a1feba3749f7a496749cbce174d7342d0cdcf
Signed-off-by: FrancescoFioraEst <francesco.fiora@est.tech>
participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/comm/ParticipantMessagePublisher.java
participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/handler/AutomationCompositionOutHandler.java
participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/handler/CacheProvider.java
participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/handler/IntermediaryActivator.java
participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/handler/ParticipantHandler.java
participant/participant-intermediary/src/test/java/org/onap/policy/clamp/acm/participant/intermediary/comm/ParticipantCommTest.java
participant/participant-intermediary/src/test/java/org/onap/policy/clamp/acm/participant/intermediary/handler/AutomationCompositionOutHandlerTest.java
participant/participant-intermediary/src/test/java/org/onap/policy/clamp/acm/participant/intermediary/handler/ParticipantHandlerTest.java

index 926de85..8fa2ec3 100644 (file)
@@ -126,18 +126,6 @@ public class ParticipantMessagePublisher implements Publisher {
         LOGGER.debug("Sent AutomationComposition Update/StateChange Ack to runtime - {}", automationCompositionAck);
     }
 
-    /**
-     * Method to send Participant heartbeat to clamp on demand.
-     *
-     * @param participantStatus the Participant Status
-     */
-    @Timed(value = "publisher.participant_status", description = "PARTICIPANT_STATUS messages published")
-    public void sendHeartbeat(final ParticipantStatus participantStatus) {
-        validate();
-        topicSinkClient.send(participantStatus);
-        LOGGER.debug("Sent Participant heartbeat to CLAMP - {}", participantStatus);
-    }
-
     private void validate() {
         if (!active) {
             throw new AutomationCompositionRuntimeException(Status.NOT_ACCEPTABLE, NOT_ACTIVE_TEXT);
index 9f9b25a..0ed333e 100644 (file)
@@ -267,7 +267,7 @@ public class AutomationCompositionOutHandler {
         participantDefinition.setParticipantId(cacheProvider.getParticipantId());
         participantDefinition.setAutomationCompositionElementDefinitionList(List.of(acElementDefinition));
         statusMsg.setParticipantDefinitionUpdates(List.of(participantDefinition));
-        publisher.sendHeartbeat(statusMsg);
+        publisher.sendParticipantStatus(statusMsg);
     }
 
     private AutomationCompositionElementDefinition getAutomationCompositionElementDefinition(
index f51e8ba..343f8a9 100644 (file)
@@ -28,6 +28,7 @@ import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
 import lombok.Getter;
 import lombok.NonNull;
+import lombok.Setter;
 import org.onap.policy.clamp.acm.participant.intermediary.api.CompositionElementDto;
 import org.onap.policy.clamp.acm.participant.intermediary.parameters.ParticipantParameters;
 import org.onap.policy.clamp.models.acm.concepts.AutomationComposition;
@@ -48,6 +49,10 @@ public class CacheProvider {
     @Getter
     private final UUID participantId;
 
+    @Getter
+    @Setter
+    private boolean registered = false;
+
     private final List<ParticipantSupportedElementType> supportedAcElementTypes;
 
     @Getter
index 6586ae4..a77d524 100644 (file)
@@ -24,6 +24,8 @@ package org.onap.policy.clamp.acm.participant.intermediary.handler;
 import java.io.Closeable;
 import java.io.IOException;
 import java.util.List;
+import java.util.Timer;
+import java.util.TimerTask;
 import lombok.Getter;
 import org.onap.policy.clamp.acm.participant.intermediary.parameters.ParticipantParameters;
 import org.onap.policy.common.endpoints.event.comm.TopicEndpointManager;
@@ -78,16 +80,16 @@ public class IntermediaryActivator extends ServiceManagerContainer implements Cl
             () -> TopicEndpointManager.getManager().start(),
             () -> TopicEndpointManager.getManager().shutdown());
 
+        listeners.forEach(listener ->
+                addAction("Listener " + listener.getClass().getSimpleName(),
+                        () -> msgDispatcher.register(listener.getType(), listener.getScoListener()),
+                        () -> msgDispatcher.unregister(listener.getType())));
+
         publishers.forEach(publisher ->
             addAction("Publisher " + publisher.getClass().getSimpleName(),
                 () -> publisher.active(topicSinks),
                 publisher::stop));
 
-        listeners.forEach(listener ->
-            addAction("Listener " + listener.getClass().getSimpleName(),
-                () -> msgDispatcher.register(listener.getType(), listener.getScoListener()),
-                () -> msgDispatcher.unregister(listener.getType())));
-
         addAction("Topic Message Dispatcher", this::registerMsgDispatcher, this::unregisterMsgDispatcher);
         // @formatter:on
     }
@@ -101,7 +103,13 @@ public class IntermediaryActivator extends ServiceManagerContainer implements Cl
     public void handleContextRefreshEvent(ContextRefreshedEvent ctxRefreshedEvent) {
         if (!isAlive()) {
             start();
-            sendParticipantRegister();
+            var task = new TimerTask() {
+                @Override
+                public void run() {
+                    new Thread(participantHandler::sendParticipantRegister).start();
+                }
+            };
+            new Timer().schedule(task, 5000);
         }
     }
 
@@ -118,10 +126,6 @@ public class IntermediaryActivator extends ServiceManagerContainer implements Cl
         }
     }
 
-    private void sendParticipantRegister() {
-        participantHandler.sendParticipantRegister();
-    }
-
     private void sendParticipantDeregister() {
         participantHandler.sendParticipantDeregister();
     }
index a4a92f8..0865dca 100644 (file)
@@ -66,7 +66,7 @@ public class ParticipantHandler {
      */
     @Timed(value = "listener.participant_status_req", description = "PARTICIPANT_STATUS_REQ messages received")
     public void handleParticipantStatusReq(final ParticipantStatusReq participantStatusReqMsg) {
-        publisher.sendParticipantStatus(makeHeartbeat());
+        sendHeartbeat();
     }
 
     /**
@@ -159,6 +159,7 @@ public class ParticipantHandler {
     public void handleParticipantRegisterAck(ParticipantRegisterAck participantRegisterAckMsg) {
         LOGGER.debug("ParticipantRegisterAck message received as responseTo {}",
                 participantRegisterAckMsg.getResponseTo());
+        cacheProvider.setRegistered(true);
         publisher.sendParticipantStatus(makeHeartbeat());
     }
 
@@ -210,7 +211,11 @@ public class ParticipantHandler {
      */
     public void sendHeartbeat() {
         if (publisher.isActive()) {
-            publisher.sendHeartbeat(makeHeartbeat());
+            if (!cacheProvider.isRegistered()) {
+                sendParticipantRegister();
+            } else {
+                publisher.sendParticipantStatus(makeHeartbeat());
+            }
         }
     }
 
index 754fc65..10f9d45 100644 (file)
@@ -93,8 +93,6 @@ class ParticipantCommTest {
         var participantStatus = new ParticipantStatus();
         assertDoesNotThrow(() -> publisher.sendParticipantStatus(participantStatus));
 
-        assertDoesNotThrow(() -> publisher.sendHeartbeat(participantStatus));
-
         var participantRegister = new ParticipantRegister();
         assertDoesNotThrow(() -> publisher.sendParticipantRegister(participantRegister));
 
@@ -115,7 +113,6 @@ class ParticipantCommTest {
         var participantStatus = new ParticipantStatus();
         assertThrows(AutomationCompositionRuntimeException.class,
                 () -> publisher.sendParticipantStatus(participantStatus));
-        assertThrows(AutomationCompositionRuntimeException.class, () -> publisher.sendHeartbeat(participantStatus));
 
         var participantRegister = new ParticipantRegister();
         assertThrows(AutomationCompositionRuntimeException.class,
index 76fbd06..eed5319 100644 (file)
@@ -206,16 +206,16 @@ class AutomationCompositionOutHandlerTest {
         var acOutHandler = new AutomationCompositionOutHandler(publisher, cacheProvider);
 
         acOutHandler.sendAcDefinitionInfo(null, null, Map.of());
-        verify(publisher, times(0)).sendHeartbeat(any(ParticipantStatus.class));
+        verify(publisher, times(0)).sendParticipantStatus(any(ParticipantStatus.class));
 
         acOutHandler.sendAcDefinitionInfo(UUID.randomUUID(), null, Map.of());
-        verify(publisher, times(0)).sendHeartbeat(any(ParticipantStatus.class));
+        verify(publisher, times(0)).sendParticipantStatus(any(ParticipantStatus.class));
 
         acOutHandler.sendAcDefinitionInfo(compositionId, new ToscaConceptIdentifier("wrong", "1.0.0"), Map.of());
-        verify(publisher, times(0)).sendHeartbeat(any(ParticipantStatus.class));
+        verify(publisher, times(0)).sendParticipantStatus(any(ParticipantStatus.class));
 
         acOutHandler.sendAcDefinitionInfo(compositionId, elementId, Map.of());
-        verify(publisher).sendHeartbeat(any(ParticipantStatus.class));
+        verify(publisher).sendParticipantStatus(any(ParticipantStatus.class));
     }
 
     @Test
index 8aac931..cd28d41 100644 (file)
@@ -24,6 +24,7 @@ import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.clearInvocations;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
@@ -58,10 +59,16 @@ class ParticipantHandlerTest {
     @Test
     void handleParticipantStatusReqTest() {
         var publisher = mock(ParticipantMessagePublisher.class);
+        when(publisher.isActive()).thenReturn(true);
         var cacheProvider = mock(CacheProvider.class);
         var participantHandler = new ParticipantHandler(mock(AutomationCompositionHandler.class),
                 mock(AcLockHandler.class), mock(AcDefinitionHandler.class), publisher, cacheProvider);
         participantHandler.handleParticipantStatusReq(new ParticipantStatusReq());
+        verify(publisher).sendParticipantRegister(any(ParticipantRegister.class));
+
+        when(cacheProvider.isRegistered()).thenReturn(true);
+        clearInvocations(publisher);
+        participantHandler.handleParticipantStatusReq(new ParticipantStatusReq());
         verify(publisher).sendParticipantStatus(any(ParticipantStatus.class));
     }
 
@@ -213,6 +220,7 @@ class ParticipantHandlerTest {
     void sendHeartbeatTest() {
         var cacheProvider = mock(CacheProvider.class);
         when(cacheProvider.getParticipantId()).thenReturn(CommonTestData.getParticipantId());
+        when(cacheProvider.isRegistered()).thenReturn(false);
         when(cacheProvider.getAutomationCompositions()).thenReturn(CommonTestData.getTestAutomationCompositionMap());
         var publisher = mock(ParticipantMessagePublisher.class);
         when(publisher.isActive()).thenReturn(true);
@@ -220,7 +228,11 @@ class ParticipantHandlerTest {
         var participantHandler = new ParticipantHandler(mock(AutomationCompositionHandler.class),
                 mock(AcLockHandler.class), acHandler, publisher, cacheProvider);
         participantHandler.sendHeartbeat();
-        verify(publisher).sendHeartbeat(any(ParticipantStatus.class));
-    }
+        verify(publisher).sendParticipantRegister(any(ParticipantRegister.class));
 
+        when(cacheProvider.isRegistered()).thenReturn(true);
+        clearInvocations(publisher);
+        participantHandler.sendHeartbeat();
+        verify(publisher).sendParticipantStatus(any(ParticipantStatus.class));
+    }
 }