2 * ============LICENSE_START=======================================================
3 * Copyright (C) 2021-2025 OpenInfra Foundation Europe. All rights reserved.
4 * Modifications Copyright (C) 2021 AT&T Intellectual Property. All rights reserved.
5 * ================================================================================
6 * Licensed under the Apache License, Version 2.0 (the "License");
7 * you may not use this file except in compliance with the License.
8 * You may obtain a copy of the License at
10 * http://www.apache.org/licenses/LICENSE-2.0
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
18 * SPDX-License-Identifier: Apache-2.0
19 * ============LICENSE_END=========================================================
22 package org.onap.policy.clamp.acm.participant.intermediary.handler;
24 import java.util.HashMap;
25 import java.util.List;
28 import java.util.UUID;
29 import lombok.RequiredArgsConstructor;
30 import org.onap.policy.clamp.acm.participant.intermediary.api.CompositionElementDto;
31 import org.onap.policy.clamp.acm.participant.intermediary.api.ElementState;
32 import org.onap.policy.clamp.acm.participant.intermediary.api.InstanceElementDto;
33 import org.onap.policy.clamp.acm.participant.intermediary.comm.ParticipantMessagePublisher;
34 import org.onap.policy.clamp.acm.participant.intermediary.handler.cache.AcDefinition;
35 import org.onap.policy.clamp.acm.participant.intermediary.handler.cache.CacheProvider;
36 import org.onap.policy.clamp.models.acm.concepts.AcElementDeploy;
37 import org.onap.policy.clamp.models.acm.concepts.AutomationComposition;
38 import org.onap.policy.clamp.models.acm.concepts.DeployState;
39 import org.onap.policy.clamp.models.acm.concepts.LockState;
40 import org.onap.policy.clamp.models.acm.concepts.MigrationState;
41 import org.onap.policy.clamp.models.acm.concepts.ParticipantDeploy;
42 import org.onap.policy.clamp.models.acm.concepts.ParticipantUtils;
43 import org.onap.policy.clamp.models.acm.concepts.StateChangeResult;
44 import org.onap.policy.clamp.models.acm.concepts.SubState;
45 import org.onap.policy.clamp.models.acm.messages.kafka.participant.AutomationCompositionDeploy;
46 import org.onap.policy.clamp.models.acm.messages.kafka.participant.AutomationCompositionDeployAck;
47 import org.onap.policy.clamp.models.acm.messages.kafka.participant.AutomationCompositionMigration;
48 import org.onap.policy.clamp.models.acm.messages.kafka.participant.AutomationCompositionStateChange;
49 import org.onap.policy.clamp.models.acm.messages.kafka.participant.ParticipantMessageType;
50 import org.onap.policy.clamp.models.acm.messages.kafka.participant.PropertiesUpdate;
51 import org.onap.policy.clamp.models.acm.messages.rest.instantiation.DeployOrder;
52 import org.onap.policy.clamp.models.acm.utils.AcmUtils;
53 import org.slf4j.Logger;
54 import org.slf4j.LoggerFactory;
55 import org.springframework.stereotype.Component;
58 * This class is responsible for managing the state of all automation compositions in the participant.
61 @RequiredArgsConstructor
62 public class AutomationCompositionHandler {
// Class-wide SLF4J logger.
63 private static final Logger LOGGER = LoggerFactory.getLogger(AutomationCompositionHandler.class);
// Log template used when a message refers to an automation composition that the cache does not hold.
64 private static final String AC_NOT_USED = "Automation composition {} does not use this participant";
// Source of the participant/replica ids, the cached automation compositions and the element DTOs.
66 private final CacheProvider cacheProvider;
// Used to send acknowledgement messages (sendAutomationCompositionAck).
67 private final ParticipantMessagePublisher publisher;
// Target of the per-element deploy/undeploy/delete/update/migrate/rollback calls made by this handler.
68 private final ThreadHandler listener;
71 * Handle a automation composition state change message.
73 * @param stateChangeMsg the state change message
75 public void handleAutomationCompositionStateChange(AutomationCompositionStateChange stateChangeMsg) {
76 var automationComposition = cacheProvider.getAutomationComposition(stateChangeMsg.getAutomationCompositionId());
78 if (automationComposition == null) {
79 if (DeployOrder.DELETE.equals(stateChangeMsg.getDeployOrderedState())) {
80 var automationCompositionAck = new AutomationCompositionDeployAck(
81 ParticipantMessageType.AUTOMATION_COMPOSITION_STATECHANGE_ACK);
82 automationCompositionAck.setParticipantId(cacheProvider.getParticipantId());
83 automationCompositionAck.setReplicaId(cacheProvider.getReplicaId());
84 automationCompositionAck.setMessage("Already deleted or never used");
85 automationCompositionAck.setResult(true);
86 automationCompositionAck.setStateChangeResult(StateChangeResult.NO_ERROR);
87 automationCompositionAck.setResponseTo(stateChangeMsg.getMessageId());
88 automationCompositionAck.setAutomationCompositionId(stateChangeMsg.getAutomationCompositionId());
89 publisher.sendAutomationCompositionAck(automationCompositionAck);
91 LOGGER.warn(AC_NOT_USED, stateChangeMsg.getAutomationCompositionId());
96 switch (stateChangeMsg.getDeployOrderedState()) {
97 case UNDEPLOY -> handleUndeployState(stateChangeMsg.getMessageId(), automationComposition,
98 stateChangeMsg.getStartPhase());
99 case DELETE -> handleDeleteState(stateChangeMsg.getMessageId(), automationComposition,
100 stateChangeMsg.getStartPhase());
102 LOGGER.error("StateChange message has no state, state is null {}", automationComposition.getKey());
107 * Handle a automation composition properties update message.
109 * @param updateMsg the properties update message
111 public void handleAcPropertyUpdate(PropertiesUpdate updateMsg) {
113 if (updateMsg.getParticipantUpdatesList().isEmpty()) {
114 LOGGER.warn("No AutomationCompositionElement updates in message {}",
115 updateMsg.getAutomationCompositionId());
119 for (var participantDeploy : updateMsg.getParticipantUpdatesList()) {
120 if (cacheProvider.getParticipantId().equals(participantDeploy.getParticipantId())) {
121 var automationComposition =
122 cacheProvider.getAutomationComposition(updateMsg.getAutomationCompositionId());
123 automationComposition.setDeployState(DeployState.UPDATING);
124 var acCopy = new AutomationComposition(automationComposition);
125 updateExistingElementsOnThisParticipant(updateMsg.getAutomationCompositionId(), participantDeploy);
127 callParticipantUpdateProperty(updateMsg.getMessageId(), participantDeploy.getAcElementList(), acCopy);
133 * Handle a automation composition Deploy message.
135 * @param deployMsg the Deploy message
137 public void handleAutomationCompositionDeploy(AutomationCompositionDeploy deployMsg) {
139 if (deployMsg.getParticipantUpdatesList().isEmpty()) {
140 LOGGER.warn("No AutomationCompositionElement deploy in message {}", deployMsg.getAutomationCompositionId());
144 for (var participantDeploy : deployMsg.getParticipantUpdatesList()) {
145 if (cacheProvider.getParticipantId().equals(participantDeploy.getParticipantId())) {
146 if (deployMsg.isFirstStartPhase()) {
147 cacheProvider.initializeAutomationComposition(deployMsg.getCompositionId(),
148 deployMsg.getAutomationCompositionId(), participantDeploy,
149 deployMsg.getRevisionIdInstance());
151 callParticipantDeploy(deployMsg.getMessageId(), participantDeploy.getAcElementList(),
152 deployMsg.getStartPhase(), deployMsg.getAutomationCompositionId());
157 private void callParticipantDeploy(UUID messageId, List<AcElementDeploy> acElementDeployList, Integer startPhaseMsg,
159 var automationComposition = cacheProvider.getAutomationComposition(instanceId);
160 automationComposition.setDeployState(DeployState.DEPLOYING);
161 for (var elementDeploy : acElementDeployList) {
162 var element = automationComposition.getElements().get(elementDeploy.getId());
163 var compositionInProperties = cacheProvider.getCommonProperties(automationComposition.getCompositionId(),
164 element.getDefinition());
165 int startPhase = ParticipantUtils.findStartPhase(compositionInProperties);
166 if (startPhaseMsg.equals(startPhase)) {
167 var compositionElement =
168 cacheProvider.createCompositionElementDto(automationComposition.getCompositionId(), element);
169 var instanceElement =
170 new InstanceElementDto(instanceId, elementDeploy.getId(), elementDeploy.getProperties(),
171 element.getOutProperties());
172 listener.deploy(messageId, compositionElement, instanceElement);
177 private void callParticipantUpdateProperty(UUID messageId, List<AcElementDeploy> acElements,
178 AutomationComposition acCopy) {
179 var instanceElementDtoMap = cacheProvider.getInstanceElementDtoMap(acCopy);
180 var instanceElementDtoMapUpdated =
181 cacheProvider.getInstanceElementDtoMap(cacheProvider.getAutomationComposition(acCopy.getInstanceId()));
182 var compositionElementDtoMap = cacheProvider.getCompositionElementDtoMap(acCopy);
183 for (var acElement : acElements) {
184 listener.update(messageId, compositionElementDtoMap.get(acElement.getId()),
185 instanceElementDtoMap.get(acElement.getId()), instanceElementDtoMapUpdated.get(acElement.getId()));
189 private void migrateExistingElementsOnThisParticipant(AutomationComposition automationComposition,
190 UUID compositionTargetId, ParticipantDeploy participantDeploy,
191 int stage, boolean newParticipant) {
192 for (var element : participantDeploy.getAcElementList()) {
193 UUID compIdForCommonProperties = null;
194 if (MigrationState.REMOVED.equals(element.getMigrationState())) {
195 compIdForCommonProperties = automationComposition.getCompositionId();
197 compIdForCommonProperties = compositionTargetId;
199 var compositionInProperties =
200 cacheProvider.getCommonProperties(compIdForCommonProperties, element.getDefinition());
201 var stageSet = ParticipantUtils.findStageSetMigrate(compositionInProperties);
202 if (MigrationState.REMOVED.equals(element.getMigrationState())) {
203 stageSet = Set.of(0);
205 if (stageSet.contains(stage)) {
206 migrateElement(element, automationComposition, compositionTargetId, stage, newParticipant,
// Applies one element of a migration message to the cached instance: records the target composition,
// marks the instance as MIGRATING, then either adds the element (unknown id) or updates the cached one.
// NOTE(review): brace-only and other short lines are not visible in this view, so the exact nesting of
// the branches below (in particular the final log statement) must be confirmed against the full file.
212 private void migrateElement(AcElementDeploy element, AutomationComposition automationComposition,
213 UUID compositionTargetId, int stage, boolean newParticipant,
214 ParticipantDeploy participantDeploy) {
215 var acElementList = automationComposition.getElements();
216 automationComposition.setCompositionTargetId(compositionTargetId);
217 automationComposition.setDeployState(DeployState.MIGRATING);
218 var acElement = acElementList.get(element.getId());
// Element id not cached yet: build a new element from the message and register it.
219 if (acElement == null) { // NEW element with existing participant
220 var newElement = CacheProvider.createAutomationCompositionElement(element);
221 newElement.setParticipantId(participantDeploy.getParticipantId());
222 newElement.setDeployState(DeployState.MIGRATING);
223 newElement.setLockState(LockState.LOCKED);
224 newElement.setStage(stage);
// Always tagged NEW here, regardless of the migration state carried by the message element.
225 newElement.setMigrationState(MigrationState.NEW);
227 acElementList.put(element.getId(), newElement);
228 LOGGER.info("New Ac Element with id {} is added in Migration", element.getId());
// Element already cached: record the stage and the migration state from the message.
230 acElement.setStage(stage);
231 acElement.setMigrationState(element.getMigrationState());
// Properties, deploy state and definition are only overwritten when newParticipant is false.
232 if (! newParticipant) { //DEFAULT element
233 AcmUtils.recursiveMerge(acElement.getProperties(), element.getProperties());
234 acElement.setDeployState(DeployState.MIGRATING);
235 acElement.setDefinition(element.getDefinition());
237 LOGGER.info("Cache updated for the migration of element with id {}", element.getId());
242 private void updateExistingElementsOnThisParticipant(UUID instanceId, ParticipantDeploy participantDeploy) {
243 var acElementList = cacheProvider.getAutomationComposition(instanceId).getElements();
244 for (var element : participantDeploy.getAcElementList()) {
245 var acElement = acElementList.get(element.getId());
246 AcmUtils.recursiveMerge(acElement.getProperties(), element.getProperties());
247 acElement.setDeployState(DeployState.UPDATING);
248 acElement.setSubState(SubState.NONE);
249 acElement.setDefinition(element.getDefinition());
254 * Method to handle when the new state from participant is UNINITIALISED state.
256 * @param messageId the messageId
257 * @param automationComposition participant response
258 * @param startPhaseMsg startPhase from message
// Marks the instance as UNDEPLOYING and asks the listener to undeploy each element whose start phase
// equals startPhaseMsg.
// NOTE(review): brace-only and other short lines are not visible in this view; in particular the body of
// the second MigrationState.NEW branch below cannot be seen here and must be checked in the full file.
260 private void handleUndeployState(UUID messageId, final AutomationComposition automationComposition,
261 Integer startPhaseMsg) {
262 automationComposition.setDeployState(DeployState.UNDEPLOYING);
263 for (var element : automationComposition.getElements().values()) {
// Elements tagged NEW use the target composition id; the other assignment below uses the current one.
264 UUID compositionId = null;
265 if (MigrationState.NEW.equals(element.getMigrationState())) {
266 compositionId = automationComposition.getCompositionTargetId();
268 compositionId = automationComposition.getCompositionId();
// The element's start phase is read from the common properties of the composition selected above.
270 var compositionInProperties = cacheProvider.getCommonProperties(compositionId, element.getDefinition());
271 int startPhase = ParticipantUtils.findStartPhase(compositionInProperties);
272 if (MigrationState.NEW.equals(element.getMigrationState())) {
273 // Undeploy newly added element on a Failed Migration
// Only elements of the requested start phase are handed to the listener.
276 if (startPhaseMsg.equals(startPhase)) {
277 element.setDeployState(DeployState.UNDEPLOYING);
278 var compositionElement =
279 cacheProvider.createCompositionElementDto(compositionId, element);
280 var instanceElement = new InstanceElementDto(automationComposition.getInstanceId(), element.getId(),
281 element.getProperties(), element.getOutProperties());
282 listener.undeploy(messageId, compositionElement, instanceElement);
287 private void handleDeleteState(UUID messageId, final AutomationComposition automationComposition,
288 Integer startPhaseMsg) {
289 automationComposition.setDeployState(DeployState.DELETING);
290 for (var element : automationComposition.getElements().values()) {
291 var compositionInProperties = cacheProvider.getCommonProperties(automationComposition.getCompositionId(),
292 element.getDefinition());
293 int startPhase = ParticipantUtils.findStartPhase(compositionInProperties);
294 if (startPhaseMsg.equals(startPhase)) {
295 element.setDeployState(DeployState.DELETING);
296 element.setSubState(SubState.NONE);
297 var compositionElement =
298 cacheProvider.createCompositionElementDto(automationComposition.getCompositionId(), element);
299 var instanceElement = new InstanceElementDto(automationComposition.getInstanceId(), element.getId(),
300 element.getProperties(), element.getOutProperties());
301 listener.delete(messageId, compositionElement, instanceElement);
307 * Handles AutomationComposition Migration.
309 * @param migrationMsg the AutomationCompositionMigration
311 public void handleAutomationCompositionMigration(AutomationCompositionMigration migrationMsg) {
312 var automationComposition = cacheProvider.getAutomationComposition(migrationMsg.getAutomationCompositionId());
313 var acTargetDefinition = cacheProvider.getAcElementsDefinitions().get(migrationMsg.getCompositionTargetId());
314 if (Boolean.FALSE.equals(migrationMsg.getRollback())) {
315 handleMigration(automationComposition, acTargetDefinition, migrationMsg);
317 handleRollback(automationComposition, migrationMsg);
321 private void handleRollback(AutomationComposition automationComposition,
322 AutomationCompositionMigration migrationMsg) {
323 AutomationComposition acCopy = null;
324 if (automationComposition == null) {
325 LOGGER.warn(AC_NOT_USED, migrationMsg.getAutomationCompositionId());
328 LOGGER.info("Rollback operation invoked for the instance {}", migrationMsg.getAutomationCompositionId());
329 acCopy = new AutomationComposition(automationComposition);
330 automationComposition.setCompositionTargetId(migrationMsg.getCompositionTargetId());
331 automationComposition.setDeployState(DeployState.MIGRATION_REVERTING);
333 for (var participantDeploy : migrationMsg.getParticipantUpdatesList()) {
334 if (cacheProvider.getParticipantId().equals(participantDeploy.getParticipantId())) {
335 migrateExistingElementsOnThisParticipant(automationComposition, migrationMsg.getCompositionTargetId(),
336 participantDeploy, migrationMsg.getStage(), false);
338 callParticipantMigrate(migrationMsg, participantDeploy.getAcElementList(), acCopy);
344 private void handleMigration(AutomationComposition automationComposition, AcDefinition acTargetDefinition,
345 AutomationCompositionMigration migrationMsg) {
346 AutomationComposition acCopy = null;
347 if (automationComposition == null) {
348 if (acTargetDefinition == null) {
349 LOGGER.warn(AC_NOT_USED, migrationMsg.getAutomationCompositionId());
353 LOGGER.info("Migration invoked on an existing participant for the instance {}",
354 migrationMsg.getAutomationCompositionId());
355 acCopy = new AutomationComposition(automationComposition);
357 var newParticipant = false;
358 for (var participantDeploy : migrationMsg.getParticipantUpdatesList()) {
359 if (cacheProvider.getParticipantId().equals(participantDeploy.getParticipantId())) {
360 if (automationComposition == null) {
361 // New element with new participant added in Migration
362 LOGGER.info("Participant newly added in Migration for the instance {}",
363 migrationMsg.getAutomationCompositionId());
364 newParticipant = true;
365 cacheProvider.initializeAutomationComposition(migrationMsg.getCompositionId(),
366 migrationMsg.getCompositionTargetId(), migrationMsg.getAutomationCompositionId(),
367 participantDeploy, DeployState.MIGRATING, SubState.NONE,
368 migrationMsg.getRevisionIdInstance());
369 automationComposition = cacheProvider
370 .getAutomationComposition(migrationMsg.getAutomationCompositionId());
372 migrateExistingElementsOnThisParticipant(automationComposition, migrationMsg.getCompositionTargetId(),
373 participantDeploy, migrationMsg.getStage(), newParticipant);
375 callParticipantMigrate(migrationMsg, participantDeploy.getAcElementList(), acCopy);
380 private void callParticipantMigrate(AutomationCompositionMigration migrationMsg, List<AcElementDeploy> acElements,
381 AutomationComposition formerAcInstance) {
382 var latestAcFromCache = cacheProvider.getAutomationComposition(migrationMsg.getAutomationCompositionId());
383 var instanceElementTargetMap = cacheProvider.getInstanceElementDtoMap(latestAcFromCache);
384 var compositionElementTargetMap = cacheProvider.getCompositionElementDtoMap(latestAcFromCache,
385 migrationMsg.getCompositionTargetId());
386 Map<UUID, CompositionElementDto> compositionElementMap = new HashMap<>();
387 Map<UUID, InstanceElementDto> instanceElementMap = new HashMap<>();
388 if (formerAcInstance != null) { //Existing participant
389 compositionElementMap = cacheProvider.getCompositionElementDtoMap(formerAcInstance);
390 instanceElementMap = cacheProvider.getInstanceElementDtoMap(formerAcInstance);
392 // Call migrate for new and existing elements
393 for (var acElement : acElements) {
394 UUID compIdForCommonProperties = null;
395 if (MigrationState.REMOVED.equals(acElement.getMigrationState())) {
396 compIdForCommonProperties = latestAcFromCache.getCompositionId();
398 compIdForCommonProperties = migrationMsg.getCompositionTargetId();
400 var compositionInProperties =
401 cacheProvider.getCommonProperties(compIdForCommonProperties, acElement.getDefinition());
402 var stageSet = ParticipantUtils.findStageSetMigrate(compositionInProperties);
403 if (MigrationState.REMOVED.equals(acElement.getMigrationState())) {
404 stageSet = Set.of(0);
406 var rollback = Boolean.TRUE.equals(migrationMsg.getRollback());
407 if (stageSet.contains(migrationMsg.getStage())) {
408 if (MigrationState.NEW.equals(acElement.getMigrationState())) {
409 var compositionElementDto = new CompositionElementDto(migrationMsg.getCompositionId(),
410 acElement.getDefinition(), Map.of(), Map.of(), ElementState.NOT_PRESENT);
411 var instanceElementDto = new InstanceElementDto(migrationMsg.getAutomationCompositionId(),
412 acElement.getId(), Map.of(), Map.of(), ElementState.NOT_PRESENT);
413 var compositionElementTargetDto =
414 CacheProvider.changeStateToNew(compositionElementTargetMap.get(acElement.getId()));
415 var instanceElementTargetDto =
416 CacheProvider.changeStateToNew(instanceElementTargetMap.get(acElement.getId()));
418 listenerMigrate(migrationMsg.getMessageId(), compositionElementDto, compositionElementTargetDto,
419 instanceElementDto, instanceElementTargetDto, migrationMsg.getStage(), rollback);
421 } else if (MigrationState.REMOVED.equals(acElement.getMigrationState())) {
422 var compositionDtoTarget = new CompositionElementDto(migrationMsg.getCompositionTargetId(),
423 acElement.getDefinition(), Map.of(), Map.of(), ElementState.REMOVED);
424 var instanceElementDtoTarget = new InstanceElementDto(migrationMsg.getAutomationCompositionId(),
425 acElement.getId(), Map.of(), Map.of(), ElementState.REMOVED);
426 listenerMigrate(migrationMsg.getMessageId(), compositionElementMap.get(acElement.getId()),
427 compositionDtoTarget, instanceElementMap.get(acElement.getId()), instanceElementDtoTarget,
428 migrationMsg.getStage(), rollback);
430 } else { // DEFAULT case
431 listenerMigrate(migrationMsg.getMessageId(), compositionElementMap.get(acElement.getId()),
432 compositionElementTargetMap.get(acElement.getId()),
433 instanceElementMap.get(acElement.getId()), instanceElementTargetMap.get(acElement.getId()),
434 migrationMsg.getStage(), rollback);
440 private void listenerMigrate(UUID messageId, CompositionElementDto compositionElement,
441 CompositionElementDto compositionElementTarget, InstanceElementDto instanceElement,
442 InstanceElementDto instanceElementMigrate, int stage, boolean rollback) {
444 listener.rollback(messageId, compositionElement, compositionElementTarget, instanceElement,
445 instanceElementMigrate, stage);
447 LOGGER.info("Invoking migration of element on the participant for {}", instanceElement.elementId());
448 listener.migrate(messageId, compositionElement, compositionElementTarget, instanceElement,
449 instanceElementMigrate, stage);