2  * ============LICENSE_START=======================================================
 
   3  *  Copyright (C) 2021-2025 OpenInfra Foundation Europe. All rights reserved.
 
   4  *  Modifications Copyright (C) 2021 AT&T Intellectual Property. All rights reserved.
 
   5  * ================================================================================
 
   6  * Licensed under the Apache License, Version 2.0 (the "License");
 
   7  * you may not use this file except in compliance with the License.
 
   8  * You may obtain a copy of the License at
 
  10  *      http://www.apache.org/licenses/LICENSE-2.0
 
  12  * Unless required by applicable law or agreed to in writing, software
 
  13  * distributed under the License is distributed on an "AS IS" BASIS,
 
  14  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 
  15  * See the License for the specific language governing permissions and
 
  16  * limitations under the License.
 
  18  * SPDX-License-Identifier: Apache-2.0
 
  19  * ============LICENSE_END=========================================================
 
  22 package org.onap.policy.clamp.acm.participant.intermediary.handler;
 
  24 import java.util.HashMap;
 
  25 import java.util.List;
 
  28 import java.util.UUID;
 
  29 import lombok.RequiredArgsConstructor;
 
  30 import org.onap.policy.clamp.acm.participant.intermediary.api.CompositionElementDto;
 
  31 import org.onap.policy.clamp.acm.participant.intermediary.api.ElementState;
 
  32 import org.onap.policy.clamp.acm.participant.intermediary.api.InstanceElementDto;
 
  33 import org.onap.policy.clamp.acm.participant.intermediary.comm.ParticipantMessagePublisher;
 
  34 import org.onap.policy.clamp.acm.participant.intermediary.handler.cache.AcDefinition;
 
  35 import org.onap.policy.clamp.acm.participant.intermediary.handler.cache.CacheProvider;
 
  36 import org.onap.policy.clamp.models.acm.concepts.AcElementDeploy;
 
  37 import org.onap.policy.clamp.models.acm.concepts.AutomationComposition;
 
  38 import org.onap.policy.clamp.models.acm.concepts.DeployState;
 
  39 import org.onap.policy.clamp.models.acm.concepts.LockState;
 
  40 import org.onap.policy.clamp.models.acm.concepts.MigrationState;
 
  41 import org.onap.policy.clamp.models.acm.concepts.ParticipantDeploy;
 
  42 import org.onap.policy.clamp.models.acm.concepts.ParticipantUtils;
 
  43 import org.onap.policy.clamp.models.acm.concepts.StateChangeResult;
 
  44 import org.onap.policy.clamp.models.acm.concepts.SubState;
 
  45 import org.onap.policy.clamp.models.acm.messages.kafka.participant.AutomationCompositionDeploy;
 
  46 import org.onap.policy.clamp.models.acm.messages.kafka.participant.AutomationCompositionDeployAck;
 
  47 import org.onap.policy.clamp.models.acm.messages.kafka.participant.AutomationCompositionMigration;
 
  48 import org.onap.policy.clamp.models.acm.messages.kafka.participant.AutomationCompositionStateChange;
 
  49 import org.onap.policy.clamp.models.acm.messages.kafka.participant.ParticipantMessageType;
 
  50 import org.onap.policy.clamp.models.acm.messages.kafka.participant.PropertiesUpdate;
 
  51 import org.onap.policy.clamp.models.acm.messages.rest.instantiation.DeployOrder;
 
  52 import org.onap.policy.clamp.models.acm.utils.AcmUtils;
 
  53 import org.slf4j.Logger;
 
  54 import org.slf4j.LoggerFactory;
 
  55 import org.springframework.stereotype.Component;
 
  58  * This class is responsible for managing the state of all automation compositions in the participant.
 
  61 @RequiredArgsConstructor
 
  62 public class AutomationCompositionHandler {
 
  63     private static final Logger LOGGER = LoggerFactory.getLogger(AutomationCompositionHandler.class);
 
  64     private static final String AC_NOT_USED = "Automation composition {} does not use this participant";
 
  66     private final CacheProvider cacheProvider;
 
  67     private final ParticipantMessagePublisher publisher;
 
  68     private final ThreadHandler listener;
 
  71      * Handle a automation composition state change message.
 
  73      * @param stateChangeMsg the state change message
 
  75     public void handleAutomationCompositionStateChange(AutomationCompositionStateChange stateChangeMsg) {
 
  76         var automationComposition = cacheProvider.getAutomationComposition(stateChangeMsg.getAutomationCompositionId());
 
  78         if (automationComposition == null) {
 
  79             if (DeployOrder.DELETE.equals(stateChangeMsg.getDeployOrderedState())) {
 
  80                 var automationCompositionAck = new AutomationCompositionDeployAck(
 
  81                         ParticipantMessageType.AUTOMATION_COMPOSITION_STATECHANGE_ACK);
 
  82                 automationCompositionAck.setParticipantId(cacheProvider.getParticipantId());
 
  83                 automationCompositionAck.setReplicaId(cacheProvider.getReplicaId());
 
  84                 automationCompositionAck.setMessage("Already deleted or never used");
 
  85                 automationCompositionAck.setResult(true);
 
  86                 automationCompositionAck.setStateChangeResult(StateChangeResult.NO_ERROR);
 
  87                 automationCompositionAck.setResponseTo(stateChangeMsg.getMessageId());
 
  88                 automationCompositionAck.setAutomationCompositionId(stateChangeMsg.getAutomationCompositionId());
 
  89                 publisher.sendAutomationCompositionAck(automationCompositionAck);
 
  91                 LOGGER.warn(AC_NOT_USED, stateChangeMsg.getAutomationCompositionId());
 
  96         switch (stateChangeMsg.getDeployOrderedState()) {
 
  97             case UNDEPLOY -> handleUndeployState(stateChangeMsg.getMessageId(), automationComposition,
 
  98                     stateChangeMsg.getStartPhase());
 
  99             case DELETE -> handleDeleteState(stateChangeMsg.getMessageId(), automationComposition,
 
 100                     stateChangeMsg.getStartPhase());
 
 102                     LOGGER.error("StateChange message has no state, state is null {}", automationComposition.getKey());
 
 107      * Handle a automation composition properties update message.
 
 109      * @param updateMsg the properties update message
 
 111     public void handleAcPropertyUpdate(PropertiesUpdate updateMsg) {
 
 113         if (updateMsg.getParticipantUpdatesList().isEmpty()) {
 
 114             LOGGER.warn("No AutomationCompositionElement updates in message {}",
 
 115                     updateMsg.getAutomationCompositionId());
 
 119         for (var participantDeploy : updateMsg.getParticipantUpdatesList()) {
 
 120             if (cacheProvider.getParticipantId().equals(participantDeploy.getParticipantId())) {
 
 121                 var automationComposition =
 
 122                         cacheProvider.getAutomationComposition(updateMsg.getAutomationCompositionId());
 
 123                 automationComposition.setDeployState(DeployState.UPDATING);
 
 124                 var acCopy = new AutomationComposition(automationComposition);
 
 125                 updateExistingElementsOnThisParticipant(updateMsg.getAutomationCompositionId(), participantDeploy);
 
 127                 callParticipantUpdateProperty(updateMsg.getMessageId(), participantDeploy.getAcElementList(), acCopy);
 
 133      * Handle a automation composition Deploy message.
 
 135      * @param deployMsg the Deploy message
 
 137     public void handleAutomationCompositionDeploy(AutomationCompositionDeploy deployMsg) {
 
 139         if (deployMsg.getParticipantUpdatesList().isEmpty()) {
 
 140             LOGGER.warn("No AutomationCompositionElement deploy in message {}", deployMsg.getAutomationCompositionId());
 
 144         for (var participantDeploy : deployMsg.getParticipantUpdatesList()) {
 
 145             if (cacheProvider.getParticipantId().equals(participantDeploy.getParticipantId())) {
 
 146                 if (deployMsg.isFirstStartPhase()) {
 
 147                     cacheProvider.initializeAutomationComposition(deployMsg.getCompositionId(),
 
 148                             deployMsg.getAutomationCompositionId(), participantDeploy,
 
 149                             deployMsg.getRevisionIdInstance());
 
 151                 callParticipantDeploy(deployMsg.getMessageId(), participantDeploy.getAcElementList(),
 
 152                         deployMsg.getStartPhase(), deployMsg.getAutomationCompositionId());
 
 157     private void callParticipantDeploy(UUID messageId, List<AcElementDeploy> acElementDeployList, Integer startPhaseMsg,
 
 159         var automationComposition = cacheProvider.getAutomationComposition(instanceId);
 
 160         automationComposition.setDeployState(DeployState.DEPLOYING);
 
 161         for (var elementDeploy : acElementDeployList) {
 
 162             var element = automationComposition.getElements().get(elementDeploy.getId());
 
 163             var compositionInProperties = cacheProvider.getCommonProperties(automationComposition.getCompositionId(),
 
 164                     element.getDefinition());
 
 165             int startPhase = ParticipantUtils.findStartPhase(compositionInProperties);
 
 166             if (startPhaseMsg.equals(startPhase)) {
 
 167                 var compositionElement =
 
 168                         cacheProvider.createCompositionElementDto(automationComposition.getCompositionId(), element);
 
 169                 var instanceElement =
 
 170                         new InstanceElementDto(instanceId, elementDeploy.getId(), elementDeploy.getProperties(),
 
 171                                 element.getOutProperties());
 
 172                 listener.deploy(messageId, compositionElement, instanceElement);
 
 177     private void callParticipantUpdateProperty(UUID messageId, List<AcElementDeploy> acElements,
 
 178             AutomationComposition acCopy) {
 
 179         var instanceElementDtoMap = cacheProvider.getInstanceElementDtoMap(acCopy);
 
 180         var instanceElementDtoMapUpdated =
 
 181                 cacheProvider.getInstanceElementDtoMap(cacheProvider.getAutomationComposition(acCopy.getInstanceId()));
 
 182         var compositionElementDtoMap = cacheProvider.getCompositionElementDtoMap(acCopy);
 
 183         for (var acElement : acElements) {
 
 184             listener.update(messageId, compositionElementDtoMap.get(acElement.getId()),
 
 185                     instanceElementDtoMap.get(acElement.getId()), instanceElementDtoMapUpdated.get(acElement.getId()));
 
 189     private void migrateExistingElementsOnThisParticipant(AutomationComposition automationComposition,
 
 190                                                           UUID compositionTargetId, ParticipantDeploy participantDeploy,
 
 191                                                           int stage, boolean newParticipant) {
 
 192         for (var element : participantDeploy.getAcElementList()) {
 
 193             UUID compIdForCommonProperties = null;
 
 194             if (MigrationState.REMOVED.equals(element.getMigrationState())) {
 
 195                 compIdForCommonProperties = automationComposition.getCompositionId();
 
 197                 compIdForCommonProperties = compositionTargetId;
 
 199             var compositionInProperties =
 
 200                     cacheProvider.getCommonProperties(compIdForCommonProperties, element.getDefinition());
 
 201             var stageSet = ParticipantUtils.findStageSetMigrate(compositionInProperties);
 
 202             if (MigrationState.REMOVED.equals(element.getMigrationState())) {
 
 203                 stageSet = Set.of(0);
 
 205             if (stageSet.contains(stage)) {
 
 206                 migrateElement(element, automationComposition, compositionTargetId, stage, newParticipant,
 
 212     private void migrateElement(AcElementDeploy element, AutomationComposition automationComposition,
 
 213                                 UUID compositionTargetId, int stage, boolean newParticipant,
 
 214                                 ParticipantDeploy participantDeploy) {
 
 215         var acElementList = automationComposition.getElements();
 
 216         automationComposition.setCompositionTargetId(compositionTargetId);
 
 217         automationComposition.setDeployState(DeployState.MIGRATING);
 
 218         var acElement = acElementList.get(element.getId());
 
 219         if (acElement == null) {  // NEW element with existing participant
 
 220             var newElement = CacheProvider.createAutomationCompositionElement(element);
 
 221             newElement.setParticipantId(participantDeploy.getParticipantId());
 
 222             newElement.setDeployState(DeployState.MIGRATING);
 
 223             newElement.setLockState(LockState.LOCKED);
 
 224             newElement.setStage(stage);
 
 225             newElement.setMigrationState(MigrationState.NEW);
 
 227             acElementList.put(element.getId(), newElement);
 
 228             LOGGER.info("New Ac Element with id {} is added in Migration", element.getId());
 
 230             acElement.setStage(stage);
 
 231             acElement.setMigrationState(element.getMigrationState());
 
 232             if (! newParticipant) { //DEFAULT element
 
 233                 AcmUtils.recursiveMerge(acElement.getProperties(), element.getProperties());
 
 234                 acElement.setDeployState(DeployState.MIGRATING);
 
 235                 acElement.setDefinition(element.getDefinition());
 
 237             LOGGER.info("Cache updated for the migration of element with id {}", element.getId());
 
 242     private void updateExistingElementsOnThisParticipant(UUID instanceId, ParticipantDeploy participantDeploy) {
 
 243         var acElementList = cacheProvider.getAutomationComposition(instanceId).getElements();
 
 244         for (var element : participantDeploy.getAcElementList()) {
 
 245             var acElement = acElementList.get(element.getId());
 
 246             AcmUtils.recursiveMerge(acElement.getProperties(), element.getProperties());
 
 247             acElement.setDeployState(DeployState.UPDATING);
 
 248             acElement.setSubState(SubState.NONE);
 
 249             acElement.setDefinition(element.getDefinition());
 
 254      * Method to handle when the new state from participant is UNINITIALISED state.
 
 256      * @param messageId             the messageId
 
 257      * @param automationComposition participant response
 
 258      * @param startPhaseMsg         startPhase from message
 
 260     private void handleUndeployState(UUID messageId, final AutomationComposition automationComposition,
 
 261             Integer startPhaseMsg) {
 
 262         automationComposition.setDeployState(DeployState.UNDEPLOYING);
 
 263         for (var element : automationComposition.getElements().values()) {
 
 264             UUID compositionId = null;
 
 265             if (MigrationState.NEW.equals(element.getMigrationState())) {
 
 266                 compositionId = automationComposition.getCompositionTargetId();
 
 268                 compositionId = automationComposition.getCompositionId();
 
 270             var compositionInProperties = cacheProvider.getCommonProperties(compositionId, element.getDefinition());
 
 271             int startPhase = ParticipantUtils.findStartPhase(compositionInProperties);
 
 272             if (MigrationState.NEW.equals(element.getMigrationState())) {
 
 273                 // Undeploy newly added element on a Failed Migration
 
 276             if (startPhaseMsg.equals(startPhase)) {
 
 277                 element.setDeployState(DeployState.UNDEPLOYING);
 
 278                 var compositionElement =
 
 279                         cacheProvider.createCompositionElementDto(compositionId, element);
 
 280                 var instanceElement = new InstanceElementDto(automationComposition.getInstanceId(), element.getId(),
 
 281                         element.getProperties(), element.getOutProperties());
 
 282                 listener.undeploy(messageId, compositionElement, instanceElement);
 
 287     private void handleDeleteState(UUID messageId, final AutomationComposition automationComposition,
 
 288             Integer startPhaseMsg) {
 
 289         automationComposition.setDeployState(DeployState.DELETING);
 
 290         for (var element : automationComposition.getElements().values()) {
 
 291             var compositionInProperties = cacheProvider.getCommonProperties(automationComposition.getCompositionId(),
 
 292                     element.getDefinition());
 
 293             int startPhase = ParticipantUtils.findStartPhase(compositionInProperties);
 
 294             if (startPhaseMsg.equals(startPhase)) {
 
 295                 element.setDeployState(DeployState.DELETING);
 
 296                 element.setSubState(SubState.NONE);
 
 297                 var compositionElement =
 
 298                         cacheProvider.createCompositionElementDto(automationComposition.getCompositionId(), element);
 
 299                 var instanceElement = new InstanceElementDto(automationComposition.getInstanceId(), element.getId(),
 
 300                         element.getProperties(), element.getOutProperties());
 
 301                 listener.delete(messageId, compositionElement, instanceElement);
 
 307      * Handles AutomationComposition Migration.
 
 309      * @param migrationMsg the AutomationCompositionMigration
 
 311     public void handleAutomationCompositionMigration(AutomationCompositionMigration migrationMsg) {
 
 312         var automationComposition = cacheProvider.getAutomationComposition(migrationMsg.getAutomationCompositionId());
 
 313         var acTargetDefinition = cacheProvider.getAcElementsDefinitions().get(migrationMsg.getCompositionTargetId());
 
 314         if (Boolean.FALSE.equals(migrationMsg.getRollback())) {
 
 315             handleMigration(automationComposition, acTargetDefinition, migrationMsg);
 
 317             handleRollback(automationComposition, migrationMsg);
 
 321     private void handleRollback(AutomationComposition automationComposition,
 
 322                                 AutomationCompositionMigration migrationMsg) {
 
 323         AutomationComposition acCopy = null;
 
 324         if (automationComposition == null) {
 
 325             LOGGER.warn(AC_NOT_USED, migrationMsg.getAutomationCompositionId());
 
 328             LOGGER.info("Rollback operation invoked for the instance {}", migrationMsg.getAutomationCompositionId());
 
 329             acCopy = new AutomationComposition(automationComposition);
 
 330             automationComposition.setCompositionTargetId(migrationMsg.getCompositionTargetId());
 
 331             automationComposition.setDeployState(DeployState.MIGRATION_REVERTING);
 
 333         for (var participantDeploy : migrationMsg.getParticipantUpdatesList()) {
 
 334             if (cacheProvider.getParticipantId().equals(participantDeploy.getParticipantId())) {
 
 335                 migrateExistingElementsOnThisParticipant(automationComposition, migrationMsg.getCompositionTargetId(),
 
 336                         participantDeploy, migrationMsg.getStage(), false);
 
 338                 callParticipantMigrate(migrationMsg, participantDeploy.getAcElementList(), acCopy);
 
 344     private void handleMigration(AutomationComposition automationComposition, AcDefinition acTargetDefinition,
 
 345                                  AutomationCompositionMigration migrationMsg) {
 
 346         AutomationComposition acCopy = null;
 
 347         if (automationComposition == null) {
 
 348             if (acTargetDefinition == null) {
 
 349                 LOGGER.warn(AC_NOT_USED, migrationMsg.getAutomationCompositionId());
 
 353             LOGGER.info("Migration invoked on an existing participant for the instance {}",
 
 354                     migrationMsg.getAutomationCompositionId());
 
 355             acCopy = new AutomationComposition(automationComposition);
 
 357         var newParticipant = false;
 
 358         for (var participantDeploy : migrationMsg.getParticipantUpdatesList()) {
 
 359             if (cacheProvider.getParticipantId().equals(participantDeploy.getParticipantId())) {
 
 360                 if (automationComposition == null) {
 
 361                     // New element with new participant added in Migration
 
 362                     LOGGER.info("Participant newly added in Migration for the instance {}",
 
 363                             migrationMsg.getAutomationCompositionId());
 
 364                     newParticipant = true;
 
 365                     cacheProvider.initializeAutomationComposition(migrationMsg.getCompositionId(),
 
 366                             migrationMsg.getCompositionTargetId(), migrationMsg.getAutomationCompositionId(),
 
 367                             participantDeploy, DeployState.MIGRATING, SubState.NONE,
 
 368                             migrationMsg.getRevisionIdInstance());
 
 369                     automationComposition = cacheProvider
 
 370                             .getAutomationComposition(migrationMsg.getAutomationCompositionId());
 
 372                 migrateExistingElementsOnThisParticipant(automationComposition, migrationMsg.getCompositionTargetId(),
 
 373                         participantDeploy, migrationMsg.getStage(), newParticipant);
 
 375                 callParticipantMigrate(migrationMsg, participantDeploy.getAcElementList(), acCopy);
 
 380     private void callParticipantMigrate(AutomationCompositionMigration migrationMsg, List<AcElementDeploy> acElements,
 
 381                                         AutomationComposition formerAcInstance) {
 
 382         var latestAcFromCache = cacheProvider.getAutomationComposition(migrationMsg.getAutomationCompositionId());
 
 383         var instanceElementTargetMap = cacheProvider.getInstanceElementDtoMap(latestAcFromCache);
 
 384         var compositionElementTargetMap = cacheProvider.getCompositionElementDtoMap(latestAcFromCache,
 
 385                 migrationMsg.getCompositionTargetId());
 
 386         Map<UUID, CompositionElementDto> compositionElementMap = new HashMap<>();
 
 387         Map<UUID, InstanceElementDto> instanceElementMap = new HashMap<>();
 
 388         if (formerAcInstance != null) { //Existing participant
 
 389             compositionElementMap = cacheProvider.getCompositionElementDtoMap(formerAcInstance);
 
 390             instanceElementMap = cacheProvider.getInstanceElementDtoMap(formerAcInstance);
 
 392         // Call migrate for new and existing elements
 
 393         for (var acElement : acElements) {
 
 394             UUID compIdForCommonProperties = null;
 
 395             if (MigrationState.REMOVED.equals(acElement.getMigrationState())) {
 
 396                 compIdForCommonProperties = latestAcFromCache.getCompositionId();
 
 398                 compIdForCommonProperties = migrationMsg.getCompositionTargetId();
 
 400             var compositionInProperties =
 
 401                     cacheProvider.getCommonProperties(compIdForCommonProperties, acElement.getDefinition());
 
 402             var stageSet = ParticipantUtils.findStageSetMigrate(compositionInProperties);
 
 403             if (MigrationState.REMOVED.equals(acElement.getMigrationState())) {
 
 404                 stageSet = Set.of(0);
 
 406             var rollback = Boolean.TRUE.equals(migrationMsg.getRollback());
 
 407             if (stageSet.contains(migrationMsg.getStage())) {
 
 408                 if (MigrationState.NEW.equals(acElement.getMigrationState())) {
 
 409                     var compositionElementDto = new CompositionElementDto(migrationMsg.getCompositionId(),
 
 410                             acElement.getDefinition(), Map.of(), Map.of(), ElementState.NOT_PRESENT);
 
 411                     var instanceElementDto = new InstanceElementDto(migrationMsg.getAutomationCompositionId(),
 
 412                             acElement.getId(), Map.of(), Map.of(), ElementState.NOT_PRESENT);
 
 413                     var compositionElementTargetDto =
 
 414                             CacheProvider.changeStateToNew(compositionElementTargetMap.get(acElement.getId()));
 
 415                     var instanceElementTargetDto =
 
 416                             CacheProvider.changeStateToNew(instanceElementTargetMap.get(acElement.getId()));
 
 418                     listenerMigrate(migrationMsg.getMessageId(), compositionElementDto, compositionElementTargetDto,
 
 419                             instanceElementDto, instanceElementTargetDto, migrationMsg.getStage(), rollback);
 
 421                 } else if (MigrationState.REMOVED.equals(acElement.getMigrationState())) {
 
 422                     var compositionDtoTarget = new CompositionElementDto(migrationMsg.getCompositionTargetId(),
 
 423                             acElement.getDefinition(), Map.of(), Map.of(), ElementState.REMOVED);
 
 424                     var instanceElementDtoTarget = new InstanceElementDto(migrationMsg.getAutomationCompositionId(),
 
 425                             acElement.getId(), Map.of(), Map.of(), ElementState.REMOVED);
 
 426                     listenerMigrate(migrationMsg.getMessageId(), compositionElementMap.get(acElement.getId()),
 
 427                             compositionDtoTarget, instanceElementMap.get(acElement.getId()), instanceElementDtoTarget,
 
 428                             migrationMsg.getStage(), rollback);
 
 430                 } else { // DEFAULT case
 
 431                     listenerMigrate(migrationMsg.getMessageId(), compositionElementMap.get(acElement.getId()),
 
 432                             compositionElementTargetMap.get(acElement.getId()),
 
 433                             instanceElementMap.get(acElement.getId()), instanceElementTargetMap.get(acElement.getId()),
 
 434                             migrationMsg.getStage(), rollback);
 
 440     private void listenerMigrate(UUID messageId, CompositionElementDto compositionElement,
 
 441             CompositionElementDto compositionElementTarget, InstanceElementDto instanceElement,
 
 442             InstanceElementDto instanceElementMigrate, int stage, boolean rollback) {
 
 444             listener.rollback(messageId, compositionElement, compositionElementTarget, instanceElement,
 
 445                     instanceElementMigrate, stage);
 
 447             LOGGER.info("Invoking migration of element on the participant for {}", instanceElement.elementId());
 
 448             listener.migrate(messageId, compositionElement, compositionElementTarget, instanceElement,
 
 449                     instanceElementMigrate, stage);