Add Retry and Timeout handling 02/122802/6
authorFrancescoFioraEst <francesco.fiora@est.tech>
Tue, 20 Jul 2021 14:49:57 +0000 (15:49 +0100)
committerFrancescoFioraEst <francesco.fiora@est.tech>
Mon, 26 Jul 2021 13:55:31 +0000 (14:55 +0100)
Implementaton of Supervision, Retry and Timeout handling
on all Participant messages

Issue-ID: POLICY-3455
Change-Id: Idfd53ea0b8f5bb1272703256b983a6cbeeb4fdf4
Signed-off-by: FrancescoFioraEst <francesco.fiora@est.tech>
17 files changed:
runtime-controlloop/pom.xml
runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/Application.java
runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/main/rest/CommissioningController.java
runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/main/rest/InstantiationController.java
runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/main/rest/MonitoringQueryController.java
runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/MessageIntercept.java [new file with mode: 0644]
runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/SupervisionAspect.java [new file with mode: 0644]
runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/SupervisionHandler.java
runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/SupervisionScanner.java
runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/comm/ControlLoopUpdatePublisher.java
runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/comm/ParticipantDeregisterAckPublisher.java
runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/comm/ParticipantRegisterAckPublisher.java
runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/comm/ParticipantUpdateAckListener.java
runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/comm/ParticipantUpdatePublisher.java
runtime-controlloop/src/main/resources/application.yaml
runtime-controlloop/src/test/java/org/onap/policy/clamp/controlloop/runtime/instantiation/ControlLoopInstantiationProviderTest.java
runtime-controlloop/src/test/java/org/onap/policy/clamp/controlloop/runtime/supervision/comm/SupervisionMessagesTest.java

index e436451..508f44a 100644 (file)
                 </exclusion>
             </exclusions>
         </dependency>
+        <dependency>
+            <groupId>org.springframework.boot</groupId>
+            <artifactId>spring-boot-starter-aop</artifactId>
+            <version>${version.springboot}</version>
+        </dependency>
         <dependency>
             <groupId>org.springframework.boot</groupId>
             <artifactId>spring-boot-starter-security</artifactId>
index 28814b3..5fbd36c 100644 (file)
@@ -24,7 +24,9 @@ import org.springframework.boot.SpringApplication;
 import org.springframework.boot.autoconfigure.SpringBootApplication;
 import org.springframework.boot.context.properties.ConfigurationPropertiesScan;
 import org.springframework.context.annotation.ComponentScan;
+import org.springframework.scheduling.annotation.EnableScheduling;
 
+@EnableScheduling
 @SpringBootApplication
 @ComponentScan({"org.onap.policy.clamp.controlloop.models.controlloop.persistence.provider",
     "org.onap.policy.clamp.controlloop.runtime"})
index 74548e7..67c615d 100644 (file)
@@ -62,6 +62,8 @@ public class CommissioningController extends AbstractRestController {
 
     private static final Logger LOGGER = LoggerFactory.getLogger(CommissioningController.class);
 
+    private static final String TAGS = "Clamp Control Loop Commissioning API";
+
     private final CommissioningProvider provider;
 
     /**
@@ -88,7 +90,7 @@ public class CommissioningController extends AbstractRestController {
         value = "Commissions control loop definitions",
         notes = "Commissions control loop definitions, returning the commissioned control loop definition IDs",
         response = CommissioningResponse.class,
-        tags = {"Control Loop Commissioning API"},
+        tags = {TAGS},
         authorizations = @Authorization(value = AUTHORIZATION_TYPE),
         responseHeaders = {
             @ResponseHeader(
@@ -158,7 +160,7 @@ public class CommissioningController extends AbstractRestController {
     @ApiOperation(value = "Delete a commissioned control loop",
         notes = "Deletes a Commissioned Control Loop, returning optional error details",
         response = CommissioningResponse.class,
-        tags = {"Clamp Control Loop Commissioning API"},
+        tags = {TAGS},
         authorizations = @Authorization(value = AUTHORIZATION_TYPE),
         responseHeaders = {
             @ResponseHeader(
@@ -233,7 +235,7 @@ public class CommissioningController extends AbstractRestController {
         notes = "Queries details of the requested commissioned control loop definitions, "
             + "returning all control loop details",
         response = ToscaNodeTemplate.class,
-        tags = {"Clamp Control Loop Commissioning API"},
+        tags = {TAGS},
         authorizations = @Authorization(value = AUTHORIZATION_TYPE),
         responseHeaders = {
             @ResponseHeader(
@@ -302,7 +304,7 @@ public class CommissioningController extends AbstractRestController {
         notes = "Queries details of the requested commissioned tosca service template, "
             + "returning all tosca service template details",
         response = ToscaServiceTemplate.class,
-        tags = {"Clamp Control Loop Commissioning API"},
+        tags = {TAGS},
         authorizations = @Authorization(value = AUTHORIZATION_TYPE),
         responseHeaders = {
             @ResponseHeader(
@@ -379,7 +381,7 @@ public class CommissioningController extends AbstractRestController {
         notes = "Queries details of the requested commissioned tosca service template json schema, "
             + "returning all tosca service template json schema details",
         response = ToscaServiceTemplate.class,
-        tags = {"Clamp Control Loop Commissioning API"},
+        tags = {TAGS},
         authorizations = @Authorization(value = AUTHORIZATION_TYPE),
         responseHeaders = {
             @ResponseHeader(
@@ -448,7 +450,7 @@ public class CommissioningController extends AbstractRestController {
         notes = "Queries details of the requested commissioned control loop element definitions, "
             + "returning all control loop elements' details",
         response = ToscaNodeTemplate.class,
-        tags = {"Clamp Control Loop Commissioning API"},
+        tags = {TAGS},
         authorizations = @Authorization(value = AUTHORIZATION_TYPE),
         responseHeaders = {
             @ResponseHeader(
index aba585e..5a320e8 100644 (file)
@@ -59,6 +59,8 @@ public class InstantiationController extends AbstractRestController {
 
     private static final Logger LOGGER = LoggerFactory.getLogger(InstantiationController.class);
 
+    private static final String TAGS = "Clamp Control Loop Instantiation API";
+
     // The CL provider for instantiation requests
     private final ControlLoopInstantiationProvider provider;
 
@@ -86,7 +88,7 @@ public class InstantiationController extends AbstractRestController {
             value = "Commissions control loop definitions",
             notes = "Commissions control loop definitions, returning the control loop IDs",
             response = InstantiationResponse.class,
-            tags = {"Control Loop Instantiation API"},
+            tags = {TAGS},
             authorizations = @Authorization(value = AUTHORIZATION_TYPE),
             responseHeaders = {
                 @ResponseHeader(
@@ -154,9 +156,7 @@ public class InstantiationController extends AbstractRestController {
     @ApiOperation(value = "Query details of the requested control loops",
             notes = "Queries details of the requested control loops, returning all control loop details",
             response = ControlLoops.class,
-            tags = {
-                "Clamp control loop Instantiation API"
-            },
+            tags = {TAGS},
             authorizations = @Authorization(value = AUTHORIZATION_TYPE),
             responseHeaders = {
                 @ResponseHeader(
@@ -220,9 +220,7 @@ public class InstantiationController extends AbstractRestController {
             value = "Updates control loop definitions",
             notes = "Updates control loop definitions, returning the updated control loop definition IDs",
             response = InstantiationResponse.class,
-            tags = {
-                "Control Loop Instantiation API"
-                },
+            tags = {TAGS},
             authorizations = @Authorization(value = AUTHORIZATION_TYPE),
             responseHeaders = {
                 @ResponseHeader(
@@ -290,9 +288,7 @@ public class InstantiationController extends AbstractRestController {
     @ApiOperation(value = "Delete a control loop",
             notes = "Deletes a control loop, returning optional error details",
             response = InstantiationResponse.class,
-            tags = {
-                "Clamp Control Loop Instantiation API"
-                },
+            tags = {TAGS},
             authorizations = @Authorization(value = AUTHORIZATION_TYPE),
             responseHeaders = {
                 @ResponseHeader(
@@ -362,9 +358,7 @@ public class InstantiationController extends AbstractRestController {
     @ApiOperation(value = "Issue a command to the requested control loops",
             notes = "Issues a command to a control loop, ordering a state change on the control loop",
             response = InstantiationResponse.class,
-            tags = {
-                "Clamp Control Loop Instantiation API"
-            },
+            tags = {TAGS},
             authorizations = @Authorization(value = AUTHORIZATION_TYPE),
             responseHeaders = {
                 @ResponseHeader(
index 8653159..7ac9500 100644 (file)
@@ -52,6 +52,7 @@ import org.springframework.web.bind.annotation.RestController;
 public class MonitoringQueryController extends AbstractRestController {
 
     private static final Logger LOGGER = LoggerFactory.getLogger(MonitoringQueryController.class);
+    private static final String TAGS = "Clamp Control Loop Monitoring API";
     private final MonitoringProvider provider;
 
     /**
@@ -80,9 +81,7 @@ public class MonitoringQueryController extends AbstractRestController {
     @ApiOperation(value = "Query details of the requested participant stats",
         notes = "Queries details of the requested participant stats, returning all participant stats",
         response = ParticipantStatisticsList.class,
-        tags = {
-            "Clamp control loop Monitoring API"
-        },
+        tags = {TAGS},
         authorizations = @Authorization(value = AUTHORIZATION_TYPE),
         responseHeaders = {
             @ResponseHeader(
@@ -168,9 +167,7 @@ public class MonitoringQueryController extends AbstractRestController {
     @ApiOperation(value = "Query details of all the participant stats in a control loop",
         notes = "Queries details of the participant stats, returning all participant stats",
         response = ClElementStatisticsList.class,
-        tags = {
-            "Clamp control loop Monitoring API"
-        },
+        tags = {TAGS},
         authorizations = @Authorization(value = AUTHORIZATION_TYPE),
         responseHeaders = {
             @ResponseHeader(
@@ -235,9 +232,7 @@ public class MonitoringQueryController extends AbstractRestController {
     @ApiOperation(value = "Query details of the requested cl element stats in a control loop",
         notes = "Queries details of the requested cl element stats, returning all clElement stats",
         response = ClElementStatisticsList.class,
-        tags = {
-            "Clamp control loop Monitoring API"
-        },
+        tags = {TAGS},
         authorizations = @Authorization(value = AUTHORIZATION_TYPE),
         responseHeaders = {
             @ResponseHeader(
@@ -306,9 +301,7 @@ public class MonitoringQueryController extends AbstractRestController {
     @ApiOperation(value = "Query details of the requested cl element stats",
         notes = "Queries details of the requested cl element stats, returning all clElement stats",
         response = ClElementStatisticsList.class,
-        tags = {
-            "Clamp control loop Monitoring API"
-        },
+        tags = {TAGS},
         authorizations = @Authorization(value = AUTHORIZATION_TYPE),
         responseHeaders = {
             @ResponseHeader(
diff --git a/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/MessageIntercept.java b/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/MessageIntercept.java
new file mode 100644 (file)
index 0000000..c23ed83
--- /dev/null
@@ -0,0 +1,32 @@
+/*-
+ * ============LICENSE_START=======================================================
+ *  Copyright (C) 2021 Nordix Foundation.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.clamp.controlloop.runtime.supervision;
+
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+
+@Retention(RetentionPolicy.RUNTIME)
+@Target(ElementType.METHOD)
+public @interface MessageIntercept {
+
+}
diff --git a/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/SupervisionAspect.java b/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/SupervisionAspect.java
new file mode 100644 (file)
index 0000000..293b5d5
--- /dev/null
@@ -0,0 +1,71 @@
+/*-
+ * ============LICENSE_START=======================================================
+ *  Copyright (C) 2021 Nordix Foundation.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.clamp.controlloop.runtime.supervision;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import lombok.RequiredArgsConstructor;
+import org.aspectj.lang.annotation.After;
+import org.aspectj.lang.annotation.Aspect;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.scheduling.annotation.Scheduled;
+import org.springframework.stereotype.Component;
+
+@Aspect
+@Component
+@RequiredArgsConstructor
+public class SupervisionAspect implements Closeable {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(SupervisionAspect.class);
+
+    private final SupervisionScanner supervisionScanner;
+
+    private ThreadPoolExecutor executor =
+            new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
+
+    @Scheduled(
+            fixedRateString = "${runtime.participantParameters.heartBeatMs}",
+            initialDelayString = "${runtime.participantParameters.heartBeatMs}")
+    public void schedule() {
+        LOGGER.info("Add scheduled scanning");
+        executor.execute(() -> supervisionScanner.run(true));
+    }
+
+    /**
+     * Intercept Messages from participant and run Supervision Scan.
+     */
+    @After("@annotation(MessageIntercept)")
+    public void doCheck() {
+        if (executor.getQueue().size() < 2) {
+            LOGGER.debug("Add scanning Message");
+            executor.execute(() -> supervisionScanner.run(false));
+        }
+    }
+
+    @Override
+    public void close() throws IOException {
+        executor.shutdown();
+    }
+}
index 56a1ba9..dadfe0d 100644 (file)
 
 package org.onap.policy.clamp.controlloop.runtime.supervision;
 
-import java.time.Instant;
-import java.util.LinkedHashMap;
 import java.util.List;
-import java.util.Map;
-import java.util.UUID;
 import javax.ws.rs.core.Response;
 import lombok.AllArgsConstructor;
 import org.apache.commons.collections4.CollectionUtils;
 import org.onap.policy.clamp.controlloop.common.exception.ControlLoopException;
 import org.onap.policy.clamp.controlloop.models.controlloop.concepts.ControlLoop;
 import org.onap.policy.clamp.controlloop.models.controlloop.concepts.ControlLoopElement;
-import org.onap.policy.clamp.controlloop.models.controlloop.concepts.ControlLoopElementDefinition;
 import org.onap.policy.clamp.controlloop.models.controlloop.concepts.ControlLoopState;
 import org.onap.policy.clamp.controlloop.models.controlloop.concepts.Participant;
 import org.onap.policy.clamp.controlloop.models.controlloop.persistence.provider.ControlLoopProvider;
 import org.onap.policy.clamp.controlloop.models.controlloop.persistence.provider.ParticipantProvider;
-import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ControlLoopStateChange;
-import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ControlLoopUpdate;
 import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantDeregister;
-import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantDeregisterAck;
-import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantMessageType;
 import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantRegister;
-import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantRegisterAck;
 import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantStatus;
-import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantUpdate;
 import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantUpdateAck;
-import org.onap.policy.clamp.controlloop.runtime.commissioning.CommissioningProvider;
-import org.onap.policy.clamp.controlloop.runtime.main.parameters.ClRuntimeParameterGroup;
 import org.onap.policy.clamp.controlloop.runtime.monitoring.MonitoringProvider;
 import org.onap.policy.clamp.controlloop.runtime.supervision.comm.ControlLoopStateChangePublisher;
 import org.onap.policy.clamp.controlloop.runtime.supervision.comm.ControlLoopUpdatePublisher;
 import org.onap.policy.clamp.controlloop.runtime.supervision.comm.ParticipantDeregisterAckPublisher;
 import org.onap.policy.clamp.controlloop.runtime.supervision.comm.ParticipantRegisterAckPublisher;
-import org.onap.policy.clamp.controlloop.runtime.supervision.comm.ParticipantStatusListener;
 import org.onap.policy.clamp.controlloop.runtime.supervision.comm.ParticipantUpdatePublisher;
-import org.onap.policy.common.endpoints.event.comm.TopicSink;
-import org.onap.policy.common.endpoints.listeners.MessageTypeDispatcher;
-import org.onap.policy.common.utils.services.ServiceManager;
-import org.onap.policy.common.utils.services.ServiceManagerException;
 import org.onap.policy.models.base.PfModelException;
 import org.onap.policy.models.tosca.authorative.concepts.ToscaConceptIdentifier;
 import org.slf4j.Logger;
@@ -84,7 +66,6 @@ public class SupervisionHandler {
     private final ControlLoopProvider controlLoopProvider;
     private final ParticipantProvider participantProvider;
     private final MonitoringProvider monitoringProvider;
-    private final CommissioningProvider commissioningProvider;
 
     // Publishers for participant communication
     private final ControlLoopUpdatePublisher controlLoopUpdatePublisher;
@@ -130,6 +111,7 @@ public class SupervisionHandler {
      *
      * @param participantStatusMessage the ParticipantStatus message received from a participant
      */
+    @MessageIntercept
     public void handleParticipantMessage(ParticipantStatus participantStatusMessage) {
         LOGGER.debug("Participant Status received {}", participantStatusMessage);
         try {
@@ -151,10 +133,14 @@ public class SupervisionHandler {
      *
      * @param participantRegisterMessage the ParticipantRegister message received from a participant
      */
+    @MessageIntercept
     public void handleParticipantMessage(ParticipantRegister participantRegisterMessage) {
         LOGGER.debug("Participant Register received {}", participantRegisterMessage);
-        sendParticipantAckMessage(participantRegisterMessage);
-        sendParticipantUpdate(participantRegisterMessage);
+
+        participantRegisterAckPublisher.send(participantRegisterMessage.getMessageId());
+
+        participantUpdatePublisher.send(participantRegisterMessage.getParticipantId(),
+                participantRegisterMessage.getParticipantType());
     }
 
     /**
@@ -162,9 +148,10 @@ public class SupervisionHandler {
      *
      * @param participantDeregisterMessage the ParticipantDeregister message received from a participant
      */
+    @MessageIntercept
     public void handleParticipantMessage(ParticipantDeregister participantDeregisterMessage) {
         LOGGER.debug("Participant Deregister received {}", participantDeregisterMessage);
-        sendParticipantAckMessage(participantDeregisterMessage);
+        participantDeregisterAckPublisher.send(participantDeregisterMessage.getMessageId());
     }
 
     /**
@@ -172,6 +159,7 @@ public class SupervisionHandler {
      *
      * @param participantUpdateAckMessage the ParticipantUpdateAck message received from a participant
      */
+    @MessageIntercept
     public void handleParticipantMessage(ParticipantUpdateAck participantUpdateAckMessage) {
         LOGGER.debug("Participant Update Ack received {}", participantUpdateAckMessage);
     }
@@ -289,70 +277,6 @@ public class SupervisionHandler {
         }
     }
 
-    private void sendControlLoopUpdate(ControlLoop controlLoop) throws PfModelException {
-        var controlLoopUpdateMsg = new ControlLoopUpdate();
-        controlLoopUpdateMsg.setControlLoopId(controlLoop.getKey().asIdentifier());
-        controlLoopUpdateMsg.setControlLoop(controlLoop);
-        // TODO: We should look up the correct TOSCA node template here for the control loop
-        // Tiny hack implemented to return the tosca service template entry from the database and be passed onto dmaap
-        controlLoopUpdateMsg.setControlLoopDefinition(commissioningProvider.getToscaServiceTemplate(null, null));
-        controlLoopUpdatePublisher.send(controlLoopUpdateMsg);
-    }
-
-    private void sendControlLoopStateChange(ControlLoop controlLoop) {
-        var clsc = new ControlLoopStateChange();
-        clsc.setControlLoopId(controlLoop.getKey().asIdentifier());
-        clsc.setMessageId(UUID.randomUUID());
-        clsc.setOrderedState(controlLoop.getOrderedState());
-        controlLoopStateChangePublisher.send(clsc);
-    }
-
-    private void sendParticipantUpdate(ParticipantRegister participantRegisterMessage) {
-        var message = new ParticipantUpdate();
-        message.setParticipantId(participantRegisterMessage.getParticipantId());
-        message.setParticipantType(participantRegisterMessage.getParticipantType());
-        message.setTimestamp(Instant.now());
-
-        ControlLoopElementDefinition clDefinition = new ControlLoopElementDefinition();
-        clDefinition.setId(UUID.randomUUID());
-
-        try {
-            clDefinition.setControlLoopElementToscaServiceTemplate(commissioningProvider
-                    .getToscaServiceTemplate(null, null));
-        } catch (PfModelException pfme) {
-            LOGGER.warn("Get of tosca service template failed, cannot send participantupdate", pfme);
-            return;
-        }
-
-        Map<UUID, ControlLoopElementDefinition> controlLoopElementDefinitionMap = new LinkedHashMap<>();
-        controlLoopElementDefinitionMap.put(UUID.randomUUID(), clDefinition);
-
-        Map<ToscaConceptIdentifier, Map<UUID, ControlLoopElementDefinition>>
-            participantDefinitionUpdateMap = new LinkedHashMap<>();
-        participantDefinitionUpdateMap.put(participantRegisterMessage.getParticipantId(),
-                      controlLoopElementDefinitionMap);
-        message.setParticipantDefinitionUpdateMap(participantDefinitionUpdateMap);
-
-        LOGGER.debug("Participant Update sent", message);
-        participantUpdatePublisher.send(message);
-    }
-
-    private void sendParticipantAckMessage(ParticipantRegister participantRegisterMessage) {
-        var message = new ParticipantRegisterAck();
-        message.setResponseTo(participantRegisterMessage.getMessageId());
-        message.setMessage("Participant Register Ack");
-        message.setResult(true);
-        participantRegisterAckPublisher.send(message);
-    }
-
-    private void sendParticipantAckMessage(ParticipantDeregister participantDeregisterMessage) {
-        var message = new ParticipantDeregisterAck();
-        message.setResponseTo(participantDeregisterMessage.getMessageId());
-        message.setMessage("Participant Deregister Ack");
-        message.setResult(true);
-        participantDeregisterAckPublisher.send(message);
-    }
-
     private void superviseParticipant(ParticipantStatus participantStatusMessage)
             throws PfModelException, ControlLoopException {
         if (participantStatusMessage.getParticipantId() == null) {
index 68f5830..b360f67 100644 (file)
 
 package org.onap.policy.clamp.controlloop.runtime.supervision;
 
-import java.io.Closeable;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import lombok.Getter;
+import lombok.Setter;
 import org.onap.policy.clamp.controlloop.models.controlloop.concepts.ControlLoop;
 import org.onap.policy.clamp.controlloop.models.controlloop.concepts.ControlLoopElement;
+import org.onap.policy.clamp.controlloop.models.controlloop.concepts.ControlLoopState;
 import org.onap.policy.clamp.controlloop.models.controlloop.persistence.provider.ControlLoopProvider;
 import org.onap.policy.clamp.controlloop.runtime.main.parameters.ClRuntimeParameterGroup;
+import org.onap.policy.clamp.controlloop.runtime.supervision.comm.ControlLoopStateChangePublisher;
+import org.onap.policy.clamp.controlloop.runtime.supervision.comm.ControlLoopUpdatePublisher;
 import org.onap.policy.models.base.PfModelException;
+import org.onap.policy.models.tosca.authorative.concepts.ToscaConceptIdentifier;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.stereotype.Component;
@@ -37,35 +43,83 @@ import org.springframework.stereotype.Component;
  * This class is used to scan the control loops in the database and check if they are in the correct state.
  */
 @Component
-public class SupervisionScanner implements Runnable, Closeable {
+public class SupervisionScanner {
     private static final Logger LOGGER = LoggerFactory.getLogger(SupervisionScanner.class);
 
-    private ControlLoopProvider controlLoopProvider;
-    private ScheduledExecutorService timerPool;
+    @Getter
+    @Setter
+    static class HandleCounter {
+        private int maxRetryCount;
+        private long maxWaitMs;
+        private Map<ToscaConceptIdentifier, Integer> mapCounter = new HashMap<>();
+        private Set<ToscaConceptIdentifier> mapFault = new HashSet<>();
+
+        public void clear(ToscaConceptIdentifier id) {
+            mapCounter.put(id, 0);
+            mapFault.remove(id);
+        }
+
+        public void setFault(ToscaConceptIdentifier id) {
+            mapCounter.put(id, 0);
+            mapFault.add(id);
+        }
+
+        public boolean count(ToscaConceptIdentifier id) {
+            int counter = mapCounter.getOrDefault(id, 0) + 1;
+            if (counter <= maxRetryCount) {
+                mapCounter.put(id, counter);
+                return true;
+            }
+            return false;
+        }
+
+        public boolean isFault(ToscaConceptIdentifier id) {
+            return mapFault.contains(id);
+        }
+
+        public int getCounter(ToscaConceptIdentifier id) {
+            return mapCounter.getOrDefault(id, 0);
+        }
+    }
+
+    private HandleCounter stateChange = new HandleCounter();
+
+    private final ControlLoopProvider controlLoopProvider;
+    private final ControlLoopStateChangePublisher controlLoopStateChangePublisher;
+    private final ControlLoopUpdatePublisher controlLoopUpdatePublisher;
 
     /**
      * Constructor for instantiating SupervisionScanner.
      *
-     * @param clRuntimeParameterGroup the parameters for the control loop runtime
      * @param controlLoopProvider the provider to use to read control loops from the database
+     * @param controlLoopStateChangePublisher the ControlLoopStateChange Publisher
+     * @param clRuntimeParameterGroup the parameters for the control loop runtime
      */
     public SupervisionScanner(final ControlLoopProvider controlLoopProvider,
-            ClRuntimeParameterGroup clRuntimeParameterGroup) {
+            final ControlLoopStateChangePublisher controlLoopStateChangePublisher,
+            ControlLoopUpdatePublisher controlLoopUpdatePublisher,
+            final ClRuntimeParameterGroup clRuntimeParameterGroup) {
         this.controlLoopProvider = controlLoopProvider;
+        this.controlLoopStateChangePublisher = controlLoopStateChangePublisher;
+        this.controlLoopUpdatePublisher = controlLoopUpdatePublisher;
 
-        // Kick off the timer
-        timerPool = makeTimerPool();
-        timerPool.scheduleAtFixedRate(this, 0, clRuntimeParameterGroup.getSupervisionScannerIntervalSec(),
-                TimeUnit.SECONDS);
+        stateChange.setMaxRetryCount(
+                clRuntimeParameterGroup.getParticipantParameters().getUpdateParameters().getMaxRetryCount());
+        stateChange.setMaxWaitMs(
+                clRuntimeParameterGroup.getParticipantParameters().getUpdateParameters().getMaxWaitMs());
     }
 
-    @Override
-    public void run() {
+    /**
+     * Run Scanning.
+     *
+     * @param counterCheck if true activate counter and retry
+     */
+    public void run(boolean counterCheck) {
         LOGGER.debug("Scanning control loops in the database . . .");
 
         try {
             for (ControlLoop controlLoop : controlLoopProvider.getControlLoops(null, null)) {
-                scanControlLoop(controlLoop);
+                scanControlLoop(controlLoop, counterCheck);
             }
         } catch (PfModelException pfme) {
             LOGGER.warn("error reading control loops from database", pfme);
@@ -74,40 +128,65 @@ public class SupervisionScanner implements Runnable, Closeable {
         LOGGER.debug("Control loop scan complete . . .");
     }
 
-    @Override
-    public void close() {
-        timerPool.shutdown();
-    }
-
-    private void scanControlLoop(final ControlLoop controlLoop) throws PfModelException {
+    private void scanControlLoop(final ControlLoop controlLoop, boolean counterCheck) throws PfModelException {
         LOGGER.debug("scanning control loop {} . . .", controlLoop.getKey().asIdentifier());
 
         if (controlLoop.getState().equals(controlLoop.getOrderedState().asState())) {
             LOGGER.debug("control loop {} scanned, OK", controlLoop.getKey().asIdentifier());
+
+            // Clear missed report counter on Control Loop
+            clearFaultAndCounter(controlLoop);
             return;
         }
 
+        boolean completed = true;
         for (ControlLoopElement element : controlLoop.getElements().values()) {
             if (!element.getState().equals(element.getOrderedState().asState())) {
-                LOGGER.debug("control loop scan: transitioning from state {} to {}", controlLoop.getState(),
-                        controlLoop.getOrderedState());
-                return;
+                completed = false;
+                break;
             }
         }
 
-        LOGGER.debug("control loop scan: transition from state {} to {} completed", controlLoop.getState(),
-                controlLoop.getOrderedState());
+        if (completed) {
+            LOGGER.debug("control loop scan: transition from state {} to {} completed", controlLoop.getState(),
+                    controlLoop.getOrderedState());
+
+            controlLoop.setState(controlLoop.getOrderedState().asState());
+            controlLoopProvider.updateControlLoop(controlLoop);
 
-        controlLoop.setState(controlLoop.getOrderedState().asState());
-        controlLoopProvider.updateControlLoop(controlLoop);
+            // Clear missed report counter on Control Loop
+            clearFaultAndCounter(controlLoop);
+        } else {
+            LOGGER.debug("control loop scan: transition from state {} to {} not completed", controlLoop.getState(),
+                    controlLoop.getOrderedState());
+            if (counterCheck) {
+                handleCounter(controlLoop);
+            }
+        }
     }
 
-    /**
-     * Makes a new timer pool.
-     *
-     * @return a new timer pool
-     */
-    protected ScheduledExecutorService makeTimerPool() {
-        return Executors.newScheduledThreadPool(1);
+    private void clearFaultAndCounter(ControlLoop controlLoop) {
+        stateChange.clear(controlLoop.getKey().asIdentifier());
+    }
+
+    private void handleCounter(ControlLoop controlLoop) {
+        ToscaConceptIdentifier id = controlLoop.getKey().asIdentifier();
+        if (stateChange.isFault(id)) {
+            LOGGER.debug("report ControlLoop fault");
+            return;
+        }
+
+        if (stateChange.count(id)) {
+            if (ControlLoopState.UNINITIALISED2PASSIVE.equals(controlLoop.getState())) {
+                LOGGER.debug("retry message ControlLoopUpdate");
+                controlLoopUpdatePublisher.send(controlLoop);
+            } else {
+                LOGGER.debug("retry message ControlLoopStateChange");
+                controlLoopStateChangePublisher.send(controlLoop);
+            }
+        } else {
+            LOGGER.debug("report ControlLoop fault");
+            stateChange.setFault(id);
+        }
     }
 }
index e562343..e366ba4 100644 (file)
 
 package org.onap.policy.clamp.controlloop.runtime.supervision.comm;
 
+import lombok.AllArgsConstructor;
 import org.onap.policy.clamp.controlloop.models.controlloop.concepts.ControlLoop;
 import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ControlLoopUpdate;
 import org.onap.policy.clamp.controlloop.runtime.commissioning.CommissioningProvider;
 import org.onap.policy.models.base.PfModelException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import org.springframework.stereotype.Component;
 
 /**
  * This class is used to send ControlLoopUpdate messages to participants on DMaaP.
  */
 @Component
+@AllArgsConstructor
 public class ControlLoopUpdatePublisher extends AbstractParticipantPublisher<ControlLoopUpdate> {
 
-    private final CommissioningProvider commissioningProvider;
+    private static final Logger LOGGER = LoggerFactory.getLogger(ControlLoopUpdatePublisher.class);
 
-    /**
-     * Constructor.
-     *
-     * @param commissioningProvider the CommissioningProvider
-     */
-    public ControlLoopUpdatePublisher(CommissioningProvider commissioningProvider) {
-        this.commissioningProvider = commissioningProvider;
-    }
+    private final CommissioningProvider commissioningProvider;
 
     /**
      * Send ControlLoopUpdate to Participant.
      *
      * @param controlLoop the ControlLoop
-     * @throws PfModelException on errors getting the Control Loop Definition
      */
-    public void send(ControlLoop controlLoop) throws PfModelException {
-        var pclu = new ControlLoopUpdate();
-        pclu.setControlLoopId(controlLoop.getKey().asIdentifier());
-        pclu.setControlLoop(controlLoop);
+    public void send(ControlLoop controlLoop) {
+        var controlLoopUpdateMsg = new ControlLoopUpdate();
+        controlLoopUpdateMsg.setControlLoopId(controlLoop.getKey().asIdentifier());
+        controlLoopUpdateMsg.setControlLoop(controlLoop);
         // TODO: We should look up the correct TOSCA node template here for the control loop
         // Tiny hack implemented to return the tosca service template entry from the database and be passed onto dmaap
-        pclu.setControlLoopDefinition(commissioningProvider.getToscaServiceTemplate(null, null));
-        super.send(pclu);
+        try {
+            controlLoopUpdateMsg.setControlLoopDefinition(commissioningProvider.getToscaServiceTemplate(null, null));
+        } catch (PfModelException pfme) {
+            LOGGER.warn("Get of tosca service template failed, cannot send ParticipantControlLoopUpdate", pfme);
+            return;
+        }
+        super.send(controlLoopUpdateMsg);
     }
 }
index c0fcb3e..e92b6ee 100644 (file)
@@ -20,6 +20,7 @@
 
 package org.onap.policy.clamp.controlloop.runtime.supervision.comm;
 
+import java.util.UUID;
 import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantDeregisterAck;
 import org.springframework.stereotype.Component;
 
@@ -29,4 +30,16 @@ import org.springframework.stereotype.Component;
 @Component
 public class ParticipantDeregisterAckPublisher extends AbstractParticipantAckPublisher<ParticipantDeregisterAck> {
 
+    /**
+     * Sent ParticipantDeregisterAck to Participant.
+     *
+     * @param responseTo the original request id in the request.
+     */
+    public void send(UUID responseTo) {
+        var message = new ParticipantDeregisterAck();
+        message.setResponseTo(responseTo);
+        message.setMessage("Participant Deregister Ack");
+        message.setResult(true);
+        super.send(message);
+    }
 }
index 2c0c4b3..73860b5 100644 (file)
@@ -20,6 +20,7 @@
 
 package org.onap.policy.clamp.controlloop.runtime.supervision.comm;
 
+import java.util.UUID;
 import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantRegisterAck;
 import org.springframework.stereotype.Component;
 
@@ -29,4 +30,16 @@ import org.springframework.stereotype.Component;
 @Component
 public class ParticipantRegisterAckPublisher extends AbstractParticipantAckPublisher<ParticipantRegisterAck> {
 
+    /**
+     * Send ParticipantRegisterAck to Participant.
+     *
+     * @param responseTo the original request id in the request.
+     */
+    public void send(UUID responseTo) {
+        var message = new ParticipantRegisterAck();
+        message.setResponseTo(responseTo);
+        message.setMessage("Participant Register Ack");
+        message.setResult(true);
+        super.send(message);
+    }
 }
index b8538b1..4eeb0a8 100644 (file)
@@ -21,7 +21,6 @@
 package org.onap.policy.clamp.controlloop.runtime.supervision.comm;
 
 import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantMessageType;
-import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantRegister;
 import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantUpdateAck;
 import org.onap.policy.clamp.controlloop.runtime.config.messaging.Listener;
 import org.onap.policy.clamp.controlloop.runtime.supervision.SupervisionHandler;
index 5af5f1f..88cf90d 100644 (file)
 
 package org.onap.policy.clamp.controlloop.runtime.supervision.comm;
 
+import java.time.Instant;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.UUID;
+import lombok.AllArgsConstructor;
+import org.onap.policy.clamp.controlloop.models.controlloop.concepts.ControlLoopElementDefinition;
 import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantUpdate;
+import org.onap.policy.clamp.controlloop.runtime.commissioning.CommissioningProvider;
+import org.onap.policy.models.base.PfModelException;
+import org.onap.policy.models.tosca.authorative.concepts.ToscaConceptIdentifier;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import org.springframework.stereotype.Component;
 
 /**
  * This class is used to send ParticipantUpdate messages to participants on DMaaP.
  */
 @Component
+@AllArgsConstructor
 public class ParticipantUpdatePublisher extends AbstractParticipantPublisher<ParticipantUpdate> {
 
+    private static final Logger LOGGER = LoggerFactory.getLogger(ParticipantUpdatePublisher.class);
+
+    private final CommissioningProvider commissioningProvider;
+
+    /**
+     * Send ParticipantUpdate to Participant.
+     *
+     * @param participantId the participant Id
+     * @param participantType the participant Type
+     */
+    public void send(ToscaConceptIdentifier participantId, ToscaConceptIdentifier participantType) {
+        var message = new ParticipantUpdate();
+        message.setParticipantId(participantId);
+        message.setParticipantType(participantType);
+        message.setTimestamp(Instant.now());
+
+        var clDefinition = new ControlLoopElementDefinition();
+        clDefinition.setId(UUID.randomUUID());
+
+        try {
+            clDefinition.setControlLoopElementToscaServiceTemplate(
+                    commissioningProvider.getToscaServiceTemplate(null, null));
+        } catch (PfModelException pfme) {
+            LOGGER.warn("Get of tosca service template failed, cannot send participantupdate", pfme);
+            return;
+        }
+
+        Map<UUID, ControlLoopElementDefinition> controlLoopElementDefinitionMap = new LinkedHashMap<>();
+        controlLoopElementDefinitionMap.put(UUID.randomUUID(), clDefinition);
+
+        Map<ToscaConceptIdentifier, Map<UUID, ControlLoopElementDefinition>> participantDefinitionUpdateMap =
+                new LinkedHashMap<>();
+        participantDefinitionUpdateMap.put(participantId, controlLoopElementDefinitionMap);
+        message.setParticipantDefinitionUpdateMap(participantDefinitionUpdateMap);
+
+        LOGGER.debug("Participant Update sent {}", message);
+        super.send(message);
+    }
 }
index 01466ab..1d36b67 100644 (file)
@@ -27,9 +27,6 @@ runtime:
     updateParameters:
       maxRetryCount: 1
       maxWaitMs: 30000
-    stateChangeParameters:
-      maxRetryCount: 1
-      maxWaitMs: 30000
   databaseProviderParameters:
     name: PolicyProviderParameterGroup
     implementation: org.onap.policy.models.provider.impl.DatabasePolicyModelsProviderImpl
index 1c8d178..c414ffa 100644 (file)
@@ -119,7 +119,7 @@ class ControlLoopInstantiationProviderTest {
         var participantDeregisterAckPublisher = Mockito.mock(ParticipantDeregisterAckPublisher.class);
         var participantUpdatePublisher = Mockito.mock(ParticipantUpdatePublisher.class);
         supervisionHandler = new SupervisionHandler(clProvider, participantProvider, monitoringProvider,
-                        commissioningProvider, controlLoopUpdatePublisher, controlLoopStateChangePublisher,
+                        controlLoopUpdatePublisher, controlLoopStateChangePublisher,
                         participantRegisterAckPublisher, participantDeregisterAckPublisher, participantUpdatePublisher);
     }
 
index da242d7..5f00706 100644 (file)
@@ -96,7 +96,7 @@ class SupervisionMessagesTest extends CommonRestController {
         var participantDeregisterAckPublisher = Mockito.mock(ParticipantDeregisterAckPublisher.class);
         var participantUpdatePublisher = Mockito.mock(ParticipantUpdatePublisher.class);
         supervisionHandler = new SupervisionHandler(clProvider, participantProvider, monitoringProvider,
-                        commissioningProvider, controlLoopUpdatePublisher, controlLoopStateChangePublisher,
+                        controlLoopUpdatePublisher, controlLoopStateChangePublisher,
                         participantRegisterAckPublisher, participantDeregisterAckPublisher, participantUpdatePublisher);
     }
 
@@ -196,7 +196,7 @@ class SupervisionMessagesTest extends CommonRestController {
         participantUpdateMsg.setParticipantDefinitionUpdateMap(participantDefinitionUpdateMap);
 
         synchronized (lockit) {
-            ParticipantUpdatePublisher clUpdatePublisher = new ParticipantUpdatePublisher();
+            ParticipantUpdatePublisher clUpdatePublisher = new ParticipantUpdatePublisher(commissioningProvider);
             clUpdatePublisher.active(Collections.singletonList(Mockito.mock(TopicSink.class)));
             clUpdatePublisher.send(participantUpdateMsg);
         }