2  * ============LICENSE_START=======================================================
 
   3  *  Copyright (C) 2023-2025 OpenInfra Foundation Europe. All rights reserved.
 
   4  * ================================================================================
 
   5  * Licensed under the Apache License, Version 2.0 (the "License");
 
   6  * you may not use this file except in compliance with the License.
 
   7  * You may obtain a copy of the License at
 
   9  *      http://www.apache.org/licenses/LICENSE-2.0
 
  11  * Unless required by applicable law or agreed to in writing, software
 
  12  * distributed under the License is distributed on an "AS IS" BASIS,
 
  13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 
  14  * See the License for the specific language governing permissions and
 
  15  * limitations under the License.
 
  17  * SPDX-License-Identifier: Apache-2.0
 
  18  * ============LICENSE_END=========================================================
 
  21 package org.onap.policy.clamp.acm.runtime.supervision;
 
  23 import io.micrometer.core.annotation.Timed;
 
  24 import java.util.HashMap;
 
  25 import java.util.List;
 
  27 import java.util.UUID;
 
  28 import java.util.function.UnaryOperator;
 
  29 import java.util.stream.Collectors;
 
  30 import lombok.AllArgsConstructor;
 
  31 import org.apache.commons.collections4.MapUtils;
 
  32 import org.onap.policy.clamp.acm.runtime.main.utils.EncryptionUtils;
 
  33 import org.onap.policy.clamp.acm.runtime.supervision.comm.ParticipantDeregisterAckPublisher;
 
  34 import org.onap.policy.clamp.acm.runtime.supervision.comm.ParticipantRegisterAckPublisher;
 
  35 import org.onap.policy.clamp.acm.runtime.supervision.comm.ParticipantSyncPublisher;
 
  36 import org.onap.policy.clamp.models.acm.concepts.AcTypeState;
 
  37 import org.onap.policy.clamp.models.acm.concepts.AutomationComposition;
 
  38 import org.onap.policy.clamp.models.acm.concepts.AutomationCompositionDefinition;
 
  39 import org.onap.policy.clamp.models.acm.concepts.AutomationCompositionElement;
 
  40 import org.onap.policy.clamp.models.acm.concepts.DeployState;
 
  41 import org.onap.policy.clamp.models.acm.concepts.NodeTemplateState;
 
  42 import org.onap.policy.clamp.models.acm.concepts.Participant;
 
  43 import org.onap.policy.clamp.models.acm.concepts.ParticipantReplica;
 
  44 import org.onap.policy.clamp.models.acm.concepts.ParticipantState;
 
  45 import org.onap.policy.clamp.models.acm.concepts.ParticipantSupportedElementType;
 
  46 import org.onap.policy.clamp.models.acm.concepts.ParticipantUtils;
 
  47 import org.onap.policy.clamp.models.acm.messages.kafka.participant.ParticipantDeregister;
 
  48 import org.onap.policy.clamp.models.acm.messages.kafka.participant.ParticipantRegister;
 
  49 import org.onap.policy.clamp.models.acm.messages.kafka.participant.ParticipantReqSync;
 
  50 import org.onap.policy.clamp.models.acm.messages.kafka.participant.ParticipantStatus;
 
  51 import org.onap.policy.clamp.models.acm.persistence.provider.AcDefinitionProvider;
 
  52 import org.onap.policy.clamp.models.acm.persistence.provider.AutomationCompositionProvider;
 
  53 import org.onap.policy.clamp.models.acm.persistence.provider.MessageProvider;
 
  54 import org.onap.policy.clamp.models.acm.persistence.provider.ParticipantProvider;
 
  55 import org.onap.policy.clamp.models.acm.utils.TimestampHelper;
 
  56 import org.slf4j.Logger;
 
  57 import org.slf4j.LoggerFactory;
 
  58 import org.springframework.stereotype.Component;
 
  61  * This class handles supervision of participant status.
 
  65 public class SupervisionParticipantHandler {
 
  66     private static final Logger LOGGER = LoggerFactory.getLogger(SupervisionParticipantHandler.class);
 
  68     private final ParticipantProvider participantProvider;
 
  69     private final ParticipantRegisterAckPublisher participantRegisterAckPublisher;
 
  70     private final ParticipantDeregisterAckPublisher participantDeregisterAckPublisher;
 
  71     private final AutomationCompositionProvider automationCompositionProvider;
 
  72     private final AcDefinitionProvider acDefinitionProvider;
 
  73     private final ParticipantSyncPublisher participantSyncPublisher;
 
  74     private final MessageProvider messageProvider;
 
  75     private final EncryptionUtils encryptionUtils;
 
  78      * Handle a ParticipantRegister message from a participant.
 
  80      * @param participantRegisterMsg the ParticipantRegister message received from a participant
 
  82     @Timed(value = "listener.participant_register", description = "PARTICIPANT_REGISTER messages received")
 
  83     public void handleParticipantMessage(ParticipantRegister participantRegisterMsg) {
 
  84         saveIfNotPresent(participantRegisterMsg.getReplicaId(), participantRegisterMsg.getParticipantId(),
 
  85                 participantRegisterMsg.getParticipantSupportedElementType(), true);
 
  87         participantRegisterAckPublisher.send(participantRegisterMsg.getMessageId(),
 
  88                 participantRegisterMsg.getParticipantId(), participantRegisterMsg.getReplicaId());
 
  92      * Handle a ParticipantDeregister message from a participant.
 
  94      * @param participantDeregisterMsg the ParticipantDeregister message received from a participant
 
  96     @Timed(value = "listener.participant_deregister", description = "PARTICIPANT_DEREGISTER messages received")
 
  97     public void handleParticipantMessage(ParticipantDeregister participantDeregisterMsg) {
 
  98         var replicaId = participantDeregisterMsg.getReplicaId() != null
 
  99                 ? participantDeregisterMsg.getReplicaId() : participantDeregisterMsg.getParticipantId();
 
 100         var replicaOpt = participantProvider.findParticipantReplica(replicaId);
 
 101         if (replicaOpt.isPresent()) {
 
 102             participantProvider.deleteParticipantReplica(replicaId);
 
 105         participantDeregisterAckPublisher.send(participantDeregisterMsg.getMessageId());
 
 109      * Handle a ParticipantStatus message from a participant.
 
 111      * @param participantStatusMsg the ParticipantStatus message received from a participant
 
 114     @Timed(value = "listener.participant_status", description = "PARTICIPANT_STATUS messages received")
 
 115     public void handleParticipantMessage(ParticipantStatus participantStatusMsg) {
 
 116         saveIfNotPresent(participantStatusMsg.getReplicaId(), participantStatusMsg.getParticipantId(),
 
 117                 participantStatusMsg.getParticipantSupportedElementType(), false);
 
 119         if (!participantStatusMsg.getAutomationCompositionInfoList().isEmpty()) {
 
 120             messageProvider.saveInstanceOutProperties(participantStatusMsg);
 
 122         if (!participantStatusMsg.getParticipantDefinitionUpdates().isEmpty()
 
 123                 && participantStatusMsg.getCompositionId() != null) {
 
 124             var acDefinition = acDefinitionProvider.findAcDefinition(participantStatusMsg.getCompositionId());
 
 125             if (acDefinition.isPresent()) {
 
 126                 var map = acDefinition.get().getElementStateMap().values().stream()
 
 127                         .collect(Collectors.toMap(NodeTemplateState::getNodeTemplateId, UnaryOperator.identity()));
 
 128                 messageProvider.saveCompositionOutProperties(participantStatusMsg, map);
 
 130                 LOGGER.error("Not valid ParticipantStatus message");
 
 135     private void saveIfNotPresent(UUID msgReplicaId, UUID participantId,
 
 136             List<ParticipantSupportedElementType> participantSupportedElementType, boolean registration) {
 
 137         var replicaId = msgReplicaId != null ? msgReplicaId : participantId;
 
 138         var replicaOpt = participantProvider.findParticipantReplica(replicaId);
 
 139         if (replicaOpt.isPresent()) {
 
 140             var replica = replicaOpt.get();
 
 141             checkOnline(replica);
 
 143             var participant = getParticipant(participantId, listToMap(participantSupportedElementType));
 
 144             participant.getReplicas().put(replicaId, createReplica(replicaId));
 
 145             participantProvider.saveParticipant(participant);
 
 148             handleRestart(participantId, replicaId);
 
 152     private Participant getParticipant(UUID participantId,
 
 153             Map<UUID, ParticipantSupportedElementType> participantSupportedElementType) {
 
 154         var participantOpt = participantProvider.findParticipant(participantId);
 
 155         return participantOpt.orElseGet(() -> createParticipant(participantId, participantSupportedElementType));
 
 158     private ParticipantReplica createReplica(UUID replicaId) {
 
 159         var replica = new ParticipantReplica();
 
 160         replica.setReplicaId(replicaId);
 
 161         replica.setParticipantState(ParticipantState.ON_LINE);
 
 162         replica.setLastMsg(TimestampHelper.now());
 
 167     private void checkOnline(ParticipantReplica replica) {
 
 168         if (ParticipantState.OFF_LINE.equals(replica.getParticipantState())) {
 
 169             replica.setParticipantState(ParticipantState.ON_LINE);
 
 171         replica.setLastMsg(TimestampHelper.now());
 
 172         participantProvider.saveParticipantReplica(replica);
 
 176      * Handle restart of a participant.
 
 178      * @param participantId     ID of the participant to restart
 
 179      * @param replicaId         ID of the participant replica
 
 181     public void handleRestart(UUID participantId, UUID replicaId) {
 
 182         var compositionIds = participantProvider.getCompositionIds(participantId);
 
 183         for (var compositionId : compositionIds) {
 
 184             var acDefinition = acDefinitionProvider.getAcDefinition(compositionId);
 
 185             LOGGER.debug("Scan Composition {} for restart", acDefinition.getCompositionId());
 
 186             handleSyncRestart(participantId, replicaId, acDefinition);
 
 190     private void handleSyncRestart(final UUID participantId, UUID replicaId,
 
 191             AutomationCompositionDefinition acDefinition) {
 
 192         if (AcTypeState.COMMISSIONED.equals(acDefinition.getState())) {
 
 193             LOGGER.debug("Composition {} COMMISSIONED", acDefinition.getCompositionId());
 
 196         LOGGER.debug("Composition to be send in Restart message {}", acDefinition.getCompositionId());
 
 197         var automationCompositionList =
 
 198                 automationCompositionProvider.getAcInstancesByCompositionId(acDefinition.getCompositionId());
 
 199         encryptionUtils.decryptInstanceProperties(automationCompositionList);
 
 200         var automationCompositions =
 
 201                 automationCompositionList.stream().filter(ac -> isAcToBeSyncRestarted(participantId, ac)).toList();
 
 202         participantSyncPublisher.sendRestartMsg(participantId, replicaId, acDefinition, automationCompositions);
 
 206      * Handle restart of all participants.
 
 208     public void handleRestartOfAllParticipants() {
 
 209         var participants = participantProvider.getParticipants();
 
 210         for (var participant:participants) {
 
 211             handleRestart(participant.getParticipantId(), null);
 
 215     private boolean isAcToBeSyncRestarted(UUID participantId, AutomationComposition automationComposition) {
 
 216         for (var element : automationComposition.getElements().values()) {
 
 217             if (participantId.equals(element.getParticipantId())) {
 
 224     private Participant createParticipant(UUID participantId,
 
 225             Map<UUID, ParticipantSupportedElementType> participantSupportedElementType) {
 
 226         var participant = new Participant();
 
 227         participant.setParticipantId(participantId);
 
 228         participant.setParticipantSupportedElementTypes(participantSupportedElementType);
 
 232     private Map<UUID, ParticipantSupportedElementType> listToMap(List<ParticipantSupportedElementType> elementList) {
 
 233         Map<UUID, ParticipantSupportedElementType> map = new HashMap<>();
 
 234         MapUtils.populateMap(map, elementList, ParticipantSupportedElementType::getId);
 
 239      * Handle a participantReqSync message from a participant.
 
 241      * @param participantReqSync the message received from a participant
 
 243     @Timed(value = "listener.participant_req_sync", description = "PARTICIPANT_REQ_SYNC_MSG messages received")
 
 244     public void handleParticipantReqSync(ParticipantReqSync participantReqSync) {
 
 245         if (participantReqSync.getCompositionTargetId() != null) {
 
 246             // outdated Composition Target
 
 247             var acDefinition = acDefinitionProvider.getAcDefinition(participantReqSync.getCompositionTargetId());
 
 248             participantSyncPublisher.sendRestartMsg(participantReqSync.getParticipantId(),
 
 249                     participantReqSync.getReplicaId(), acDefinition, List.of());
 
 251         if (participantReqSync.getCompositionId() == null
 
 252                 && participantReqSync.getAutomationCompositionId() != null) {
 
 253             // outdated AutomationComposition
 
 254             var automationComposition =
 
 255                     getAutomationCompositionForSync(participantReqSync.getAutomationCompositionId());
 
 256             participantSyncPublisher.sendSync(automationComposition);
 
 258         if (participantReqSync.getCompositionId() != null) {
 
 259             // outdated Composition
 
 260             var acDefinition = acDefinitionProvider.getAcDefinition(participantReqSync.getCompositionId());
 
 261             var automationCompositions = participantReqSync.getAutomationCompositionId() != null
 
 262                     ? List.of(getAutomationCompositionForSync(participantReqSync.getAutomationCompositionId())) :
 
 263                     List.<AutomationComposition>of();
 
 264             participantSyncPublisher.sendRestartMsg(participantReqSync.getParticipantId(),
 
 265                     participantReqSync.getReplicaId(), acDefinition, automationCompositions);
 
 269     private AutomationComposition getAutomationCompositionForSync(UUID automationCompositionId) {
 
 270         var automationComposition = automationCompositionProvider.getAutomationComposition(automationCompositionId);
 
 271         encryptionUtils.decryptInstanceProperties(automationComposition);
 
 272         if (DeployState.MIGRATING.equals(automationComposition.getDeployState())) {
 
 273             var acDefinition = acDefinitionProvider.getAcDefinition(automationComposition.getCompositionTargetId());
 
 274             var stage = ParticipantUtils.getFirstStage(automationComposition, acDefinition.getServiceTemplate());
 
 275             if (automationComposition.getPhase().equals(stage)) {
 
 276                 // scenario first stage migration
 
 277                 var rollback = automationCompositionProvider.getAutomationCompositionRollback(automationCompositionId);
 
 278                 automationComposition.setElements(rollback.getElements().values().stream()
 
 279                     .collect(Collectors.toMap(AutomationCompositionElement::getId, AutomationCompositionElement::new)));
 
 282         return automationComposition;