/*-
 * ============LICENSE_START=======================================================
 *  Copyright (C) 2023-2025 OpenInfra Foundation Europe. All rights reserved.
 * ================================================================================
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 * SPDX-License-Identifier: Apache-2.0
 * ============LICENSE_END=========================================================
 */
21 package org.onap.policy.clamp.acm.participant.intermediary.handler;
import io.opentelemetry.context.Context;
import java.io.Closeable;
import java.io.IOException;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import org.onap.policy.clamp.acm.participant.intermediary.api.AutomationCompositionElementListener;
import org.onap.policy.clamp.acm.participant.intermediary.api.CompositionDto;
import org.onap.policy.clamp.acm.participant.intermediary.api.CompositionElementDto;
import org.onap.policy.clamp.acm.participant.intermediary.api.InstanceElementDto;
import org.onap.policy.clamp.acm.participant.intermediary.api.ParticipantIntermediaryApi;
import org.onap.policy.clamp.acm.participant.intermediary.handler.cache.CacheProvider;
import org.onap.policy.clamp.acm.participant.intermediary.parameters.ParticipantParameters;
import org.onap.policy.clamp.models.acm.concepts.AcTypeState;
import org.onap.policy.clamp.models.acm.concepts.DeployState;
import org.onap.policy.clamp.models.acm.concepts.LockState;
import org.onap.policy.clamp.models.acm.concepts.StateChangeResult;
import org.onap.policy.models.base.PfModelException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
49 public class ThreadHandler implements Closeable {
50 private static final Logger LOGGER = LoggerFactory.getLogger(ThreadHandler.class);
52 private final AutomationCompositionElementListener listener;
53 private final ParticipantIntermediaryApi intermediaryApi;
54 private final CacheProvider cacheProvider;
56 private final Map<UUID, Future<?>> executionMap = new ConcurrentHashMap<>();
58 private final ExecutorService executor;
63 * @param listener the AutomationComposition ElementListener
64 * @param intermediaryApi the intermediaryApi
65 * @param cacheProvider the CacheProvider
66 * @param parameters the parameters
68 public ThreadHandler(AutomationCompositionElementListener listener, ParticipantIntermediaryApi intermediaryApi,
69 CacheProvider cacheProvider, ParticipantParameters parameters) {
70 this.listener = listener;
71 this.intermediaryApi = intermediaryApi;
72 this.cacheProvider = cacheProvider;
73 executor = Context.taskWrapping(Executors.newFixedThreadPool(
74 parameters.getIntermediaryParameters().getThreadPoolSize()));
75 LOGGER.info("ThreadHandler started with thread pool size {}",
76 parameters.getIntermediaryParameters().getThreadPoolSize());
80 * Handle a deploy on a automation composition element.
82 * @param messageId the messageId
83 * @param compositionElement the information of the Automation Composition Definition Element
84 * @param instanceElement the information of the Automation Composition Instance Element
86 public void deploy(UUID messageId, CompositionElementDto compositionElement, InstanceElementDto instanceElement) {
87 cleanExecution(instanceElement.elementId(), messageId);
88 var result = executor.submit(() -> this.deployProcess(compositionElement, instanceElement));
89 executionMap.put(instanceElement.elementId(), result);
92 private void deployProcess(CompositionElementDto compositionElement, InstanceElementDto instanceElement) {
94 listener.deploy(compositionElement, instanceElement);
95 } catch (PfModelException e) {
96 LOGGER.error("Automation composition element deploy failed {} {}", instanceElement.elementId(),
98 intermediaryApi.updateAutomationCompositionElementState(instanceElement.instanceId(),
99 instanceElement.elementId(), DeployState.UNDEPLOYED, null, StateChangeResult.FAILED,
100 "Automation composition element deploy failed");
102 executionMap.remove(instanceElement.elementId());
106 * Handle an udeploy on a automation composition element.
108 * @param messageId the messageId
109 * @param compositionElement the information of the Automation Composition Definition Element
110 * @param instanceElement the information of the Automation Composition Instance Element
112 public void undeploy(UUID messageId, CompositionElementDto compositionElement, InstanceElementDto instanceElement) {
113 cleanExecution(instanceElement.elementId(), messageId);
114 var result = executor.submit(() -> this.undeployProcess(compositionElement, instanceElement));
115 executionMap.put(instanceElement.elementId(), result);
118 private void undeployProcess(CompositionElementDto compositionElement, InstanceElementDto instanceElement) {
120 listener.undeploy(compositionElement, instanceElement);
121 } catch (PfModelException e) {
123 "Automation composition element undeploy failed {} {}", instanceElement.elementId(), e.getMessage());
124 intermediaryApi.updateAutomationCompositionElementState(instanceElement.instanceId(),
125 instanceElement.elementId(), DeployState.DEPLOYED, null,
126 StateChangeResult.FAILED, "Automation composition element undeploy failed");
128 executionMap.remove(instanceElement.elementId());
132 * Handle a automation composition element lock.
134 * @param messageId the messageId
135 * @param compositionElement the information of the Automation Composition Definition Element
136 * @param instanceElement the information of the Automation Composition Instance Element
138 public void lock(UUID messageId, CompositionElementDto compositionElement, InstanceElementDto instanceElement) {
139 cleanExecution(instanceElement.elementId(), messageId);
140 var result = executor.submit(() -> this.lockProcess(compositionElement, instanceElement));
141 executionMap.put(instanceElement.elementId(), result);
144 private void lockProcess(CompositionElementDto compositionElement, InstanceElementDto instanceElement) {
146 listener.lock(compositionElement, instanceElement);
147 } catch (PfModelException e) {
148 LOGGER.error("Automation composition element lock failed {} {}",
149 instanceElement.elementId(), e.getMessage());
150 intermediaryApi.updateAutomationCompositionElementState(instanceElement.instanceId(),
151 instanceElement.elementId(), null, LockState.UNLOCKED, StateChangeResult.FAILED,
152 "Automation composition element lock failed");
154 executionMap.remove(instanceElement.elementId());
158 * Handle a automation composition element unlock.
160 * @param messageId the messageId
161 * @param compositionElement the information of the Automation Composition Definition Element
162 * @param instanceElement the information of the Automation Composition Instance Element
164 public void unlock(UUID messageId, CompositionElementDto compositionElement, InstanceElementDto instanceElement) {
165 cleanExecution(instanceElement.elementId(), messageId);
166 var result = executor.submit(() -> this.unlockProcess(compositionElement, instanceElement));
167 executionMap.put(instanceElement.elementId(), result);
170 private void unlockProcess(CompositionElementDto compositionElement, InstanceElementDto instanceElement) {
172 listener.unlock(compositionElement, instanceElement);
173 } catch (PfModelException e) {
174 LOGGER.error("Automation composition element unlock failed {} {}",
175 instanceElement.elementId(), e.getMessage());
176 intermediaryApi.updateAutomationCompositionElementState(instanceElement.instanceId(),
177 instanceElement.elementId(), null, LockState.LOCKED, StateChangeResult.FAILED,
178 "Automation composition element unlock failed");
180 executionMap.remove(instanceElement.elementId());
184 * Handle a automation composition element delete.
186 * @param messageId the messageId
187 * @param compositionElement the information of the Automation Composition Definition Element
188 * @param instanceElement the information of the Automation Composition Instance Element
190 public void delete(UUID messageId, CompositionElementDto compositionElement, InstanceElementDto instanceElement) {
191 cleanExecution(instanceElement.elementId(), messageId);
192 var result = executor.submit(() -> this.deleteProcess(compositionElement, instanceElement));
193 executionMap.put(instanceElement.elementId(), result);
196 private void deleteProcess(CompositionElementDto compositionElement, InstanceElementDto instanceElement) {
198 listener.delete(compositionElement, instanceElement);
199 } catch (PfModelException e) {
200 LOGGER.error("Automation composition element delete failed {} {}",
201 instanceElement.elementId(), e.getMessage());
202 intermediaryApi.updateAutomationCompositionElementState(
203 instanceElement.instanceId(), instanceElement.elementId(), DeployState.UNDEPLOYED, null,
204 StateChangeResult.FAILED, "Automation composition element delete failed");
206 executionMap.remove(instanceElement.elementId());
210 * Handle a automation composition element properties update.
212 * @param messageId the messageId
213 * @param compositionElement the information of the Automation Composition Definition Element
214 * @param instanceElement the information of the Automation Composition Instance Element
215 * @param instanceElementUpdated the information of the Automation Composition Instance Element updated
217 public void update(UUID messageId, CompositionElementDto compositionElement, InstanceElementDto instanceElement,
218 InstanceElementDto instanceElementUpdated) {
219 cleanExecution(instanceElement.elementId(), messageId);
220 var result = executor.submit(() ->
221 this.updateProcess(compositionElement, instanceElement, instanceElementUpdated));
222 executionMap.put(instanceElement.elementId(), result);
225 private void updateProcess(CompositionElementDto compositionElement, InstanceElementDto instanceElement,
226 InstanceElementDto instanceElementUpdated) {
228 listener.update(compositionElement, instanceElement, instanceElementUpdated);
229 } catch (PfModelException e) {
230 LOGGER.error("Automation composition element update failed {} {}",
231 instanceElement.elementId(), e.getMessage());
232 intermediaryApi.updateAutomationCompositionElementState(instanceElement.instanceId(),
233 instanceElement.elementId(), DeployState.DEPLOYED, null,
234 StateChangeResult.FAILED, "Automation composition element update failed");
236 executionMap.remove(instanceElement.elementId());
242 * @param execIdentificationId the identification Id
243 * @param messageId the messageId
245 public void cleanExecution(UUID execIdentificationId, UUID messageId) {
246 var process = executionMap.get(execIdentificationId);
247 if (process != null) {
248 if (!process.isDone()) {
249 LOGGER.warn("ThreadHandler cancelling previous thread for execIdentificationId={} messageId={}",
250 execIdentificationId, messageId);
251 process.cancel(true);
253 executionMap.remove(execIdentificationId);
255 cacheProvider.getMsgIdentification().put(execIdentificationId, messageId);
259 * Handles prime a Composition Definition.
261 * @param messageId the messageId
262 * @param composition the composition
264 public void prime(UUID messageId, CompositionDto composition) {
265 cleanExecution(composition.compositionId(), messageId);
266 var result = executor.submit(() -> this.primeProcess(composition));
267 executionMap.put(composition.compositionId(), result);
270 private void primeProcess(CompositionDto composition) {
272 listener.prime(composition);
273 executionMap.remove(composition.compositionId());
274 } catch (PfModelException e) {
275 LOGGER.error("Composition Defintion prime failed {} {}", composition.compositionId(), e.getMessage());
276 intermediaryApi.updateCompositionState(composition.compositionId(), AcTypeState.COMMISSIONED,
277 StateChangeResult.FAILED, "Composition Defintion prime failed");
282 * Handles deprime a Composition Definition.
284 * @param messageId the messageId
285 * @param composition the composition
287 public void deprime(UUID messageId, CompositionDto composition) {
288 cleanExecution(composition.compositionId(), messageId);
289 var result = executor.submit(() -> this.deprimeProcess(composition));
290 executionMap.put(composition.compositionId(), result);
293 private void deprimeProcess(CompositionDto composition) {
295 listener.deprime(composition);
296 executionMap.remove(composition.compositionId());
297 } catch (PfModelException e) {
298 LOGGER.error("Composition Defintion deprime failed {} {}", composition.compositionId(), e.getMessage());
299 intermediaryApi.updateCompositionState(composition.compositionId(), AcTypeState.PRIMED,
300 StateChangeResult.FAILED, "Composition Defintion deprime failed");
305 * Closes this stream and releases any system resources associated
306 * with it. If the stream is already closed then invoking this
307 * method has no effect.
309 * @throws IOException if an I/O error occurs
312 public void close() throws IOException {
317 * Handles AutomationComposition Migration.
319 * @param messageId the messageId
320 * @param compositionElement the information of the Automation Composition Definition Element
321 * @param compositionElementTarget the information of the Automation Composition Definition Element Target
322 * @param instanceElement the information of the Automation Composition Instance Element
323 * @param instanceElementMigrate the information of the Automation Composition Instance Element updated
324 * @param stage the stage
326 public void migrate(UUID messageId, CompositionElementDto compositionElement,
327 CompositionElementDto compositionElementTarget, InstanceElementDto instanceElement,
328 InstanceElementDto instanceElementMigrate, int stage) {
329 cleanExecution(instanceElement.elementId(), messageId);
330 var result = executor.submit(() ->
331 this.migrateProcess(compositionElement, compositionElementTarget,
332 instanceElement, instanceElementMigrate, stage));
333 executionMap.put(instanceElement.elementId(), result);
336 private void migrateProcess(CompositionElementDto compositionElement,
337 CompositionElementDto compositionElementTarget, InstanceElementDto instanceElement,
338 InstanceElementDto instanceElementMigrate, int stage) {
340 listener.migrate(compositionElement, compositionElementTarget,
341 instanceElement, instanceElementMigrate, stage);
342 } catch (PfModelException e) {
343 LOGGER.error("Automation composition element migrate failed {} {}",
344 instanceElement.elementId(), e.getMessage());
345 intermediaryApi.updateAutomationCompositionElementState(
346 instanceElement.instanceId(), instanceElement.elementId(), DeployState.DEPLOYED,
347 null, StateChangeResult.FAILED, "Automation composition element migrate failed");
349 executionMap.remove(instanceElement.elementId());
353 * Handles AutomationComposition Migration Precheck.
355 * @param messageId the messageId
356 * @param compositionElement the information of the Automation Composition Definition Element
357 * @param compositionElementTarget the information of the Automation Composition Definition Element Target
358 * @param instanceElement the information of the Automation Composition Instance Element
359 * @param instanceElementMigrate the information of the Automation Composition Instance Element updated
361 public void migratePrecheck(UUID messageId, CompositionElementDto compositionElement,
362 CompositionElementDto compositionElementTarget, InstanceElementDto instanceElement,
363 InstanceElementDto instanceElementMigrate) {
364 cleanExecution(instanceElement.elementId(), messageId);
365 var result = executor.submit(() ->
366 this.migratePrecheckProcess(compositionElement, compositionElementTarget, instanceElement,
367 instanceElementMigrate));
368 executionMap.put(instanceElement.elementId(), result);
371 private void migratePrecheckProcess(CompositionElementDto compositionElement,
372 CompositionElementDto compositionElementTarget, InstanceElementDto instanceElement,
373 InstanceElementDto instanceElementMigrate) {
375 listener.migratePrecheck(compositionElement, compositionElementTarget, instanceElement,
376 instanceElementMigrate);
377 } catch (PfModelException e) {
378 LOGGER.error("Automation composition element migrate precheck failed {} {}",
379 instanceElement.elementId(), e.getMessage());
380 intermediaryApi.updateAutomationCompositionElementState(
381 instanceElement.instanceId(), instanceElement.elementId(), DeployState.DEPLOYED,
382 null, StateChangeResult.FAILED, "Automation composition element migrate precheck failed");
384 executionMap.remove(instanceElement.elementId());
388 * Handles AutomationComposition Prepare Post Deploy.
390 * @param messageId the messageId
391 * @param compositionElement the information of the Automation Composition Definition Element
392 * @param instanceElement the information of the Automation Composition Instance Element
394 public void review(UUID messageId, CompositionElementDto compositionElement,
395 InstanceElementDto instanceElement) {
396 cleanExecution(instanceElement.elementId(), messageId);
397 var result = executor.submit(() -> this.reviewProcess(compositionElement, instanceElement));
398 executionMap.put(instanceElement.elementId(), result);
401 private void reviewProcess(CompositionElementDto compositionElement, InstanceElementDto instanceElement) {
403 listener.review(compositionElement, instanceElement);
404 } catch (PfModelException e) {
405 LOGGER.error("Automation composition element Review failed {} {}",
406 instanceElement.elementId(), e.getMessage());
407 intermediaryApi.updateAutomationCompositionElementState(
408 instanceElement.instanceId(), instanceElement.elementId(), DeployState.DEPLOYED,
409 null, StateChangeResult.FAILED, "Automation composition element Review failed");
411 executionMap.remove(instanceElement.elementId());
415 * Handles AutomationComposition Prepare Pre Deploy.
417 * @param messageId the messageId
418 * @param compositionElement the information of the Automation Composition Definition Element
419 * @param instanceElement the information of the Automation Composition Instance Element
420 * @param stage the stage
422 public void prepare(UUID messageId, CompositionElementDto compositionElement,
423 InstanceElementDto instanceElement, int stage) {
424 cleanExecution(instanceElement.elementId(), messageId);
425 var result = executor.submit(() -> this.prepareProcess(compositionElement, instanceElement, stage));
426 executionMap.put(instanceElement.elementId(), result);
429 private void prepareProcess(CompositionElementDto compositionElement, InstanceElementDto instanceElement,
432 listener.prepare(compositionElement, instanceElement, stage);
433 } catch (PfModelException e) {
434 LOGGER.error("Automation composition element prepare Pre Deploy failed {} {}",
435 instanceElement.elementId(), e.getMessage());
436 intermediaryApi.updateAutomationCompositionElementState(
437 instanceElement.instanceId(), instanceElement.elementId(), DeployState.UNDEPLOYED,
438 null, StateChangeResult.FAILED, "Automation composition element prepare Pre Deploy failed");
440 executionMap.remove(instanceElement.elementId());
444 * Handles AutomationComposition Rollback.
446 * @param compositionElement the information of the Automation Composition Definition Element
447 * @param compositionElementRollback the information of the Automation Composition Definition Element Target
448 * @param instanceElement the information of the Automation Composition Instance Element
449 * @param instanceElementRollback the information of the Automation Composition Instance Element updated
450 * @param stage the stage
452 public void rollback(UUID messageId, CompositionElementDto compositionElement,
453 CompositionElementDto compositionElementRollback, InstanceElementDto instanceElement,
454 InstanceElementDto instanceElementRollback, int stage) {
455 cleanExecution(instanceElement.elementId(), messageId);
456 var result = executor.submit(() ->
457 this.rollbackProcess(compositionElement, compositionElementRollback, instanceElement,
458 instanceElementRollback, stage));
459 executionMap.put(instanceElement.elementId(), result);
462 private void rollbackProcess(CompositionElementDto compositionElement,
463 CompositionElementDto compositionElementRollback, InstanceElementDto instanceElement,
464 InstanceElementDto instanceElementRollback, int stage) {
466 listener.rollbackMigration(compositionElement, compositionElementRollback, instanceElement,
467 instanceElementRollback, stage);
468 } catch (PfModelException e) {
469 LOGGER.error("Automation composition element rollback failed {} {}",
470 instanceElement.elementId(), e.getMessage());
471 intermediaryApi.updateAutomationCompositionElementState(
472 instanceElement.instanceId(), instanceElement.elementId(), DeployState.DEPLOYED,
473 null, StateChangeResult.FAILED, "Automation composition rollback migrate failed");
475 executionMap.remove(instanceElement.elementId());