2 * ============LICENSE_START=======================================================
3 * Copyright (C) 2023-2025 OpenInfra Foundation Europe. All rights reserved.
4 * ================================================================================
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
9 * http://www.apache.org/licenses/LICENSE-2.0
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
17 * SPDX-License-Identifier: Apache-2.0
18 * ============LICENSE_END=========================================================
21 package org.onap.policy.clamp.acm.participant.intermediary.handler;
23 import io.opentelemetry.context.Context;
24 import java.io.Closeable;
25 import java.io.IOException;
27 import java.util.UUID;
28 import java.util.concurrent.ConcurrentHashMap;
29 import java.util.concurrent.ExecutorService;
30 import java.util.concurrent.Executors;
31 import java.util.concurrent.Future;
32 import org.onap.policy.clamp.acm.participant.intermediary.api.AutomationCompositionElementListener;
33 import org.onap.policy.clamp.acm.participant.intermediary.api.CompositionDto;
34 import org.onap.policy.clamp.acm.participant.intermediary.api.CompositionElementDto;
35 import org.onap.policy.clamp.acm.participant.intermediary.api.InstanceElementDto;
36 import org.onap.policy.clamp.acm.participant.intermediary.api.ParticipantIntermediaryApi;
37 import org.onap.policy.clamp.acm.participant.intermediary.handler.cache.CacheProvider;
38 import org.onap.policy.clamp.acm.participant.intermediary.parameters.ParticipantParameters;
39 import org.onap.policy.clamp.models.acm.concepts.AcTypeState;
40 import org.onap.policy.clamp.models.acm.concepts.DeployState;
41 import org.onap.policy.clamp.models.acm.concepts.LockState;
42 import org.onap.policy.clamp.models.acm.concepts.StateChangeResult;
43 import org.onap.policy.models.base.PfModelException;
44 import org.slf4j.Logger;
45 import org.slf4j.LoggerFactory;
46 import org.springframework.stereotype.Component;
49 public class ThreadHandler implements Closeable {
50 private static final Logger LOGGER = LoggerFactory.getLogger(ThreadHandler.class);
52 private final AutomationCompositionElementListener listener;
53 private final ParticipantIntermediaryApi intermediaryApi;
54 private final CacheProvider cacheProvider;
56 private final Map<UUID, Future<?>> executionMap = new ConcurrentHashMap<>();
58 private final ExecutorService executor;
63 * @param listener the AutomationComposition ElementListener
64 * @param intermediaryApi the intermediaryApi
65 * @param cacheProvider the CacheProvider
66 * @param parameters the parameters
68 public ThreadHandler(AutomationCompositionElementListener listener, ParticipantIntermediaryApi intermediaryApi,
69 CacheProvider cacheProvider, ParticipantParameters parameters) {
70 this.listener = listener;
71 this.intermediaryApi = intermediaryApi;
72 this.cacheProvider = cacheProvider;
73 executor = Context.taskWrapping(Executors.newFixedThreadPool(
74 parameters.getIntermediaryParameters().getThreadPoolSize()));
78 * Handle a deploy on a automation composition element.
80 * @param messageId the messageId
81 * @param compositionElement the information of the Automation Composition Definition Element
82 * @param instanceElement the information of the Automation Composition Instance Element
84 public void deploy(UUID messageId, CompositionElementDto compositionElement, InstanceElementDto instanceElement) {
85 cleanExecution(instanceElement.elementId(), messageId);
86 var result = executor.submit(() -> this.deployProcess(compositionElement, instanceElement));
87 executionMap.put(instanceElement.elementId(), result);
90 private void deployProcess(CompositionElementDto compositionElement, InstanceElementDto instanceElement) {
92 listener.deploy(compositionElement, instanceElement);
93 } catch (PfModelException e) {
94 LOGGER.error("Automation composition element deploy failed {} {}", instanceElement.elementId(),
96 intermediaryApi.updateAutomationCompositionElementState(instanceElement.instanceId(),
97 instanceElement.elementId(), DeployState.UNDEPLOYED, null, StateChangeResult.FAILED,
98 "Automation composition element deploy failed");
100 executionMap.remove(instanceElement.elementId());
104 * Handle an udeploy on a automation composition element.
106 * @param messageId the messageId
107 * @param compositionElement the information of the Automation Composition Definition Element
108 * @param instanceElement the information of the Automation Composition Instance Element
110 public void undeploy(UUID messageId, CompositionElementDto compositionElement, InstanceElementDto instanceElement) {
111 cleanExecution(instanceElement.elementId(), messageId);
112 var result = executor.submit(() -> this.undeployProcess(compositionElement, instanceElement));
113 executionMap.put(instanceElement.elementId(), result);
116 private void undeployProcess(CompositionElementDto compositionElement, InstanceElementDto instanceElement) {
118 listener.undeploy(compositionElement, instanceElement);
119 } catch (PfModelException e) {
121 "Automation composition element undeploy failed {} {}", instanceElement.elementId(), e.getMessage());
122 intermediaryApi.updateAutomationCompositionElementState(instanceElement.instanceId(),
123 instanceElement.elementId(), DeployState.DEPLOYED, null,
124 StateChangeResult.FAILED, "Automation composition element undeploy failed");
126 executionMap.remove(instanceElement.elementId());
130 * Handle a automation composition element lock.
132 * @param messageId the messageId
133 * @param compositionElement the information of the Automation Composition Definition Element
134 * @param instanceElement the information of the Automation Composition Instance Element
136 public void lock(UUID messageId, CompositionElementDto compositionElement, InstanceElementDto instanceElement) {
137 cleanExecution(instanceElement.elementId(), messageId);
138 var result = executor.submit(() -> this.lockProcess(compositionElement, instanceElement));
139 executionMap.put(instanceElement.elementId(), result);
142 private void lockProcess(CompositionElementDto compositionElement, InstanceElementDto instanceElement) {
144 listener.lock(compositionElement, instanceElement);
145 } catch (PfModelException e) {
146 LOGGER.error("Automation composition element lock failed {} {}",
147 instanceElement.elementId(), e.getMessage());
148 intermediaryApi.updateAutomationCompositionElementState(instanceElement.instanceId(),
149 instanceElement.elementId(), null, LockState.UNLOCKED, StateChangeResult.FAILED,
150 "Automation composition element lock failed");
152 executionMap.remove(instanceElement.elementId());
156 * Handle a automation composition element unlock.
158 * @param messageId the messageId
159 * @param compositionElement the information of the Automation Composition Definition Element
160 * @param instanceElement the information of the Automation Composition Instance Element
162 public void unlock(UUID messageId, CompositionElementDto compositionElement, InstanceElementDto instanceElement) {
163 cleanExecution(instanceElement.elementId(), messageId);
164 var result = executor.submit(() -> this.unlockProcess(compositionElement, instanceElement));
165 executionMap.put(instanceElement.elementId(), result);
168 private void unlockProcess(CompositionElementDto compositionElement, InstanceElementDto instanceElement) {
170 listener.unlock(compositionElement, instanceElement);
171 } catch (PfModelException e) {
172 LOGGER.error("Automation composition element unlock failed {} {}",
173 instanceElement.elementId(), e.getMessage());
174 intermediaryApi.updateAutomationCompositionElementState(instanceElement.instanceId(),
175 instanceElement.elementId(), null, LockState.LOCKED, StateChangeResult.FAILED,
176 "Automation composition element unlock failed");
178 executionMap.remove(instanceElement.elementId());
182 * Handle a automation composition element delete.
184 * @param messageId the messageId
185 * @param compositionElement the information of the Automation Composition Definition Element
186 * @param instanceElement the information of the Automation Composition Instance Element
188 public void delete(UUID messageId, CompositionElementDto compositionElement, InstanceElementDto instanceElement) {
189 cleanExecution(instanceElement.elementId(), messageId);
190 var result = executor.submit(() -> this.deleteProcess(compositionElement, instanceElement));
191 executionMap.put(instanceElement.elementId(), result);
194 private void deleteProcess(CompositionElementDto compositionElement, InstanceElementDto instanceElement) {
196 listener.delete(compositionElement, instanceElement);
197 } catch (PfModelException e) {
198 LOGGER.error("Automation composition element delete failed {} {}",
199 instanceElement.elementId(), e.getMessage());
200 intermediaryApi.updateAutomationCompositionElementState(
201 instanceElement.instanceId(), instanceElement.elementId(), DeployState.UNDEPLOYED, null,
202 StateChangeResult.FAILED, "Automation composition element delete failed");
204 executionMap.remove(instanceElement.elementId());
208 * Handle a automation composition element properties update.
210 * @param messageId the messageId
211 * @param compositionElement the information of the Automation Composition Definition Element
212 * @param instanceElement the information of the Automation Composition Instance Element
213 * @param instanceElementUpdated the information of the Automation Composition Instance Element updated
215 public void update(UUID messageId, CompositionElementDto compositionElement, InstanceElementDto instanceElement,
216 InstanceElementDto instanceElementUpdated) {
217 cleanExecution(instanceElement.elementId(), messageId);
218 var result = executor.submit(() ->
219 this.updateProcess(compositionElement, instanceElement, instanceElementUpdated));
220 executionMap.put(instanceElement.elementId(), result);
223 private void updateProcess(CompositionElementDto compositionElement, InstanceElementDto instanceElement,
224 InstanceElementDto instanceElementUpdated) {
226 listener.update(compositionElement, instanceElement, instanceElementUpdated);
227 } catch (PfModelException e) {
228 LOGGER.error("Automation composition element update failed {} {}",
229 instanceElement.elementId(), e.getMessage());
230 intermediaryApi.updateAutomationCompositionElementState(instanceElement.instanceId(),
231 instanceElement.elementId(), DeployState.DEPLOYED, null,
232 StateChangeResult.FAILED, "Automation composition element update failed");
234 executionMap.remove(instanceElement.elementId());
240 * @param execIdentificationId the identification Id
241 * @param messageId the messageId
243 public void cleanExecution(UUID execIdentificationId, UUID messageId) {
244 var process = executionMap.get(execIdentificationId);
245 if (process != null) {
246 if (!process.isDone()) {
247 process.cancel(true);
249 executionMap.remove(execIdentificationId);
251 cacheProvider.getMsgIdentification().put(execIdentificationId, messageId);
255 * Handles prime a Composition Definition.
257 * @param messageId the messageId
258 * @param composition the composition
260 public void prime(UUID messageId, CompositionDto composition) {
261 cleanExecution(composition.compositionId(), messageId);
262 var result = executor.submit(() -> this.primeProcess(composition));
263 executionMap.put(composition.compositionId(), result);
266 private void primeProcess(CompositionDto composition) {
268 listener.prime(composition);
269 executionMap.remove(composition.compositionId());
270 } catch (PfModelException e) {
271 LOGGER.error("Composition Defintion prime failed {} {}", composition.compositionId(), e.getMessage());
272 intermediaryApi.updateCompositionState(composition.compositionId(), AcTypeState.COMMISSIONED,
273 StateChangeResult.FAILED, "Composition Defintion prime failed");
278 * Handles deprime a Composition Definition.
280 * @param messageId the messageId
281 * @param composition the composition
283 public void deprime(UUID messageId, CompositionDto composition) {
284 cleanExecution(composition.compositionId(), messageId);
285 var result = executor.submit(() -> this.deprimeProcess(composition));
286 executionMap.put(composition.compositionId(), result);
289 private void deprimeProcess(CompositionDto composition) {
291 listener.deprime(composition);
292 executionMap.remove(composition.compositionId());
293 } catch (PfModelException e) {
294 LOGGER.error("Composition Defintion deprime failed {} {}", composition.compositionId(), e.getMessage());
295 intermediaryApi.updateCompositionState(composition.compositionId(), AcTypeState.PRIMED,
296 StateChangeResult.FAILED, "Composition Defintion deprime failed");
301 * Closes this stream and releases any system resources associated
302 * with it. If the stream is already closed then invoking this
303 * method has no effect.
305 * @throws IOException if an I/O error occurs
308 public void close() throws IOException {
313 * Handles AutomationComposition Migration.
315 * @param messageId the messageId
316 * @param compositionElement the information of the Automation Composition Definition Element
317 * @param compositionElementTarget the information of the Automation Composition Definition Element Target
318 * @param instanceElement the information of the Automation Composition Instance Element
319 * @param instanceElementMigrate the information of the Automation Composition Instance Element updated
320 * @param stage the stage
322 public void migrate(UUID messageId, CompositionElementDto compositionElement,
323 CompositionElementDto compositionElementTarget, InstanceElementDto instanceElement,
324 InstanceElementDto instanceElementMigrate, int stage) {
325 cleanExecution(instanceElement.elementId(), messageId);
326 var result = executor.submit(() ->
327 this.migrateProcess(compositionElement, compositionElementTarget,
328 instanceElement, instanceElementMigrate, stage));
329 executionMap.put(instanceElement.elementId(), result);
332 private void migrateProcess(CompositionElementDto compositionElement,
333 CompositionElementDto compositionElementTarget, InstanceElementDto instanceElement,
334 InstanceElementDto instanceElementMigrate, int stage) {
336 listener.migrate(compositionElement, compositionElementTarget,
337 instanceElement, instanceElementMigrate, stage);
338 } catch (PfModelException e) {
339 LOGGER.error("Automation composition element migrate failed {} {}",
340 instanceElement.elementId(), e.getMessage());
341 intermediaryApi.updateAutomationCompositionElementState(
342 instanceElement.instanceId(), instanceElement.elementId(), DeployState.DEPLOYED,
343 null, StateChangeResult.FAILED, "Automation composition element migrate failed");
345 executionMap.remove(instanceElement.elementId());
349 * Handles AutomationComposition Migration Precheck.
351 * @param messageId the messageId
352 * @param compositionElement the information of the Automation Composition Definition Element
353 * @param compositionElementTarget the information of the Automation Composition Definition Element Target
354 * @param instanceElement the information of the Automation Composition Instance Element
355 * @param instanceElementMigrate the information of the Automation Composition Instance Element updated
357 public void migratePrecheck(UUID messageId, CompositionElementDto compositionElement,
358 CompositionElementDto compositionElementTarget, InstanceElementDto instanceElement,
359 InstanceElementDto instanceElementMigrate) {
360 cleanExecution(instanceElement.elementId(), messageId);
361 var result = executor.submit(() ->
362 this.migratePrecheckProcess(compositionElement, compositionElementTarget, instanceElement,
363 instanceElementMigrate));
364 executionMap.put(instanceElement.elementId(), result);
367 private void migratePrecheckProcess(CompositionElementDto compositionElement,
368 CompositionElementDto compositionElementTarget, InstanceElementDto instanceElement,
369 InstanceElementDto instanceElementMigrate) {
371 listener.migratePrecheck(compositionElement, compositionElementTarget, instanceElement,
372 instanceElementMigrate);
373 } catch (PfModelException e) {
374 LOGGER.error("Automation composition element migrate precheck failed {} {}",
375 instanceElement.elementId(), e.getMessage());
376 intermediaryApi.updateAutomationCompositionElementState(
377 instanceElement.instanceId(), instanceElement.elementId(), DeployState.DEPLOYED,
378 null, StateChangeResult.FAILED, "Automation composition element migrate precheck failed");
380 executionMap.remove(instanceElement.elementId());
384 * Handles AutomationComposition Prepare Post Deploy.
386 * @param messageId the messageId
387 * @param compositionElement the information of the Automation Composition Definition Element
388 * @param instanceElement the information of the Automation Composition Instance Element
390 public void review(UUID messageId, CompositionElementDto compositionElement,
391 InstanceElementDto instanceElement) {
392 cleanExecution(instanceElement.elementId(), messageId);
393 var result = executor.submit(() -> this.reviewProcess(compositionElement, instanceElement));
394 executionMap.put(instanceElement.elementId(), result);
397 private void reviewProcess(CompositionElementDto compositionElement, InstanceElementDto instanceElement) {
399 listener.review(compositionElement, instanceElement);
400 } catch (PfModelException e) {
401 LOGGER.error("Automation composition element Review failed {} {}",
402 instanceElement.elementId(), e.getMessage());
403 intermediaryApi.updateAutomationCompositionElementState(
404 instanceElement.instanceId(), instanceElement.elementId(), DeployState.DEPLOYED,
405 null, StateChangeResult.FAILED, "Automation composition element Review failed");
407 executionMap.remove(instanceElement.elementId());
411 * Handles AutomationComposition Prepare Pre Deploy.
413 * @param messageId the messageId
414 * @param compositionElement the information of the Automation Composition Definition Element
415 * @param instanceElement the information of the Automation Composition Instance Element
416 * @param stage the stage
418 public void prepare(UUID messageId, CompositionElementDto compositionElement,
419 InstanceElementDto instanceElement, int stage) {
420 cleanExecution(instanceElement.elementId(), messageId);
421 var result = executor.submit(() -> this.prepareProcess(compositionElement, instanceElement, stage));
422 executionMap.put(instanceElement.elementId(), result);
425 private void prepareProcess(CompositionElementDto compositionElement, InstanceElementDto instanceElement,
428 listener.prepare(compositionElement, instanceElement, stage);
429 } catch (PfModelException e) {
430 LOGGER.error("Automation composition element prepare Pre Deploy failed {} {}",
431 instanceElement.elementId(), e.getMessage());
432 intermediaryApi.updateAutomationCompositionElementState(
433 instanceElement.instanceId(), instanceElement.elementId(), DeployState.UNDEPLOYED,
434 null, StateChangeResult.FAILED, "Automation composition element prepare Pre Deploy failed");
436 executionMap.remove(instanceElement.elementId());
440 * Handles AutomationComposition Rollback.
442 * @param compositionElement the information of the Automation Composition Definition Element
443 * @param compositionElementRollback the information of the Automation Composition Definition Element Target
444 * @param instanceElement the information of the Automation Composition Instance Element
445 * @param instanceElementRollback the information of the Automation Composition Instance Element updated
446 * @param stage the stage
448 public void rollback(UUID messageId, CompositionElementDto compositionElement,
449 CompositionElementDto compositionElementRollback, InstanceElementDto instanceElement,
450 InstanceElementDto instanceElementRollback, int stage) {
451 cleanExecution(instanceElement.elementId(), messageId);
452 var result = executor.submit(() ->
453 this.rollbackProcess(compositionElement, compositionElementRollback, instanceElement,
454 instanceElementRollback, stage));
455 executionMap.put(instanceElement.elementId(), result);
458 private void rollbackProcess(CompositionElementDto compositionElement,
459 CompositionElementDto compositionElementRollback, InstanceElementDto instanceElement,
460 InstanceElementDto instanceElementRollback, int stage) {
462 listener.rollbackMigration(compositionElement, compositionElementRollback, instanceElement,
463 instanceElementRollback, stage);
464 } catch (PfModelException e) {
465 LOGGER.error("Automation composition element rollback failed {} {}",
466 instanceElement.elementId(), e.getMessage());
467 intermediaryApi.updateAutomationCompositionElementState(
468 instanceElement.instanceId(), instanceElement.elementId(), DeployState.DEPLOYED,
469 null, StateChangeResult.FAILED, "Automation composition rollback migrate failed");
471 executionMap.remove(instanceElement.elementId());