/*-
 * ============LICENSE_START=======================================================
 *  Copyright (C) 2021-2025 OpenInfra Foundation Europe. All rights reserved.
 *  Modifications Copyright (C) 2021 AT&T Intellectual Property. All rights reserved.
 * ================================================================================
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 * SPDX-License-Identifier: Apache-2.0
 * ============LICENSE_END=========================================================
 */

package org.onap.policy.clamp.acm.participant.intermediary.handler;

import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.stream.Collectors;
import lombok.RequiredArgsConstructor;
import org.onap.policy.clamp.acm.participant.intermediary.api.CompositionElementDto;
import org.onap.policy.clamp.acm.participant.intermediary.api.ElementState;
import org.onap.policy.clamp.acm.participant.intermediary.api.InstanceElementDto;
import org.onap.policy.clamp.acm.participant.intermediary.comm.ParticipantMessagePublisher;
import org.onap.policy.clamp.models.acm.concepts.AcElementDeploy;
import org.onap.policy.clamp.models.acm.concepts.AutomationComposition;
import org.onap.policy.clamp.models.acm.concepts.AutomationCompositionElement;
import org.onap.policy.clamp.models.acm.concepts.DeployState;
import org.onap.policy.clamp.models.acm.concepts.LockState;
import org.onap.policy.clamp.models.acm.concepts.ParticipantDeploy;
import org.onap.policy.clamp.models.acm.concepts.ParticipantUtils;
import org.onap.policy.clamp.models.acm.concepts.StateChangeResult;
import org.onap.policy.clamp.models.acm.concepts.SubState;
import org.onap.policy.clamp.models.acm.messages.kafka.participant.AutomationCompositionDeploy;
import org.onap.policy.clamp.models.acm.messages.kafka.participant.AutomationCompositionDeployAck;
import org.onap.policy.clamp.models.acm.messages.kafka.participant.AutomationCompositionMigration;
import org.onap.policy.clamp.models.acm.messages.kafka.participant.AutomationCompositionStateChange;
import org.onap.policy.clamp.models.acm.messages.kafka.participant.ParticipantMessageType;
import org.onap.policy.clamp.models.acm.messages.kafka.participant.PropertiesUpdate;
import org.onap.policy.clamp.models.acm.messages.rest.instantiation.DeployOrder;
import org.onap.policy.clamp.models.acm.utils.AcmUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;

55 * This class is responsible for managing the state of all automation compositions in the participant.
58 @RequiredArgsConstructor
59 public class AutomationCompositionHandler {
60 private static final Logger LOGGER = LoggerFactory.getLogger(AutomationCompositionHandler.class);
62 private final CacheProvider cacheProvider;
63 private final ParticipantMessagePublisher publisher;
64 private final ThreadHandler listener;
67 * Handle a automation composition state change message.
69 * @param stateChangeMsg the state change message
71 public void handleAutomationCompositionStateChange(AutomationCompositionStateChange stateChangeMsg) {
72 if (stateChangeMsg.getAutomationCompositionId() == null) {
76 var automationComposition = cacheProvider.getAutomationComposition(stateChangeMsg.getAutomationCompositionId());
78 if (automationComposition == null) {
79 if (DeployOrder.DELETE.equals(stateChangeMsg.getDeployOrderedState())) {
80 var automationCompositionAck = new AutomationCompositionDeployAck(
81 ParticipantMessageType.AUTOMATION_COMPOSITION_STATECHANGE_ACK);
82 automationCompositionAck.setParticipantId(cacheProvider.getParticipantId());
83 automationCompositionAck.setReplicaId(cacheProvider.getReplicaId());
84 automationCompositionAck.setMessage("Already deleted or never used");
85 automationCompositionAck.setResult(true);
86 automationCompositionAck.setStateChangeResult(StateChangeResult.NO_ERROR);
87 automationCompositionAck.setResponseTo(stateChangeMsg.getMessageId());
88 automationCompositionAck.setAutomationCompositionId(stateChangeMsg.getAutomationCompositionId());
89 publisher.sendAutomationCompositionAck(automationCompositionAck);
91 LOGGER.debug("Automation composition {} does not use this participant",
92 stateChangeMsg.getAutomationCompositionId());
97 switch (stateChangeMsg.getDeployOrderedState()) {
98 case UNDEPLOY -> handleUndeployState(stateChangeMsg.getMessageId(), automationComposition,
99 stateChangeMsg.getStartPhase());
100 case DELETE -> handleDeleteState(stateChangeMsg.getMessageId(), automationComposition,
101 stateChangeMsg.getStartPhase());
103 LOGGER.error("StateChange message has no state, state is null {}", automationComposition.getKey());
108 * Handle a automation composition properties update message.
110 * @param updateMsg the properties update message
112 public void handleAcPropertyUpdate(PropertiesUpdate updateMsg) {
114 if (updateMsg.getParticipantUpdatesList().isEmpty()) {
115 LOGGER.warn("No AutomationCompositionElement updates in message {}",
116 updateMsg.getAutomationCompositionId());
120 for (var participantDeploy : updateMsg.getParticipantUpdatesList()) {
121 if (cacheProvider.getParticipantId().equals(participantDeploy.getParticipantId())) {
122 var automationComposition = cacheProvider.getAutomationComposition(
123 updateMsg.getAutomationCompositionId());
124 automationComposition.setDeployState(DeployState.UPDATING);
125 var acCopy = new AutomationComposition(automationComposition);
126 updateExistingElementsOnThisParticipant(updateMsg.getAutomationCompositionId(), participantDeploy);
128 callParticipantUpdateProperty(updateMsg.getMessageId(), participantDeploy.getAcElementList(), acCopy);
134 * Handle a automation composition Deploy message.
136 * @param deployMsg the Deploy message
138 public void handleAutomationCompositionDeploy(AutomationCompositionDeploy deployMsg) {
140 if (deployMsg.getParticipantUpdatesList().isEmpty()) {
141 LOGGER.warn("No AutomationCompositionElement deploy in message {}", deployMsg.getAutomationCompositionId());
145 for (var participantDeploy : deployMsg.getParticipantUpdatesList()) {
146 if (cacheProvider.getParticipantId().equals(participantDeploy.getParticipantId())) {
147 if (deployMsg.isFirstStartPhase()) {
148 cacheProvider.initializeAutomationComposition(deployMsg.getCompositionId(),
149 deployMsg.getAutomationCompositionId(), participantDeploy);
151 callParticipanDeploy(deployMsg.getMessageId(), participantDeploy.getAcElementList(),
152 deployMsg.getStartPhase(), deployMsg.getAutomationCompositionId());
157 private void callParticipanDeploy(UUID messageId, List<AcElementDeploy> acElementDeployList,
158 Integer startPhaseMsg, UUID instanceId) {
159 var automationComposition = cacheProvider.getAutomationComposition(instanceId);
160 automationComposition.setDeployState(DeployState.DEPLOYING);
161 for (var elementDeploy : acElementDeployList) {
162 var element = automationComposition.getElements().get(elementDeploy.getId());
163 var compositionInProperties = cacheProvider
164 .getCommonProperties(automationComposition.getCompositionId(), element.getDefinition());
165 int startPhase = ParticipantUtils.findStartPhase(compositionInProperties);
166 if (startPhaseMsg.equals(startPhase)) {
167 var compositionElement = cacheProvider.createCompositionElementDto(
168 automationComposition.getCompositionId(), element, compositionInProperties);
169 var instanceElement = new InstanceElementDto(instanceId, elementDeploy.getId(),
170 elementDeploy.getProperties(), element.getOutProperties());
171 listener.deploy(messageId, compositionElement, instanceElement);
176 private void callParticipantUpdateProperty(UUID messageId, List<AcElementDeploy> acElements,
177 AutomationComposition acCopy) {
178 var instanceElementDtoMap = cacheProvider.getInstanceElementDtoMap(acCopy);
179 var instanceElementDtoMapUpdated = cacheProvider.getInstanceElementDtoMap(
180 cacheProvider.getAutomationComposition(acCopy.getInstanceId()));
181 var compositionElementDtoMap = cacheProvider.getCompositionElementDtoMap(acCopy);
182 for (var acElement : acElements) {
183 listener.update(messageId, compositionElementDtoMap.get(acElement.getId()),
184 instanceElementDtoMap.get(acElement.getId()), instanceElementDtoMapUpdated.get(acElement.getId()));
188 private void migrateExistingElementsOnThisParticipant(UUID instanceId, UUID compositionTargetId,
189 ParticipantDeploy participantDeploy, int stage) {
190 var automationComposition = cacheProvider.getAutomationComposition(instanceId);
191 var acElementList = automationComposition.getElements();
192 for (var element : participantDeploy.getAcElementList()) {
193 var compositionInProperties =
194 cacheProvider.getCommonProperties(compositionTargetId, element.getDefinition());
195 var stageSet = ParticipantUtils.findStageSetMigrate(compositionInProperties);
196 if (stageSet.contains(stage)) {
197 var acElement = acElementList.get(element.getId());
198 if (acElement == null) {
199 var newElement = CacheProvider.createAutomationCompositionElement(element);
200 newElement.setParticipantId(participantDeploy.getParticipantId());
201 newElement.setDeployState(DeployState.MIGRATING);
202 newElement.setLockState(LockState.LOCKED);
203 newElement.setStage(stage);
205 acElementList.put(element.getId(), newElement);
206 LOGGER.info("New Ac Element with id {} is added in Migration", element.getId());
208 AcmUtils.recursiveMerge(acElement.getProperties(), element.getProperties());
209 acElement.setDeployState(DeployState.MIGRATING);
210 acElement.setStage(stage);
211 acElement.setDefinition(element.getDefinition());
215 // Check for missing elements and remove them from cache
216 var elementsToRemove = findElementsToRemove(participantDeploy.getAcElementList(), acElementList);
217 for (var key : elementsToRemove) {
218 acElementList.remove(key);
219 LOGGER.info("Element with id {} is removed in Migration", key);
223 private void updateExistingElementsOnThisParticipant(UUID instanceId, ParticipantDeploy participantDeploy) {
224 var acElementList = cacheProvider.getAutomationComposition(instanceId).getElements();
225 for (var element : participantDeploy.getAcElementList()) {
226 var acElement = acElementList.get(element.getId());
227 AcmUtils.recursiveMerge(acElement.getProperties(), element.getProperties());
228 acElement.setDeployState(DeployState.UPDATING);
229 acElement.setSubState(SubState.NONE);
230 acElement.setDefinition(element.getDefinition());
234 private List<UUID> findElementsToRemove(List<AcElementDeploy> acElementDeployList, Map<UUID,
235 AutomationCompositionElement> acElementList) {
236 var acElementDeploySet = acElementDeployList.stream().map(AcElementDeploy::getId).collect(Collectors.toSet());
237 return acElementList.keySet().stream().filter(id -> !acElementDeploySet.contains(id)).toList();
241 * Method to handle when the new state from participant is UNINITIALISED state.
243 * @param messageId the messageId
244 * @param automationComposition participant response
245 * @param startPhaseMsg startPhase from message
247 private void handleUndeployState(UUID messageId, final AutomationComposition automationComposition,
248 Integer startPhaseMsg) {
249 automationComposition.setCompositionTargetId(null);
250 automationComposition.setDeployState(DeployState.UNDEPLOYING);
251 for (var element : automationComposition.getElements().values()) {
252 var compositionInProperties = cacheProvider
253 .getCommonProperties(automationComposition.getCompositionId(), element.getDefinition());
254 int startPhase = ParticipantUtils.findStartPhase(compositionInProperties);
255 if (startPhaseMsg.equals(startPhase)) {
256 element.setDeployState(DeployState.UNDEPLOYING);
257 var compositionElement = cacheProvider.createCompositionElementDto(
258 automationComposition.getCompositionId(), element, compositionInProperties);
259 var instanceElement = new InstanceElementDto(automationComposition.getInstanceId(), element.getId(),
260 element.getProperties(), element.getOutProperties());
261 listener.undeploy(messageId, compositionElement, instanceElement);
266 private void handleDeleteState(UUID messageId, final AutomationComposition automationComposition,
267 Integer startPhaseMsg) {
268 automationComposition.setDeployState(DeployState.DELETING);
269 for (var element : automationComposition.getElements().values()) {
270 var compositionInProperties = cacheProvider
271 .getCommonProperties(automationComposition.getCompositionId(), element.getDefinition());
272 int startPhase = ParticipantUtils.findStartPhase(compositionInProperties);
273 if (startPhaseMsg.equals(startPhase)) {
274 element.setDeployState(DeployState.DELETING);
275 element.setSubState(SubState.NONE);
276 var compositionElement = cacheProvider.createCompositionElementDto(
277 automationComposition.getCompositionId(), element, compositionInProperties);
278 var instanceElement = new InstanceElementDto(automationComposition.getInstanceId(), element.getId(),
279 element.getProperties(), element.getOutProperties());
280 listener.delete(messageId, compositionElement, instanceElement);
286 * Handles AutomationComposition Migration.
288 * @param migrationMsg the AutomationCompositionMigration
290 public void handleAutomationCompositionMigration(AutomationCompositionMigration migrationMsg) {
291 if (migrationMsg.getAutomationCompositionId() == null || migrationMsg.getCompositionTargetId() == null) {
295 var automationComposition = cacheProvider.getAutomationComposition(migrationMsg.getAutomationCompositionId());
296 if (automationComposition == null) {
297 LOGGER.debug("Automation composition {} does not use this participant",
298 migrationMsg.getAutomationCompositionId());
301 var acCopy = new AutomationComposition(automationComposition);
302 automationComposition.setCompositionTargetId(migrationMsg.getCompositionTargetId());
303 automationComposition.setDeployState(DeployState.MIGRATING);
304 for (var participantDeploy : migrationMsg.getParticipantUpdatesList()) {
305 if (cacheProvider.getParticipantId().equals(participantDeploy.getParticipantId())) {
307 migrateExistingElementsOnThisParticipant(migrationMsg.getAutomationCompositionId(),
308 migrationMsg.getCompositionTargetId(), participantDeploy, migrationMsg.getStage());
310 callParticipantMigrate(migrationMsg.getMessageId(), participantDeploy.getAcElementList(),
311 acCopy, migrationMsg.getCompositionTargetId(), migrationMsg.getStage());
316 private void callParticipantMigrate(UUID messageId, List<AcElementDeploy> acElements,
317 AutomationComposition acCopy, UUID compositionTargetId, int stage) {
318 var compositionElementMap = cacheProvider.getCompositionElementDtoMap(acCopy);
319 var instanceElementMap = cacheProvider.getInstanceElementDtoMap(acCopy);
320 var automationComposition = cacheProvider.getAutomationComposition(acCopy.getInstanceId());
321 var compositionElementTargetMap = cacheProvider.getCompositionElementDtoMap(automationComposition,
322 compositionTargetId);
323 var instanceElementMigrateMap = cacheProvider.getInstanceElementDtoMap(automationComposition);
325 // Call migrate for newly added and updated elements
326 for (var acElement : acElements) {
327 var compositionInProperties = cacheProvider
328 .getCommonProperties(compositionTargetId, acElement.getDefinition());
329 var stageSet = ParticipantUtils.findStageSetMigrate(compositionInProperties);
330 if (stageSet.contains(stage)) {
331 if (instanceElementMap.get(acElement.getId()) == null) {
332 var compositionElementDto =
333 new CompositionElementDto(acCopy.getCompositionId(), acElement.getDefinition(),
334 Map.of(), Map.of(), ElementState.NOT_PRESENT);
335 var instanceElementDto = new InstanceElementDto(acCopy.getInstanceId(), acElement.getId(),
336 Map.of(), Map.of(), ElementState.NOT_PRESENT);
337 var compositionElementTargetDto = CacheProvider.changeStateToNew(
338 compositionElementTargetMap.get(acElement.getId()));
339 var instanceElementMigrateDto = CacheProvider
340 .changeStateToNew(instanceElementMigrateMap.get(acElement.getId()));
342 listener.migrate(messageId, compositionElementDto, compositionElementTargetDto, instanceElementDto,
343 instanceElementMigrateDto, stage);
345 listener.migrate(messageId, compositionElementMap.get(acElement.getId()),
346 compositionElementTargetMap.get(acElement.getId()),
347 instanceElementMap.get(acElement.getId()), instanceElementMigrateMap.get(acElement.getId()),
353 // Call migrate for removed elements
354 List<UUID> removedElements = findElementsToRemove(acElements, acCopy.getElements());
355 for (var elementId : removedElements) {
356 var compositionDtoTarget = new CompositionElementDto(compositionTargetId,
357 acCopy.getElements().get(elementId).getDefinition(),
358 Map.of(), Map.of(), ElementState.REMOVED);
359 var instanceDtoTarget = new InstanceElementDto(acCopy.getInstanceId(), elementId, Map.of(),
360 Map.of(), ElementState.REMOVED);
362 listener.migrate(messageId, compositionElementMap.get(elementId), compositionDtoTarget,
363 instanceElementMap.get(elementId), instanceDtoTarget, 0);