Intermittent issue in event handling between Cl runtime and participants 85/123485/2
author: FrancescoFioraEst <francesco.fiora@est.tech>
Fri, 20 Aug 2021 14:05:26 +0000 (15:05 +0100)
committer: FrancescoFioraEst <francesco.fiora@est.tech>
Tue, 24 Aug 2021 13:42:12 +0000 (14:42 +0100)
Issue-ID: POLICY-3544
Change-Id: I40c5dc537b17986d01ab0d213e7ea7c9cdb7d59e
Signed-off-by: FrancescoFioraEst <francesco.fiora@est.tech>
packages/policy-clamp-tarball/src/main/resources/etc/ClRuntimeParameters.yaml
participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/handler/ControlLoopHandler.java
runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/HandleCounter.java
runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/SupervisionAspect.java
runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/SupervisionHandler.java
runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/SupervisionScanner.java
runtime-controlloop/src/main/resources/application.yaml
runtime-controlloop/src/test/java/org/onap/policy/clamp/controlloop/runtime/supervision/SupervisionScannerTest.java

index 3970d15..0aa3fb7 100644 (file)
@@ -24,12 +24,10 @@ runtime:
   participantClStateChangeIntervalSec: 1000
   participantParameters:
     heartBeatMs: 120000
+    maxMessageAgeMs: 600000
     updateParameters:
-      maxRetryCount: 1
-      maxWaitMs: 30000
-    stateChangeParameters:
-      maxRetryCount: 1
-      maxWaitMs: 30000
+      maxRetryCount: 3
+      maxWaitMs: 100000
   databaseProviderParameters:
     name: PolicyProviderParameterGroup
     implementation: org.onap.policy.models.provider.impl.DatabasePolicyModelsProviderImpl
index daf9ebe..2bc21f7 100644 (file)
@@ -27,6 +27,7 @@ import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.UUID;
+import java.util.stream.Collectors;
 import lombok.Getter;
 import lombok.NoArgsConstructor;
 import org.apache.commons.collections4.CollectionUtils;
@@ -250,13 +251,13 @@ public class ControlLoopHandler {
     }
 
     private List<ControlLoopElement> storeElementsOnThisParticipant(List<ParticipantUpdates> participantUpdates) {
-        List<ControlLoopElement> clElementMap = new ArrayList<>();
-        for (ParticipantUpdates participantUpdate : participantUpdates) {
-            if (participantUpdate.getParticipantId().equals(participantType)) {
-                clElementMap = participantUpdate.getControlLoopElementList();
-            }
-        }
-        for (ControlLoopElement element : clElementMap) {
+        var clElementMap =
+                participantUpdates.stream()
+                .flatMap(participantUpdate -> participantUpdate.getControlLoopElementList().stream())
+                .filter(element -> participantType.equals(element.getParticipantType()))
+                .collect(Collectors.toList());
+
+        for (var element : clElementMap) {
             elementsOnThisParticipant.put(element.getId(), element);
         }
         return clElementMap;
index 2151dc1..7e070d7 100644 (file)
@@ -57,6 +57,17 @@ public class HandleCounter<K> {
         mapTimer.put(id, getEpochMilli());
     }
 
+    /**
+     * Remove the counter, timer and fault entries stored for the given id.
+     *
+     * @param id the key whose entries are dropped from mapFault, mapCounter and mapTimer
+     */
+    public void remove(K id) {
+        mapFault.remove(id);
+        mapCounter.remove(id);
+        mapTimer.remove(id);
+    }
+
     public void setFault(K id) {
         mapCounter.put(id, 0);
         mapFault.add(id);
@@ -88,4 +99,8 @@ public class HandleCounter<K> {
     protected long getEpochMilli() {
         return Instant.now().toEpochMilli();
     }
+
+    public Set<K> keySet() {
+        return mapCounter.keySet();
+    }
 }
index d0d18ab..fbb2742 100644 (file)
@@ -26,10 +26,13 @@ import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import lombok.RequiredArgsConstructor;
+import org.apache.commons.lang3.tuple.ImmutablePair;
 import org.aspectj.lang.annotation.After;
 import org.aspectj.lang.annotation.Aspect;
 import org.aspectj.lang.annotation.Before;
+import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantRegister;
 import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantStatus;
+import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantUpdateAck;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.scheduling.annotation.Scheduled;
@@ -71,6 +74,18 @@ public class SupervisionAspect implements Closeable {
         executor.execute(() -> supervisionScanner.handleParticipantStatus(participantStatusMessage.getParticipantId()));
     }
 
+    @Before("@annotation(MessageIntercept) && args(participantRegisterMessage,..)")
+    public void handleParticipantRegister(ParticipantRegister participantRegisterMessage) {
+        executor.execute(() -> supervisionScanner.handleParticipantRegister(new ImmutablePair<>(
+                participantRegisterMessage.getParticipantId(), participantRegisterMessage.getParticipantType())));
+    }
+
+    @Before("@annotation(MessageIntercept) && args(participantUpdateAckMessage,..)")
+    public void handleParticipantUpdateAck(ParticipantUpdateAck participantUpdateAckMessage) {
+        executor.execute(() -> supervisionScanner.handleParticipantUpdateAck(new ImmutablePair<>(
+                participantUpdateAckMessage.getParticipantId(), participantUpdateAckMessage.getParticipantType())));
+    }
+
     @Override
     public void close() throws IOException {
         executor.shutdown();
index 0e2ff5c..db7d348 100644 (file)
@@ -21,7 +21,6 @@
 package org.onap.policy.clamp.controlloop.runtime.supervision;
 
 import java.util.List;
-import java.util.Map;
 import javax.ws.rs.core.Response;
 import lombok.AllArgsConstructor;
 import org.apache.commons.collections4.CollectionUtils;
@@ -225,7 +224,7 @@ public class SupervisionHandler {
     }
 
     private void superviseControlLoopPassivation(ControlLoop controlLoop)
-            throws ControlLoopException, PfModelException {
+            throws ControlLoopException {
         switch (controlLoop.getState()) {
             case PASSIVE:
                 exceptionOccured(Response.Status.NOT_ACCEPTABLE,
index 7be407c..151b04c 100644 (file)
@@ -21,6 +21,7 @@
 package org.onap.policy.clamp.controlloop.runtime.supervision;
 
 import java.util.List;
+import org.apache.commons.lang3.tuple.Pair;
 import org.onap.policy.clamp.controlloop.models.controlloop.concepts.ControlLoop;
 import org.onap.policy.clamp.controlloop.models.controlloop.concepts.ControlLoopElement;
 import org.onap.policy.clamp.controlloop.models.controlloop.concepts.ControlLoopState;
@@ -32,6 +33,7 @@ import org.onap.policy.clamp.controlloop.runtime.main.parameters.ClRuntimeParame
 import org.onap.policy.clamp.controlloop.runtime.supervision.comm.ControlLoopStateChangePublisher;
 import org.onap.policy.clamp.controlloop.runtime.supervision.comm.ControlLoopUpdatePublisher;
 import org.onap.policy.clamp.controlloop.runtime.supervision.comm.ParticipantStatusReqPublisher;
+import org.onap.policy.clamp.controlloop.runtime.supervision.comm.ParticipantUpdatePublisher;
 import org.onap.policy.models.base.PfModelException;
 import org.onap.policy.models.tosca.authorative.concepts.ToscaConceptIdentifier;
 import org.slf4j.Logger;
@@ -46,15 +48,18 @@ public class SupervisionScanner {
     private static final Logger LOGGER = LoggerFactory.getLogger(SupervisionScanner.class);
 
     private HandleCounter<ToscaConceptIdentifier> controlLoopCounter = new HandleCounter<>();
-    private HandleCounter<ToscaConceptIdentifier> participantCounter = new HandleCounter<>();
+    private HandleCounter<ToscaConceptIdentifier> participantStatusCounter = new HandleCounter<>();
+    private HandleCounter<Pair<ToscaConceptIdentifier, ToscaConceptIdentifier>> participantUpdateCounter =
+            new HandleCounter<>();
 
     private final ControlLoopProvider controlLoopProvider;
     private final ControlLoopStateChangePublisher controlLoopStateChangePublisher;
     private final ControlLoopUpdatePublisher controlLoopUpdatePublisher;
     private final ParticipantProvider participantProvider;
     private final ParticipantStatusReqPublisher participantStatusReqPublisher;
+    private final ParticipantUpdatePublisher participantUpdatePublisher;
 
-    private final long maxMessageAgeMs;
+    private final long maxWaitMs;
 
     /**
      * Constructor for instantiating SupervisionScanner.
@@ -64,30 +69,38 @@ public class SupervisionScanner {
      * @param controlLoopUpdatePublisher the ControlLoopUpdate Publisher
      * @param participantProvider the Participant Provider
      * @param participantStatusReqPublisher the Participant StatusReq Publisher
+     * @param participantUpdatePublisher the Participant Update Publisher
      * @param clRuntimeParameterGroup the parameters for the control loop runtime
      */
     public SupervisionScanner(final ControlLoopProvider controlLoopProvider,
             final ControlLoopStateChangePublisher controlLoopStateChangePublisher,
             ControlLoopUpdatePublisher controlLoopUpdatePublisher, ParticipantProvider participantProvider,
             ParticipantStatusReqPublisher participantStatusReqPublisher,
+            ParticipantUpdatePublisher participantUpdatePublisher,
             final ClRuntimeParameterGroup clRuntimeParameterGroup) {
         this.controlLoopProvider = controlLoopProvider;
         this.controlLoopStateChangePublisher = controlLoopStateChangePublisher;
         this.controlLoopUpdatePublisher = controlLoopUpdatePublisher;
         this.participantProvider = participantProvider;
         this.participantStatusReqPublisher = participantStatusReqPublisher;
+        this.participantUpdatePublisher = participantUpdatePublisher;
 
         controlLoopCounter.setMaxRetryCount(
                 clRuntimeParameterGroup.getParticipantParameters().getUpdateParameters().getMaxRetryCount());
         controlLoopCounter
                 .setMaxWaitMs(clRuntimeParameterGroup.getParticipantParameters().getUpdateParameters().getMaxWaitMs());
 
-        participantCounter.setMaxRetryCount(
+        participantUpdateCounter.setMaxRetryCount(
                 clRuntimeParameterGroup.getParticipantParameters().getUpdateParameters().getMaxRetryCount());
-        participantCounter
+        participantUpdateCounter
                 .setMaxWaitMs(clRuntimeParameterGroup.getParticipantParameters().getUpdateParameters().getMaxWaitMs());
 
-        maxMessageAgeMs = clRuntimeParameterGroup.getParticipantParameters().getMaxMessageAgeMs();
+        participantStatusCounter.setMaxRetryCount(
+                clRuntimeParameterGroup.getParticipantParameters().getUpdateParameters().getMaxRetryCount());
+        participantStatusCounter
+                .setMaxWaitMs(clRuntimeParameterGroup.getParticipantParameters().getUpdateParameters().getMaxWaitMs());
+
+        maxWaitMs = clRuntimeParameterGroup.getParticipantParameters().getUpdateParameters().getMaxWaitMs();
     }
 
     /**
@@ -101,7 +114,7 @@ public class SupervisionScanner {
         if (counterCheck) {
             try {
                 for (Participant participant : participantProvider.getParticipants(null, null)) {
-                    scanParticipant(participant);
+                    scanParticipantStatus(participant);
                 }
             } catch (PfModelException pfme) {
                 LOGGER.warn("error reading participant from database", pfme);
@@ -116,24 +129,49 @@ public class SupervisionScanner {
         } catch (PfModelException pfme) {
             LOGGER.warn("error reading control loops from database", pfme);
         }
+        if (counterCheck) {
+            scanParticipantUpdate();
+        }
 
         LOGGER.debug("Control loop scan complete . . .");
     }
 
-    private void scanParticipant(Participant participant) throws PfModelException {
+    private void scanParticipantUpdate() {
+        LOGGER.debug("Scanning participants to update . . .");
+
+        for (var id : participantUpdateCounter.keySet()) {
+            if (participantUpdateCounter.isFault(id)) {
+                LOGGER.debug("report Participant Update fault");
+
+            } else if (participantUpdateCounter.getDuration(id) > maxWaitMs) {
+
+                if (participantUpdateCounter.count(id)) {
+                    LOGGER.debug("retry message ParticipantUpdate");
+                    participantUpdatePublisher.send(id.getLeft(), id.getRight());
+                } else {
+                    LOGGER.debug("report Participant Update fault");
+                    participantUpdateCounter.setFault(id);
+                }
+            }
+        }
+
+        LOGGER.debug("Participants to update scan complete . . .");
+    }
+
+    private void scanParticipantStatus(Participant participant) throws PfModelException {
         ToscaConceptIdentifier id = participant.getKey().asIdentifier();
-        if (participantCounter.isFault(id)) {
+        if (participantStatusCounter.isFault(id)) {
             LOGGER.debug("report Participant fault");
             return;
         }
-        if (participantCounter.getDuration(id) > maxMessageAgeMs) {
-            if (participantCounter.count(id)) {
+        if (participantStatusCounter.getDuration(id) > maxWaitMs) {
+            if (participantStatusCounter.count(id)) {
                 LOGGER.debug("retry message ParticipantStatusReq");
                 participantStatusReqPublisher.send(id);
                 participant.setHealthStatus(ParticipantHealthStatus.NOT_HEALTHY);
             } else {
                 LOGGER.debug("report Participant fault");
-                participantCounter.setFault(id);
+                participantStatusCounter.setFault(id);
                 participant.setHealthStatus(ParticipantHealthStatus.OFF_LINE);
             }
             participantProvider.updateParticipants(List.of(participant));
@@ -144,7 +182,15 @@ public class SupervisionScanner {
      * handle participant Status message.
      */
     public void handleParticipantStatus(ToscaConceptIdentifier id) {
-        participantCounter.clear(id);
+        participantStatusCounter.clear(id);
+    }
+
+    public void handleParticipantRegister(Pair<ToscaConceptIdentifier, ToscaConceptIdentifier> id) {
+        participantUpdateCounter.clear(id);
+    }
+
+    public void handleParticipantUpdateAck(Pair<ToscaConceptIdentifier, ToscaConceptIdentifier> id) {
+        participantUpdateCounter.remove(id);
     }
 
     private void scanControlLoop(final ControlLoop controlLoop, boolean counterCheck) throws PfModelException {
index d0e5500..ea98aaa 100644 (file)
@@ -24,9 +24,10 @@ runtime:
   participantClStateChangeIntervalSec: 1000
   participantParameters:
     heartBeatMs: 120000
+    maxMessageAgeMs: 600000
     updateParameters:
-      maxRetryCount: 1
-      maxWaitMs: 30000
+      maxRetryCount: 3
+      maxWaitMs: 100000
   databaseProviderParameters:
     name: PolicyProviderParameterGroup
     implementation: org.onap.policy.models.provider.impl.DatabasePolicyModelsProviderImpl
index 485f58d..717858e 100644 (file)
@@ -43,6 +43,7 @@ import org.onap.policy.clamp.controlloop.models.controlloop.persistence.provider
 import org.onap.policy.clamp.controlloop.runtime.supervision.comm.ControlLoopStateChangePublisher;
 import org.onap.policy.clamp.controlloop.runtime.supervision.comm.ControlLoopUpdatePublisher;
 import org.onap.policy.clamp.controlloop.runtime.supervision.comm.ParticipantStatusReqPublisher;
+import org.onap.policy.clamp.controlloop.runtime.supervision.comm.ParticipantUpdatePublisher;
 import org.onap.policy.clamp.controlloop.runtime.util.CommonTestData;
 import org.onap.policy.models.base.PfModelException;
 import org.onap.policy.models.tosca.authorative.concepts.ToscaConceptIdentifier;
@@ -56,14 +57,15 @@ class SupervisionScannerTest {
         var controlLoopUpdatePublisher = mock(ControlLoopUpdatePublisher.class);
         var participantProvider = mock(ParticipantProvider.class);
         var participantStatusReqPublisher = mock(ParticipantStatusReqPublisher.class);
+        var participantUpdatePublisher = mock(ParticipantUpdatePublisher.class);
         var clRuntimeParameterGroup = CommonTestData.geParameterGroup("dbScanner");
 
         var controlLoop = new ControlLoop();
         when(controlLoopProvider.getControlLoops(null, null)).thenReturn(List.of(controlLoop));
 
-        var supervisionScanner =
-                new SupervisionScanner(controlLoopProvider, controlLoopStateChangePublisher, controlLoopUpdatePublisher,
-                        participantProvider, participantStatusReqPublisher, clRuntimeParameterGroup);
+        var supervisionScanner = new SupervisionScanner(controlLoopProvider, controlLoopStateChangePublisher,
+                controlLoopUpdatePublisher, participantProvider, participantStatusReqPublisher,
+                participantUpdatePublisher, clRuntimeParameterGroup);
         supervisionScanner.run(false);
 
         verify(controlLoopProvider, times(0)).updateControlLoop(any(ControlLoop.class));
@@ -82,11 +84,12 @@ class SupervisionScannerTest {
         var controlLoopStateChangePublisher = mock(ControlLoopStateChangePublisher.class);
         var participantProvider = mock(ParticipantProvider.class);
         var participantStatusReqPublisher = mock(ParticipantStatusReqPublisher.class);
+        var participantUpdatePublisher = mock(ParticipantUpdatePublisher.class);
         var clRuntimeParameterGroup = CommonTestData.geParameterGroup("dbScanner");
 
-        var supervisionScanner =
-                new SupervisionScanner(controlLoopProvider, controlLoopStateChangePublisher, controlLoopUpdatePublisher,
-                        participantProvider, participantStatusReqPublisher, clRuntimeParameterGroup);
+        var supervisionScanner = new SupervisionScanner(controlLoopProvider, controlLoopStateChangePublisher,
+                controlLoopUpdatePublisher, participantProvider, participantStatusReqPublisher,
+                participantUpdatePublisher, clRuntimeParameterGroup);
         supervisionScanner.run(false);
 
         verify(controlLoopProvider, times(1)).updateControlLoop(any(ControlLoop.class));
@@ -107,11 +110,12 @@ class SupervisionScannerTest {
         var controlLoopUpdatePublisher = mock(ControlLoopUpdatePublisher.class);
         var participantStatusReqPublisher = mock(ParticipantStatusReqPublisher.class);
         var controlLoopStateChangePublisher = mock(ControlLoopStateChangePublisher.class);
+        var participantUpdatePublisher = mock(ParticipantUpdatePublisher.class);
         var clRuntimeParameterGroup = CommonTestData.geParameterGroup("dbScanner");
 
-        var supervisionScanner =
-                new SupervisionScanner(controlLoopProvider, controlLoopStateChangePublisher, controlLoopUpdatePublisher,
-                        participantProvider, participantStatusReqPublisher, clRuntimeParameterGroup);
+        var supervisionScanner = new SupervisionScanner(controlLoopProvider, controlLoopStateChangePublisher,
+                controlLoopUpdatePublisher, participantProvider, participantStatusReqPublisher,
+                participantUpdatePublisher, clRuntimeParameterGroup);
 
         supervisionScanner.handleParticipantStatus(participant.getKey().asIdentifier());
         supervisionScanner.run(true);
@@ -126,7 +130,7 @@ class SupervisionScannerTest {
         when(controlLoopProvider.getControlLoops(null, null)).thenReturn(List.of(controlLoop));
 
         var clRuntimeParameterGroup = CommonTestData.geParameterGroup("dbScanParticipant");
-        clRuntimeParameterGroup.getParticipantParameters().setMaxMessageAgeMs(0);
+        clRuntimeParameterGroup.getParticipantParameters().getUpdateParameters().setMaxWaitMs(0);
 
         var participant = new Participant();
         participant.setName("Participant0");
@@ -140,10 +144,11 @@ class SupervisionScannerTest {
         var controlLoopUpdatePublisher = mock(ControlLoopUpdatePublisher.class);
         var participantStatusReqPublisher = mock(ParticipantStatusReqPublisher.class);
         var controlLoopStateChangePublisher = mock(ControlLoopStateChangePublisher.class);
+        var participantUpdatePublisher = mock(ParticipantUpdatePublisher.class);
 
-        var supervisionScanner =
-                new SupervisionScanner(controlLoopProvider, controlLoopStateChangePublisher, controlLoopUpdatePublisher,
-                        participantProvider, participantStatusReqPublisher, clRuntimeParameterGroup);
+        var supervisionScanner = new SupervisionScanner(controlLoopProvider, controlLoopStateChangePublisher,
+                controlLoopUpdatePublisher, participantProvider, participantStatusReqPublisher,
+                participantUpdatePublisher, clRuntimeParameterGroup);
 
         supervisionScanner.handleParticipantStatus(participant.getKey().asIdentifier());
         supervisionScanner.run(true);