/*-
 * ============LICENSE_START=======================================================
 * Copyright (C) 2021-2025 OpenInfra Foundation Europe. All rights reserved.
 * Modifications Copyright (C) 2021 AT&T Intellectual Property. All rights reserved.
 * ================================================================================
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 * SPDX-License-Identifier: Apache-2.0
 * ============LICENSE_END=========================================================
 */
22 package org.onap.policy.clamp.acm.participant.intermediary.handler;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.stream.Collectors;
import lombok.RequiredArgsConstructor;
import org.onap.policy.clamp.acm.participant.intermediary.api.CompositionElementDto;
import org.onap.policy.clamp.acm.participant.intermediary.api.ElementState;
import org.onap.policy.clamp.acm.participant.intermediary.api.InstanceElementDto;
import org.onap.policy.clamp.acm.participant.intermediary.comm.ParticipantMessagePublisher;
import org.onap.policy.clamp.acm.participant.intermediary.handler.cache.CacheProvider;
import org.onap.policy.clamp.models.acm.concepts.AcElementDeploy;
import org.onap.policy.clamp.models.acm.concepts.AutomationComposition;
import org.onap.policy.clamp.models.acm.concepts.AutomationCompositionElement;
import org.onap.policy.clamp.models.acm.concepts.DeployState;
import org.onap.policy.clamp.models.acm.concepts.LockState;
import org.onap.policy.clamp.models.acm.concepts.ParticipantDeploy;
import org.onap.policy.clamp.models.acm.concepts.ParticipantUtils;
import org.onap.policy.clamp.models.acm.concepts.StateChangeResult;
import org.onap.policy.clamp.models.acm.concepts.SubState;
import org.onap.policy.clamp.models.acm.messages.kafka.participant.AutomationCompositionDeploy;
import org.onap.policy.clamp.models.acm.messages.kafka.participant.AutomationCompositionDeployAck;
import org.onap.policy.clamp.models.acm.messages.kafka.participant.AutomationCompositionMigration;
import org.onap.policy.clamp.models.acm.messages.kafka.participant.AutomationCompositionStateChange;
import org.onap.policy.clamp.models.acm.messages.kafka.participant.ParticipantMessageType;
import org.onap.policy.clamp.models.acm.messages.kafka.participant.PropertiesUpdate;
import org.onap.policy.clamp.models.acm.messages.rest.instantiation.DeployOrder;
import org.onap.policy.clamp.models.acm.utils.AcmUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
56 * This class is responsible for managing the state of all automation compositions in the participant.
59 @RequiredArgsConstructor
60 public class AutomationCompositionHandler {
61 private static final Logger LOGGER = LoggerFactory.getLogger(AutomationCompositionHandler.class);
63 private final CacheProvider cacheProvider;
64 private final ParticipantMessagePublisher publisher;
65 private final ThreadHandler listener;
68 * Handle a automation composition state change message.
70 * @param stateChangeMsg the state change message
72 public void handleAutomationCompositionStateChange(AutomationCompositionStateChange stateChangeMsg) {
73 var automationComposition = cacheProvider.getAutomationComposition(stateChangeMsg.getAutomationCompositionId());
75 if (automationComposition == null) {
76 if (DeployOrder.DELETE.equals(stateChangeMsg.getDeployOrderedState())) {
77 var automationCompositionAck = new AutomationCompositionDeployAck(
78 ParticipantMessageType.AUTOMATION_COMPOSITION_STATECHANGE_ACK);
79 automationCompositionAck.setParticipantId(cacheProvider.getParticipantId());
80 automationCompositionAck.setReplicaId(cacheProvider.getReplicaId());
81 automationCompositionAck.setMessage("Already deleted or never used");
82 automationCompositionAck.setResult(true);
83 automationCompositionAck.setStateChangeResult(StateChangeResult.NO_ERROR);
84 automationCompositionAck.setResponseTo(stateChangeMsg.getMessageId());
85 automationCompositionAck.setAutomationCompositionId(stateChangeMsg.getAutomationCompositionId());
86 publisher.sendAutomationCompositionAck(automationCompositionAck);
88 LOGGER.debug("Automation composition {} does not use this participant",
89 stateChangeMsg.getAutomationCompositionId());
94 switch (stateChangeMsg.getDeployOrderedState()) {
95 case UNDEPLOY -> handleUndeployState(stateChangeMsg.getMessageId(), automationComposition,
96 stateChangeMsg.getStartPhase());
97 case DELETE -> handleDeleteState(stateChangeMsg.getMessageId(), automationComposition,
98 stateChangeMsg.getStartPhase());
100 LOGGER.error("StateChange message has no state, state is null {}", automationComposition.getKey());
105 * Handle a automation composition properties update message.
107 * @param updateMsg the properties update message
109 public void handleAcPropertyUpdate(PropertiesUpdate updateMsg) {
111 if (updateMsg.getParticipantUpdatesList().isEmpty()) {
112 LOGGER.warn("No AutomationCompositionElement updates in message {}",
113 updateMsg.getAutomationCompositionId());
117 for (var participantDeploy : updateMsg.getParticipantUpdatesList()) {
118 if (cacheProvider.getParticipantId().equals(participantDeploy.getParticipantId())) {
119 var automationComposition =
120 cacheProvider.getAutomationComposition(updateMsg.getAutomationCompositionId());
121 automationComposition.setDeployState(DeployState.UPDATING);
122 var acCopy = new AutomationComposition(automationComposition);
123 updateExistingElementsOnThisParticipant(updateMsg.getAutomationCompositionId(), participantDeploy);
125 callParticipantUpdateProperty(updateMsg.getMessageId(), participantDeploy.getAcElementList(), acCopy);
131 * Handle a automation composition Deploy message.
133 * @param deployMsg the Deploy message
135 public void handleAutomationCompositionDeploy(AutomationCompositionDeploy deployMsg) {
137 if (deployMsg.getParticipantUpdatesList().isEmpty()) {
138 LOGGER.warn("No AutomationCompositionElement deploy in message {}", deployMsg.getAutomationCompositionId());
142 for (var participantDeploy : deployMsg.getParticipantUpdatesList()) {
143 if (cacheProvider.getParticipantId().equals(participantDeploy.getParticipantId())) {
144 if (deployMsg.isFirstStartPhase()) {
145 cacheProvider.initializeAutomationComposition(deployMsg.getCompositionId(),
146 deployMsg.getAutomationCompositionId(), participantDeploy,
147 deployMsg.getRevisionIdInstance());
149 callParticipantDeploy(deployMsg.getMessageId(), participantDeploy.getAcElementList(),
150 deployMsg.getStartPhase(), deployMsg.getAutomationCompositionId());
155 private void callParticipantDeploy(UUID messageId, List<AcElementDeploy> acElementDeployList, Integer startPhaseMsg,
157 var automationComposition = cacheProvider.getAutomationComposition(instanceId);
158 automationComposition.setDeployState(DeployState.DEPLOYING);
159 for (var elementDeploy : acElementDeployList) {
160 var element = automationComposition.getElements().get(elementDeploy.getId());
161 var compositionInProperties = cacheProvider.getCommonProperties(automationComposition.getCompositionId(),
162 element.getDefinition());
163 int startPhase = ParticipantUtils.findStartPhase(compositionInProperties);
164 if (startPhaseMsg.equals(startPhase)) {
165 var compositionElement =
166 cacheProvider.createCompositionElementDto(automationComposition.getCompositionId(), element);
167 var instanceElement =
168 new InstanceElementDto(instanceId, elementDeploy.getId(), elementDeploy.getProperties(),
169 element.getOutProperties());
170 listener.deploy(messageId, compositionElement, instanceElement);
175 private void callParticipantUpdateProperty(UUID messageId, List<AcElementDeploy> acElements,
176 AutomationComposition acCopy) {
177 var instanceElementDtoMap = cacheProvider.getInstanceElementDtoMap(acCopy);
178 var instanceElementDtoMapUpdated =
179 cacheProvider.getInstanceElementDtoMap(cacheProvider.getAutomationComposition(acCopy.getInstanceId()));
180 var compositionElementDtoMap = cacheProvider.getCompositionElementDtoMap(acCopy);
181 for (var acElement : acElements) {
182 listener.update(messageId, compositionElementDtoMap.get(acElement.getId()),
183 instanceElementDtoMap.get(acElement.getId()), instanceElementDtoMapUpdated.get(acElement.getId()));
187 private void migrateExistingElementsOnThisParticipant(UUID instanceId, UUID compositionTargetId,
188 ParticipantDeploy participantDeploy, int stage) {
189 var automationComposition = cacheProvider.getAutomationComposition(instanceId);
190 var acElementList = automationComposition.getElements();
191 for (var element : participantDeploy.getAcElementList()) {
192 var compositionInProperties =
193 cacheProvider.getCommonProperties(compositionTargetId, element.getDefinition());
194 var stageSet = ParticipantUtils.findStageSetMigrate(compositionInProperties);
195 if (stageSet.contains(stage)) {
196 var acElement = acElementList.get(element.getId());
197 if (acElement == null) {
198 var newElement = CacheProvider.createAutomationCompositionElement(element);
199 newElement.setParticipantId(participantDeploy.getParticipantId());
200 newElement.setDeployState(DeployState.MIGRATING);
201 newElement.setLockState(LockState.LOCKED);
202 newElement.setStage(stage);
204 acElementList.put(element.getId(), newElement);
205 LOGGER.info("New Ac Element with id {} is added in Migration", element.getId());
207 AcmUtils.recursiveMerge(acElement.getProperties(), element.getProperties());
208 acElement.setDeployState(DeployState.MIGRATING);
209 acElement.setStage(stage);
210 acElement.setDefinition(element.getDefinition());
214 // Check for missing elements and remove them from cache
215 var elementsToRemove = findElementsToRemove(participantDeploy.getAcElementList(), acElementList);
216 for (var key : elementsToRemove) {
217 acElementList.remove(key);
218 LOGGER.info("Element with id {} is removed in Migration", key);
222 private void updateExistingElementsOnThisParticipant(UUID instanceId, ParticipantDeploy participantDeploy) {
223 var acElementList = cacheProvider.getAutomationComposition(instanceId).getElements();
224 for (var element : participantDeploy.getAcElementList()) {
225 var acElement = acElementList.get(element.getId());
226 AcmUtils.recursiveMerge(acElement.getProperties(), element.getProperties());
227 acElement.setDeployState(DeployState.UPDATING);
228 acElement.setSubState(SubState.NONE);
229 acElement.setDefinition(element.getDefinition());
233 private List<UUID> findElementsToRemove(List<AcElementDeploy> acElementDeployList,
234 Map<UUID, AutomationCompositionElement> acElementList) {
235 var acElementDeploySet = acElementDeployList.stream().map(AcElementDeploy::getId).collect(Collectors.toSet());
236 return acElementList.keySet().stream().filter(id -> !acElementDeploySet.contains(id)).toList();
240 * Method to handle when the new state from participant is UNINITIALISED state.
242 * @param messageId the messageId
243 * @param automationComposition participant response
244 * @param startPhaseMsg startPhase from message
246 private void handleUndeployState(UUID messageId, final AutomationComposition automationComposition,
247 Integer startPhaseMsg) {
248 automationComposition.setCompositionTargetId(null);
249 automationComposition.setDeployState(DeployState.UNDEPLOYING);
250 for (var element : automationComposition.getElements().values()) {
251 var compositionInProperties = cacheProvider.getCommonProperties(automationComposition.getCompositionId(),
252 element.getDefinition());
253 int startPhase = ParticipantUtils.findStartPhase(compositionInProperties);
254 if (startPhaseMsg.equals(startPhase)) {
255 element.setDeployState(DeployState.UNDEPLOYING);
256 var compositionElement =
257 cacheProvider.createCompositionElementDto(automationComposition.getCompositionId(), element);
258 var instanceElement = new InstanceElementDto(automationComposition.getInstanceId(), element.getId(),
259 element.getProperties(), element.getOutProperties());
260 listener.undeploy(messageId, compositionElement, instanceElement);
265 private void handleDeleteState(UUID messageId, final AutomationComposition automationComposition,
266 Integer startPhaseMsg) {
267 automationComposition.setDeployState(DeployState.DELETING);
268 for (var element : automationComposition.getElements().values()) {
269 var compositionInProperties = cacheProvider.getCommonProperties(automationComposition.getCompositionId(),
270 element.getDefinition());
271 int startPhase = ParticipantUtils.findStartPhase(compositionInProperties);
272 if (startPhaseMsg.equals(startPhase)) {
273 element.setDeployState(DeployState.DELETING);
274 element.setSubState(SubState.NONE);
275 var compositionElement =
276 cacheProvider.createCompositionElementDto(automationComposition.getCompositionId(), element);
277 var instanceElement = new InstanceElementDto(automationComposition.getInstanceId(), element.getId(),
278 element.getProperties(), element.getOutProperties());
279 listener.delete(messageId, compositionElement, instanceElement);
285 * Handles AutomationComposition Migration.
287 * @param migrationMsg the AutomationCompositionMigration
289 public void handleAutomationCompositionMigration(AutomationCompositionMigration migrationMsg) {
290 var automationComposition = cacheProvider.getAutomationComposition(migrationMsg.getAutomationCompositionId());
291 if (automationComposition == null) {
292 LOGGER.debug("Automation composition {} does not use this participant",
293 migrationMsg.getAutomationCompositionId());
296 var acCopy = new AutomationComposition(automationComposition);
297 automationComposition.setCompositionTargetId(migrationMsg.getCompositionTargetId());
298 automationComposition.setDeployState(DeployState.MIGRATING);
299 for (var participantDeploy : migrationMsg.getParticipantUpdatesList()) {
300 if (cacheProvider.getParticipantId().equals(participantDeploy.getParticipantId())) {
302 migrateExistingElementsOnThisParticipant(migrationMsg.getAutomationCompositionId(),
303 migrationMsg.getCompositionTargetId(), participantDeploy, migrationMsg.getStage());
305 callParticipantMigrate(migrationMsg.getMessageId(), participantDeploy.getAcElementList(), acCopy,
306 migrationMsg.getCompositionTargetId(), migrationMsg.getStage(),
307 Boolean.TRUE.equals(migrationMsg.getRollback()));
312 private void callParticipantMigrate(UUID messageId, List<AcElementDeploy> acElements, AutomationComposition acCopy,
313 UUID compositionTargetId, int stage, boolean rollback) {
314 var compositionElementMap = cacheProvider.getCompositionElementDtoMap(acCopy);
315 var instanceElementMap = cacheProvider.getInstanceElementDtoMap(acCopy);
316 var automationComposition = cacheProvider.getAutomationComposition(acCopy.getInstanceId());
317 var compositionElementTargetMap =
318 cacheProvider.getCompositionElementDtoMap(automationComposition, compositionTargetId);
319 var instanceElementMigrateMap = cacheProvider.getInstanceElementDtoMap(automationComposition);
321 // Call migrate for newly added and updated elements
322 for (var acElement : acElements) {
323 var compositionInProperties =
324 cacheProvider.getCommonProperties(compositionTargetId, acElement.getDefinition());
325 var stageSet = ParticipantUtils.findStageSetMigrate(compositionInProperties);
326 if (stageSet.contains(stage)) {
327 if (instanceElementMap.get(acElement.getId()) == null) {
328 var compositionElementDto =
329 new CompositionElementDto(acCopy.getCompositionId(), acElement.getDefinition(), Map.of(),
330 Map.of(), ElementState.NOT_PRESENT);
331 var instanceElementDto =
332 new InstanceElementDto(acCopy.getInstanceId(), acElement.getId(), Map.of(), Map.of(),
333 ElementState.NOT_PRESENT);
334 var compositionElementTargetDto =
335 CacheProvider.changeStateToNew(compositionElementTargetMap.get(acElement.getId()));
336 var instanceElementMigrateDto =
337 CacheProvider.changeStateToNew(instanceElementMigrateMap.get(acElement.getId()));
339 listenerMigrate(messageId, compositionElementDto, compositionElementTargetDto, instanceElementDto,
340 instanceElementMigrateDto, stage, rollback);
342 listenerMigrate(messageId, compositionElementMap.get(acElement.getId()),
343 compositionElementTargetMap.get(acElement.getId()),
344 instanceElementMap.get(acElement.getId()), instanceElementMigrateMap.get(acElement.getId()),
350 // Call migrate for removed elements
351 List<UUID> removedElements = findElementsToRemove(acElements, acCopy.getElements());
352 for (var elementId : removedElements) {
353 var compositionDtoTarget = new CompositionElementDto(compositionTargetId,
354 acCopy.getElements().get(elementId).getDefinition(), Map.of(), Map.of(), ElementState.REMOVED);
355 var instanceDtoTarget = new InstanceElementDto(acCopy.getInstanceId(), elementId, Map.of(), Map.of(),
356 ElementState.REMOVED);
357 listenerMigrate(messageId, compositionElementMap.get(elementId), compositionDtoTarget,
358 instanceElementMap.get(elementId), instanceDtoTarget, 0, rollback);
363 private void listenerMigrate(UUID messageId, CompositionElementDto compositionElement,
364 CompositionElementDto compositionElementTarget, InstanceElementDto instanceElement,
365 InstanceElementDto instanceElementMigrate, int stage, boolean rollback) {
367 listener.rollback(messageId, compositionElement, compositionElementTarget, instanceElement,
368 instanceElementMigrate, stage);
370 listener.migrate(messageId, compositionElement, compositionElementTarget, instanceElement,
371 instanceElementMigrate, stage);