Add support for new messages in controloop runtime 59/122659/4
authorSirisha_Manchikanti <sirisha.manchikanti@est.tech>
Wed, 14 Jul 2021 22:48:53 +0000 (23:48 +0100)
committerSirisha_Manchikanti <sirisha.manchikanti@est.tech>
Mon, 19 Jul 2021 16:33:12 +0000 (17:33 +0100)
Receive ParticipantRegister, ParticipantDeregister and
ParticipantUpdateAck messages from participants,
Send ParticipantRegisterAck, ParticipantDeregisterAck and
ParticipantUpdate from controlloop runtime to participants.

Issue-ID: POLICY-3414
Signed-off-by: Sirisha_Manchikanti <sirisha.manchikanti@est.tech>
Change-Id: Ib5cb7d582974e34e8a226f640747c596ac5b5beb

14 files changed:
runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/main/parameters/ClRuntimeParameterGroup.java
runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/SupervisionHandler.java
runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/comm/AbstractParticipantAckPublisher.java [new file with mode: 0644]
runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/comm/ParticipantDeregisterAckPublisher.java [new file with mode: 0644]
runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/comm/ParticipantDeregisterListener.java [new file with mode: 0644]
runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/comm/ParticipantRegisterAckPublisher.java [new file with mode: 0644]
runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/comm/ParticipantRegisterListener.java [new file with mode: 0644]
runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/comm/ParticipantStatusListener.java
runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/comm/ParticipantUpdateAckListener.java [new file with mode: 0644]
runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/comm/ParticipantUpdatePublisher.java [new file with mode: 0644]
runtime-controlloop/src/test/java/org/onap/policy/clamp/controlloop/runtime/instantiation/ControlLoopInstantiationProviderTest.java
runtime-controlloop/src/test/java/org/onap/policy/clamp/controlloop/runtime/supervision/comm/SupervisionMessagesTest.java [new file with mode: 0644]
runtime-controlloop/src/test/resources/parameters/InstantiationConfigParametersStd.json
runtime-controlloop/src/test/resources/parameters/Unreadable.json

index d1fa312..433eeee 100644 (file)
@@ -45,6 +45,9 @@ public class ClRuntimeParameterGroup extends ParameterGroupImpl {
     private long participantStateChangeIntervalSec;
     private long participantClUpdateIntervalSec;
     private long participantClStateChangeIntervalSec;
+    private long participantRegisterAckIntervalSec;
+    private long participantDeregisterAckIntervalSec;
+    private long participantUpdateIntervalSec;
 
     /**
      * Create the Control Loop parameter group.
index aba5457..5e94d29 100644 (file)
 
 package org.onap.policy.clamp.controlloop.runtime.supervision;
 
+import java.time.Instant;
+import java.util.LinkedHashMap;
 import java.util.List;
+import java.util.Map;
+import java.util.UUID;
 import javax.ws.rs.core.Response;
 import lombok.AllArgsConstructor;
 import org.apache.commons.collections4.CollectionUtils;
 import org.onap.policy.clamp.controlloop.common.exception.ControlLoopException;
 import org.onap.policy.clamp.controlloop.models.controlloop.concepts.ControlLoop;
 import org.onap.policy.clamp.controlloop.models.controlloop.concepts.ControlLoopElement;
+import org.onap.policy.clamp.controlloop.models.controlloop.concepts.ControlLoopElementDefinition;
 import org.onap.policy.clamp.controlloop.models.controlloop.concepts.ControlLoopState;
 import org.onap.policy.clamp.controlloop.models.controlloop.concepts.Participant;
 import org.onap.policy.clamp.controlloop.models.controlloop.persistence.provider.ControlLoopProvider;
 import org.onap.policy.clamp.controlloop.models.controlloop.persistence.provider.ParticipantProvider;
+import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantControlLoopStateChange;
+import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantControlLoopUpdate;
+import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantDeregister;
+import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantDeregisterAck;
+import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantMessageType;
+import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantRegister;
+import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantRegisterAck;
 import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantStatus;
+import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantUpdate;
+import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantUpdateAck;
+import org.onap.policy.clamp.controlloop.runtime.commissioning.CommissioningProvider;
+import org.onap.policy.clamp.controlloop.runtime.main.parameters.ClRuntimeParameterGroup;
 import org.onap.policy.clamp.controlloop.runtime.monitoring.MonitoringProvider;
 import org.onap.policy.clamp.controlloop.runtime.supervision.comm.ParticipantControlLoopStateChangePublisher;
 import org.onap.policy.clamp.controlloop.runtime.supervision.comm.ParticipantControlLoopUpdatePublisher;
+import org.onap.policy.clamp.controlloop.runtime.supervision.comm.ParticipantDeregisterAckPublisher;
+import org.onap.policy.clamp.controlloop.runtime.supervision.comm.ParticipantRegisterAckPublisher;
+import org.onap.policy.clamp.controlloop.runtime.supervision.comm.ParticipantStateChangePublisher;
+import org.onap.policy.clamp.controlloop.runtime.supervision.comm.ParticipantStatusListener;
+import org.onap.policy.clamp.controlloop.runtime.supervision.comm.ParticipantUpdatePublisher;
+import org.onap.policy.common.endpoints.event.comm.TopicSink;
+import org.onap.policy.common.endpoints.listeners.MessageTypeDispatcher;
+import org.onap.policy.common.utils.services.ServiceManager;
+import org.onap.policy.common.utils.services.ServiceManagerException;
 import org.onap.policy.models.base.PfModelException;
 import org.onap.policy.models.tosca.authorative.concepts.ToscaConceptIdentifier;
 import org.slf4j.Logger;
@@ -60,10 +85,14 @@ public class SupervisionHandler {
     private final ControlLoopProvider controlLoopProvider;
     private final ParticipantProvider participantProvider;
     private final MonitoringProvider monitoringProvider;
+    private final CommissioningProvider commissioningProvider;
 
     // Publishers for participant communication
     private final ParticipantControlLoopUpdatePublisher controlLoopUpdatePublisher;
     private final ParticipantControlLoopStateChangePublisher controlLoopStateChangePublisher;
+    private final ParticipantRegisterAckPublisher participantRegisterAckPublisher;
+    private final ParticipantDeregisterAckPublisher participantDeregisterAckPublisher;
+    private final ParticipantUpdatePublisher participantUpdatePublisher;
 
     /**
      * Supervision trigger called when a command is issued on control loops.
@@ -102,9 +131,8 @@ public class SupervisionHandler {
      *
      * @param participantStatusMessage the ParticipantStatus message received from a participant
      */
-    public void handleParticipantStatusMessage(ParticipantStatus participantStatusMessage) {
+    public void handleParticipantMessage(ParticipantStatus participantStatusMessage) {
         LOGGER.debug("Participant Status received {}", participantStatusMessage);
-
         try {
             superviseParticipant(participantStatusMessage);
         } catch (PfModelException | ControlLoopException svExc) {
@@ -119,6 +147,36 @@ public class SupervisionHandler {
         }
     }
 
+    /**
+     * Handle a ParticipantRegister message from a participant.
+     *
+     * @param participantRegisterMessage the ParticipantRegister message received from a participant
+     */
+    public void handleParticipantMessage(ParticipantRegister participantRegisterMessage) {
+        LOGGER.debug("Participant Register received {}", participantRegisterMessage);
+        sendParticipantAckMessage(participantRegisterMessage);
+        sendParticipantUpdate(participantRegisterMessage);
+    }
+
+    /**
+     * Handle a ParticipantDeregister message from a participant.
+     *
+     * @param participantDeregisterMessage the ParticipantDeregister message received from a participant
+     */
+    public void handleParticipantMessage(ParticipantDeregister participantDeregisterMessage) {
+        LOGGER.debug("Participant Deregister received {}", participantDeregisterMessage);
+        sendParticipantAckMessage(participantDeregisterMessage);
+    }
+
+    /**
+     * Handle a ParticipantUpdateAck message from a participant.
+     *
+     * @param participantUpdateAckMessage the ParticipantUpdateAck message received from a participant
+     */
+    public void handleParticipantMessage(ParticipantUpdateAck participantUpdateAckMessage) {
+        LOGGER.debug("Participant Update Ack received {}", participantUpdateAckMessage);
+    }
+
     /**
      * Supervise a control loop, performing whatever actions need to be performed on the control loop.
      *
@@ -232,6 +290,70 @@ public class SupervisionHandler {
         }
     }
 
+    private void sendControlLoopUpdate(ControlLoop controlLoop) throws PfModelException {
+        var pclu = new ParticipantControlLoopUpdate();
+        pclu.setControlLoopId(controlLoop.getKey().asIdentifier());
+        pclu.setControlLoop(controlLoop);
+        // TODO: We should look up the correct TOSCA node template here for the control loop
+        // Tiny hack implemented to return the tosca service template entry from the database and be passed onto dmaap
+        pclu.setControlLoopDefinition(commissioningProvider.getToscaServiceTemplate(null, null));
+        controlLoopUpdatePublisher.send(pclu);
+    }
+
+    private void sendControlLoopStateChange(ControlLoop controlLoop) {
+        var clsc = new ParticipantControlLoopStateChange();
+        clsc.setControlLoopId(controlLoop.getKey().asIdentifier());
+        clsc.setMessageId(UUID.randomUUID());
+        clsc.setOrderedState(controlLoop.getOrderedState());
+        controlLoopStateChangePublisher.send(clsc);
+    }
+
+    private void sendParticipantUpdate(ParticipantRegister participantRegisterMessage) {
+        var message = new ParticipantUpdate();
+        message.setParticipantId(participantRegisterMessage.getParticipantId());
+        message.setParticipantType(participantRegisterMessage.getParticipantType());
+        message.setTimestamp(Instant.now());
+
+        ControlLoopElementDefinition clDefinition = new ControlLoopElementDefinition();
+        clDefinition.setId(UUID.randomUUID());
+
+        try {
+            clDefinition.setControlLoopElementToscaServiceTemplate(commissioningProvider
+                    .getToscaServiceTemplate(null, null));
+        } catch (PfModelException pfme) {
+            LOGGER.warn("Get of tosca service template failed, cannot send participantupdate", pfme);
+            return;
+        }
+
+        Map<UUID, ControlLoopElementDefinition> controlLoopElementDefinitionMap = new LinkedHashMap<>();
+        controlLoopElementDefinitionMap.put(UUID.randomUUID(), clDefinition);
+
+        Map<ToscaConceptIdentifier, Map<UUID, ControlLoopElementDefinition>>
+            participantDefinitionUpdateMap = new LinkedHashMap<>();
+        participantDefinitionUpdateMap.put(participantRegisterMessage.getParticipantId(),
+                      controlLoopElementDefinitionMap);
+        message.setParticipantDefinitionUpdateMap(participantDefinitionUpdateMap);
+
+        LOGGER.debug("Participant Update sent", message);
+        participantUpdatePublisher.send(message);
+    }
+
+    private void sendParticipantAckMessage(ParticipantRegister participantRegisterMessage) {
+        var message = new ParticipantRegisterAck();
+        message.setResponseTo(participantRegisterMessage.getMessageId());
+        message.setMessage("Participant Register Ack");
+        message.setResult(true);
+        participantRegisterAckPublisher.send(message);
+    }
+
+    private void sendParticipantAckMessage(ParticipantDeregister participantDeregisterMessage) {
+        var message = new ParticipantDeregisterAck();
+        message.setResponseTo(participantDeregisterMessage.getMessageId());
+        message.setMessage("Participant Deregister Ack");
+        message.setResult(true);
+        participantDeregisterAckPublisher.send(message);
+    }
+
     private void superviseParticipant(ParticipantStatus participantStatusMessage)
             throws PfModelException, ControlLoopException {
         if (participantStatusMessage.getParticipantId() == null) {
diff --git a/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/comm/AbstractParticipantAckPublisher.java b/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/comm/AbstractParticipantAckPublisher.java
new file mode 100644 (file)
index 0000000..4b4ca99
--- /dev/null
@@ -0,0 +1,62 @@
+/*-
+ * ============LICENSE_START=======================================================
+ *  Copyright (C) 2021 Nordix Foundation.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.clamp.controlloop.runtime.supervision.comm;
+
+import java.util.List;
+import javax.ws.rs.core.Response.Status;
+import org.onap.policy.clamp.controlloop.common.exception.ControlLoopRuntimeException;
+import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantAckMessage;
+import org.onap.policy.clamp.controlloop.runtime.config.messaging.Publisher;
+import org.onap.policy.common.endpoints.event.comm.TopicSink;
+import org.onap.policy.common.endpoints.event.comm.client.TopicSinkClient;
+
+public abstract class AbstractParticipantAckPublisher<E extends ParticipantAckMessage> implements Publisher {
+
+    private TopicSinkClient topicSinkClient;
+    private boolean active = false;
+
+    /**
+     * Method to send Participant message to participants on demand.
+     *
+     * @param participantMessage the Participant message
+     */
+    public void send(final E participantMessage) {
+        if (!active) {
+            throw new ControlLoopRuntimeException(Status.NOT_ACCEPTABLE, "Not Active!");
+        }
+        topicSinkClient.send(participantMessage);
+    }
+
+
+    @Override
+    public void active(List<TopicSink> topicSinks) {
+        if (topicSinks.size() != 1) {
+            throw new IllegalArgumentException("Topic Sink must be one");
+        }
+        this.topicSinkClient = new TopicSinkClient(topicSinks.get(0));
+        active = true;
+    }
+
+    @Override
+    public void stop() {
+        active = false;
+    }
+}
diff --git a/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/comm/ParticipantDeregisterAckPublisher.java b/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/comm/ParticipantDeregisterAckPublisher.java
new file mode 100644 (file)
index 0000000..c0fcb3e
--- /dev/null
@@ -0,0 +1,32 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2021 Nordix Foundation.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.clamp.controlloop.runtime.supervision.comm;
+
+import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantDeregisterAck;
+import org.springframework.stereotype.Component;
+
+/**
+ * This class is used to send ParticipantDeregisterAck messages to participants on DMaaP.
+ */
+@Component
+public class ParticipantDeregisterAckPublisher extends AbstractParticipantAckPublisher<ParticipantDeregisterAck> {
+
+}
diff --git a/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/comm/ParticipantDeregisterListener.java b/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/comm/ParticipantDeregisterListener.java
new file mode 100644 (file)
index 0000000..a03ff0a
--- /dev/null
@@ -0,0 +1,67 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2021 Nordix Foundation.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.clamp.controlloop.runtime.supervision.comm;
+
+import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantDeregister;
+import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantMessageType;
+import org.onap.policy.clamp.controlloop.runtime.config.messaging.Listener;
+import org.onap.policy.clamp.controlloop.runtime.supervision.SupervisionHandler;
+import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure;
+import org.onap.policy.common.endpoints.listeners.ScoListener;
+import org.onap.policy.common.utils.coder.StandardCoderObject;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.stereotype.Component;
+
+/**
+ * Listener for ParticipantDeregister messages sent by participants.
+ */
+@Component
+public class ParticipantDeregisterListener extends ScoListener<ParticipantDeregister> implements Listener {
+    private static final Logger LOGGER = LoggerFactory.getLogger(ParticipantDeregisterListener.class);
+
+    private final SupervisionHandler supervisionHandler;
+
+    /**
+     * Constructs the object.
+     */
+    public ParticipantDeregisterListener(SupervisionHandler supervisionHandler) {
+        super(ParticipantDeregister.class);
+        this.supervisionHandler = supervisionHandler;
+    }
+
+    @Override
+    public void onTopicEvent(final CommInfrastructure infra, final String topic, final StandardCoderObject sco,
+            final ParticipantDeregister participantDeregisterMessage) {
+        LOGGER.debug("ParticipantDeregister message received from participant - {}", participantDeregisterMessage);
+        supervisionHandler.handleParticipantMessage(participantDeregisterMessage);
+    }
+
+    @Override
+    public String getType() {
+        return ParticipantMessageType.PARTICIPANT_DEREGISTER.name();
+    }
+
+    @Override
+    public ScoListener<ParticipantDeregister> getScoListener() {
+        return this;
+    }
+}
diff --git a/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/comm/ParticipantRegisterAckPublisher.java b/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/comm/ParticipantRegisterAckPublisher.java
new file mode 100644 (file)
index 0000000..2c0c4b3
--- /dev/null
@@ -0,0 +1,32 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2021 Nordix Foundation.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.clamp.controlloop.runtime.supervision.comm;
+
+import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantRegisterAck;
+import org.springframework.stereotype.Component;
+
+/**
+ * This class is used to send ParticipantRegisterAck messages to participants on DMaaP.
+ */
+@Component
+public class ParticipantRegisterAckPublisher extends AbstractParticipantAckPublisher<ParticipantRegisterAck> {
+
+}
diff --git a/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/comm/ParticipantRegisterListener.java b/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/comm/ParticipantRegisterListener.java
new file mode 100644 (file)
index 0000000..a4d8c76
--- /dev/null
@@ -0,0 +1,67 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2021 Nordix Foundation.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.clamp.controlloop.runtime.supervision.comm;
+
+import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantMessageType;
+import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantRegister;
+import org.onap.policy.clamp.controlloop.runtime.config.messaging.Listener;
+import org.onap.policy.clamp.controlloop.runtime.supervision.SupervisionHandler;
+import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure;
+import org.onap.policy.common.endpoints.listeners.ScoListener;
+import org.onap.policy.common.utils.coder.StandardCoderObject;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.stereotype.Component;
+
+/**
+ * Listener for ParticipantRegister messages sent by participants.
+ */
+@Component
+public class ParticipantRegisterListener extends ScoListener<ParticipantRegister> implements Listener {
+    private static final Logger LOGGER = LoggerFactory.getLogger(ParticipantRegisterListener.class);
+
+    private final SupervisionHandler supervisionHandler;
+
+    /**
+     * Constructs the object.
+     */
+    public ParticipantRegisterListener(SupervisionHandler supervisionHandler) {
+        super(ParticipantRegister.class);
+        this.supervisionHandler = supervisionHandler;
+    }
+
+    @Override
+    public void onTopicEvent(final CommInfrastructure infra, final String topic, final StandardCoderObject sco,
+            final ParticipantRegister participantRegisterMessage) {
+        LOGGER.debug("ParticipantRegister message received from participant - {}", participantRegisterMessage);
+        supervisionHandler.handleParticipantMessage(participantRegisterMessage);
+    }
+
+    @Override
+    public String getType() {
+        return ParticipantMessageType.PARTICIPANT_REGISTER.name();
+    }
+
+    @Override
+    public ScoListener<ParticipantRegister> getScoListener() {
+        return this;
+    }
+}
index 8fa0762..9da8860 100644 (file)
@@ -52,7 +52,7 @@ public class ParticipantStatusListener extends ScoListener<ParticipantStatus> im
     public void onTopicEvent(final CommInfrastructure infra, final String topic, final StandardCoderObject sco,
             final ParticipantStatus participantStatusMessage) {
         LOGGER.debug("ParticipantStatus message received from participant - {}", participantStatusMessage);
-        supervisionHandler.handleParticipantStatusMessage(participantStatusMessage);
+        supervisionHandler.handleParticipantMessage(participantStatusMessage);
     }
 
     @Override
diff --git a/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/comm/ParticipantUpdateAckListener.java b/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/comm/ParticipantUpdateAckListener.java
new file mode 100644 (file)
index 0000000..b8538b1
--- /dev/null
@@ -0,0 +1,68 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2021 Nordix Foundation.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.clamp.controlloop.runtime.supervision.comm;
+
+import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantMessageType;
+import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantRegister;
+import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantUpdateAck;
+import org.onap.policy.clamp.controlloop.runtime.config.messaging.Listener;
+import org.onap.policy.clamp.controlloop.runtime.supervision.SupervisionHandler;
+import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure;
+import org.onap.policy.common.endpoints.listeners.ScoListener;
+import org.onap.policy.common.utils.coder.StandardCoderObject;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.stereotype.Component;
+
+/**
+ * Listener for ParticipantUpdateAck messages sent by participants.
+ */
+@Component
+public class ParticipantUpdateAckListener extends ScoListener<ParticipantUpdateAck> implements Listener {
+    private static final Logger LOGGER = LoggerFactory.getLogger(ParticipantUpdateAckListener.class);
+
+    private final SupervisionHandler supervisionHandler;
+
+    /**
+     * Constructs the object.
+     */
+    public ParticipantUpdateAckListener(SupervisionHandler supervisionHandler) {
+        super(ParticipantUpdateAck.class);
+        this.supervisionHandler = supervisionHandler;
+    }
+
+    @Override
+    public void onTopicEvent(final CommInfrastructure infra, final String topic, final StandardCoderObject sco,
+            final ParticipantUpdateAck participantUpdateAckMessage) {
+        LOGGER.debug("ParticipantUpdateAck message received from participant - {}", participantUpdateAckMessage);
+        supervisionHandler.handleParticipantMessage(participantUpdateAckMessage);
+    }
+
+    @Override
+    public String getType() {
+        return ParticipantMessageType.PARTICIPANT_UPDATE_ACK.name();
+    }
+
+    @Override
+    public ScoListener<ParticipantUpdateAck> getScoListener() {
+        return this;
+    }
+}
diff --git a/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/comm/ParticipantUpdatePublisher.java b/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/comm/ParticipantUpdatePublisher.java
new file mode 100644 (file)
index 0000000..5af5f1f
--- /dev/null
@@ -0,0 +1,32 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2021 Nordix Foundation.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.clamp.controlloop.runtime.supervision.comm;
+
+import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantUpdate;
+import org.springframework.stereotype.Component;
+
+/**
+ * This class is used to send ParticipantUpdate messages to participants on DMaaP.
+ */
+@Component
+public class ParticipantUpdatePublisher extends AbstractParticipantPublisher<ParticipantUpdate> {
+
+}
index 1d7a00b..b92f341 100644 (file)
@@ -45,6 +45,9 @@ import org.onap.policy.clamp.controlloop.runtime.monitoring.MonitoringProvider;
 import org.onap.policy.clamp.controlloop.runtime.supervision.SupervisionHandler;
 import org.onap.policy.clamp.controlloop.runtime.supervision.comm.ParticipantControlLoopStateChangePublisher;
 import org.onap.policy.clamp.controlloop.runtime.supervision.comm.ParticipantControlLoopUpdatePublisher;
+import org.onap.policy.clamp.controlloop.runtime.supervision.comm.ParticipantDeregisterAckPublisher;
+import org.onap.policy.clamp.controlloop.runtime.supervision.comm.ParticipantRegisterAckPublisher;
+import org.onap.policy.clamp.controlloop.runtime.supervision.comm.ParticipantUpdatePublisher;
 import org.onap.policy.clamp.controlloop.runtime.util.CommonTestData;
 import org.onap.policy.models.base.PfModelException;
 import org.onap.policy.models.provider.PolicyModelsProvider;
@@ -112,8 +115,12 @@ class ControlLoopInstantiationProviderTest {
         var participantProvider = new ParticipantProvider(controlLoopParameters.getDatabaseProviderParameters());
         var controlLoopUpdatePublisher = Mockito.mock(ParticipantControlLoopUpdatePublisher.class);
         var controlLoopStateChangePublisher = Mockito.mock(ParticipantControlLoopStateChangePublisher.class);
+        var participantRegisterAckPublisher = Mockito.mock(ParticipantRegisterAckPublisher.class);
+        var participantDeregisterAckPublisher = Mockito.mock(ParticipantDeregisterAckPublisher.class);
+        var participantUpdatePublisher = Mockito.mock(ParticipantUpdatePublisher.class);
         supervisionHandler = new SupervisionHandler(clProvider, participantProvider, monitoringProvider,
-                controlLoopUpdatePublisher, controlLoopStateChangePublisher);
+                        commissioningProvider, controlLoopUpdatePublisher, controlLoopStateChangePublisher,
+                        participantRegisterAckPublisher, participantDeregisterAckPublisher, participantUpdatePublisher);
     }
 
     @AfterAll
diff --git a/runtime-controlloop/src/test/java/org/onap/policy/clamp/controlloop/runtime/supervision/comm/SupervisionMessagesTest.java b/runtime-controlloop/src/test/java/org/onap/policy/clamp/controlloop/runtime/supervision/comm/SupervisionMessagesTest.java
new file mode 100644 (file)
index 0000000..f08cda1
--- /dev/null
@@ -0,0 +1,228 @@
+/*-
+ * ============LICENSE_START=======================================================
+ *  Copyright (C) 2021 Nordix Foundation.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.clamp.controlloop.runtime.supervision.comm;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.time.Instant;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import org.mockito.Mockito;
+import org.onap.policy.clamp.controlloop.models.controlloop.concepts.ControlLoopElementDefinition;
+import org.onap.policy.clamp.controlloop.models.controlloop.persistence.provider.ClElementStatisticsProvider;
+import org.onap.policy.clamp.controlloop.models.controlloop.persistence.provider.ControlLoopProvider;
+import org.onap.policy.clamp.controlloop.models.controlloop.persistence.provider.ParticipantProvider;
+import org.onap.policy.clamp.controlloop.models.controlloop.persistence.provider.ParticipantStatisticsProvider;
+import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantDeregister;
+import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantDeregisterAck;
+import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantRegister;
+import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantRegisterAck;
+import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantUpdate;
+import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantUpdateAck;
+import org.onap.policy.clamp.controlloop.runtime.commissioning.CommissioningProvider;
+import org.onap.policy.clamp.controlloop.runtime.main.parameters.ClRuntimeParameterGroup;
+import org.onap.policy.clamp.controlloop.runtime.monitoring.MonitoringProvider;
+import org.onap.policy.clamp.controlloop.runtime.supervision.SupervisionHandler;
+import org.onap.policy.clamp.controlloop.runtime.util.CommonTestData;
+import org.onap.policy.clamp.controlloop.runtime.util.rest.CommonRestController;
+import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure;
+import org.onap.policy.common.endpoints.event.comm.TopicSink;
+import org.onap.policy.common.utils.coder.YamlJsonTranslator;
+import org.onap.policy.common.utils.resources.ResourceUtils;
+import org.onap.policy.models.base.PfModelException;
+import org.onap.policy.models.provider.PolicyModelsProvider;
+import org.onap.policy.models.tosca.authorative.concepts.ToscaConceptIdentifier;
+import org.onap.policy.models.tosca.authorative.concepts.ToscaNodeTemplate;
+import org.onap.policy.models.tosca.authorative.concepts.ToscaServiceTemplate;
+
+class SupervisionMessagesTest extends CommonRestController {
+
+    private static final String TOSCA_SERVICE_TEMPLATE_YAML =
+            "src/test/resources/rest/servicetemplates/pmsh_multiple_cl_tosca.yaml";
+    private static final Object lockit = new Object();
+    private static final CommInfrastructure INFRA = CommInfrastructure.NOOP;
+    private static final String TOPIC = "my-topic";
+    private static final long interval = 1000;
+    private static SupervisionHandler supervisionHandler;
+    private static CommissioningProvider commissioningProvider;
+    private static ControlLoopProvider clProvider;
+    private static PolicyModelsProvider modelsProvider;
+    private static final YamlJsonTranslator yamlTranslator = new YamlJsonTranslator();
+
+    /**
+     * setup Db Provider Parameters.
+     *
+     * @throws PfModelException if an error occurs
+     */
+    @BeforeAll
+    public static void setupDbProviderParameters() throws PfModelException {
+        ClRuntimeParameterGroup controlLoopParameters = CommonTestData.geParameterGroup(0, "instantproviderdb");
+
+        modelsProvider =
+                CommonTestData.getPolicyModelsProvider(controlLoopParameters.getDatabaseProviderParameters());
+        clProvider = new ControlLoopProvider(controlLoopParameters.getDatabaseProviderParameters());
+        var participantStatisticsProvider =
+                new ParticipantStatisticsProvider(controlLoopParameters.getDatabaseProviderParameters());
+        var clElementStatisticsProvider =
+                new ClElementStatisticsProvider(controlLoopParameters.getDatabaseProviderParameters());
+        commissioningProvider = new CommissioningProvider(modelsProvider, clProvider);
+        var monitoringProvider =
+                new MonitoringProvider(participantStatisticsProvider, clElementStatisticsProvider, clProvider);
+        var participantProvider = new ParticipantProvider(controlLoopParameters.getDatabaseProviderParameters());
+        var controlLoopUpdatePublisher = Mockito.mock(ParticipantControlLoopUpdatePublisher.class);
+        var controlLoopStateChangePublisher = Mockito.mock(ParticipantControlLoopStateChangePublisher.class);
+        var participantRegisterAckPublisher = Mockito.mock(ParticipantRegisterAckPublisher.class);
+        var participantDeregisterAckPublisher = Mockito.mock(ParticipantDeregisterAckPublisher.class);
+        var participantUpdatePublisher = Mockito.mock(ParticipantUpdatePublisher.class);
+        supervisionHandler = new SupervisionHandler(clProvider, participantProvider, monitoringProvider,
+                        commissioningProvider, controlLoopUpdatePublisher, controlLoopStateChangePublisher,
+                        participantRegisterAckPublisher, participantDeregisterAckPublisher, participantUpdatePublisher);
+    }
+
+    @AfterAll
+    public static void closeDbProvider() throws PfModelException {
+        clProvider.close();
+        modelsProvider.close();
+    }
+
+    @Test
+    void testReceiveParticipantRegister() throws Exception {
+        final ParticipantRegister participantRegisterMsg = new ParticipantRegister();
+        participantRegisterMsg.setParticipantId(getParticipantId());
+        participantRegisterMsg.setTimestamp(Instant.now());
+        participantRegisterMsg.setParticipantType(getParticipantType());
+
+        synchronized (lockit) {
+            ParticipantRegisterListener participantRegisterListener =
+                    new ParticipantRegisterListener(supervisionHandler);
+            ToscaServiceTemplate serviceTemplate = yamlTranslator
+                .fromYaml(ResourceUtils.getResourceAsString(TOSCA_SERVICE_TEMPLATE_YAML), ToscaServiceTemplate.class);
+
+            List<ToscaNodeTemplate> listOfTemplates = commissioningProvider.getControlLoopDefinitions(null, null);
+            commissioningProvider.createControlLoopDefinitions(serviceTemplate);
+            participantRegisterListener.onTopicEvent(INFRA, TOPIC, null, participantRegisterMsg);
+        }
+    }
+
+    @Test
+    void testSendParticipantRegisterAck() throws Exception {
+        final ParticipantRegisterAck participantRegisterAckMsg = new ParticipantRegisterAck();
+        participantRegisterAckMsg.setMessage("ParticipantRegisterAck message");
+        participantRegisterAckMsg.setResponseTo(UUID.randomUUID());
+        participantRegisterAckMsg.setResult(true);
+
+        synchronized (lockit) {
+            ParticipantRegisterAckPublisher clRegisterAckPublisher = new ParticipantRegisterAckPublisher();
+            clRegisterAckPublisher.active(Collections.singletonList(Mockito.mock(TopicSink.class)));
+            clRegisterAckPublisher.send(participantRegisterAckMsg);
+        }
+    }
+
+    @Test
+    void testReceiveParticipantDeregister() throws Exception {
+        final ParticipantDeregister participantDeregisterMsg = new ParticipantDeregister();
+        participantDeregisterMsg.setParticipantId(getParticipantId());
+        participantDeregisterMsg.setTimestamp(Instant.now());
+        participantDeregisterMsg.setParticipantType(getParticipantType());
+
+        synchronized (lockit) {
+            ParticipantDeregisterListener participantDeregisterListener =
+                    new ParticipantDeregisterListener(supervisionHandler);
+            participantDeregisterListener.onTopicEvent(INFRA, TOPIC, null, participantDeregisterMsg);
+        }
+    }
+
+    @Test
+    void testSendParticipantDeregisterAck() throws Exception {
+        final ParticipantDeregisterAck participantDeregisterAckMsg = new ParticipantDeregisterAck();
+        participantDeregisterAckMsg.setMessage("ParticipantDeregisterAck message");
+        participantDeregisterAckMsg.setResponseTo(UUID.randomUUID());
+        participantDeregisterAckMsg.setResult(true);
+
+        synchronized (lockit) {
+            ParticipantDeregisterAckPublisher clDeregisterAckPublisher = new ParticipantDeregisterAckPublisher();
+            clDeregisterAckPublisher.active(Collections.singletonList(Mockito.mock(TopicSink.class)));
+            clDeregisterAckPublisher.send(participantDeregisterAckMsg);
+        }
+    }
+
+    @Test
+    void testSendParticipantUpdate() throws Exception {
+        final ParticipantUpdate participantUpdateMsg = new ParticipantUpdate();
+        participantUpdateMsg.setParticipantId(getParticipantId());
+        participantUpdateMsg.setTimestamp(Instant.now());
+        participantUpdateMsg.setParticipantType(getParticipantType());
+        participantUpdateMsg.setTimestamp(Instant.ofEpochMilli(3000));
+        participantUpdateMsg.setMessageId(UUID.randomUUID());
+
+        ToscaServiceTemplate toscaServiceTemplate = new ToscaServiceTemplate();
+        toscaServiceTemplate.setName("serviceTemplate");
+        toscaServiceTemplate.setDerivedFrom("parentServiceTemplate");
+        toscaServiceTemplate.setDescription("Description of serviceTemplate");
+        toscaServiceTemplate.setVersion("1.2.3");
+
+        ControlLoopElementDefinition clDefinition = new ControlLoopElementDefinition();
+        clDefinition.setId(UUID.randomUUID());
+        clDefinition.setControlLoopElementToscaServiceTemplate(toscaServiceTemplate);
+        Map<String, String> commonPropertiesMap = Map.of("Prop1", "PropValue");
+        clDefinition.setCommonPropertiesMap(commonPropertiesMap);
+
+        Map<UUID, ControlLoopElementDefinition> controlLoopElementDefinitionMap =
+            Map.of(UUID.randomUUID(), clDefinition);
+
+        Map<ToscaConceptIdentifier, Map<UUID, ControlLoopElementDefinition>>
+            participantDefinitionUpdateMap = Map.of(getParticipantId(), controlLoopElementDefinitionMap);
+        participantUpdateMsg.setParticipantDefinitionUpdateMap(participantDefinitionUpdateMap);
+
+        synchronized (lockit) {
+            ParticipantUpdatePublisher clUpdatePublisher = new ParticipantUpdatePublisher();
+            clUpdatePublisher.active(Collections.singletonList(Mockito.mock(TopicSink.class)));
+            clUpdatePublisher.send(participantUpdateMsg);
+        }
+    }
+
+    @Test
+    void testReceiveParticipantUpdateAckMessage() throws Exception {
+        final ParticipantUpdateAck participantUpdateAckMsg = new ParticipantUpdateAck();
+        participantUpdateAckMsg.setMessage("ParticipantUpdateAck message");
+        participantUpdateAckMsg.setResponseTo(UUID.randomUUID());
+        participantUpdateAckMsg.setResult(true);
+
+        synchronized (lockit) {
+            ParticipantUpdateAckListener participantUpdateAckListener =
+                    new ParticipantUpdateAckListener(supervisionHandler);
+            participantUpdateAckListener.onTopicEvent(INFRA, TOPIC, null, participantUpdateAckMsg);
+        }
+    }
+
+    private ToscaConceptIdentifier getParticipantId() {
+        return new ToscaConceptIdentifier("org.onap.PM_Policy", "1.0.0");
+    }
+
+    private ToscaConceptIdentifier getParticipantType() {
+        return new ToscaConceptIdentifier("org.onap.policy.controlloop.PolicyControlLoopParticipant", "2.3.1");
+    }
+}
index 06f4370..71df254 100644 (file)
                     "localhost"
                 ],
                 "topicCommInfrastructure": "dmaap"
-            },
-            {
-                "topic": "POLICY-NOTIFICATION",
-                "servers": [
-                    "localhost"
-                ],
-                "topicCommInfrastructure": "dmaap"
             }
         ]
     }
index 0ea56eb..ddd04ed 100644 (file)
                     "localhost"
                 ],
                 "topicCommInfrastructure": "dmaap"
-            },
-            {
-                "topic": "POLICY-NOTIFICATION",
-                "servers": [
-                    "localhost"
-                ],
-                "topicCommInfrastructure": "dmaap"
             }
         ]
     }