29377a923ad9bb0c662a7529773c8dc64673dc59
[policy/clamp.git] /
1 /*-
2  * ============LICENSE_START=======================================================
3  *  Copyright (C) 2023-2025 OpenInfra Foundation Europe. All rights reserved.
4  * ================================================================================
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *      http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  * SPDX-License-Identifier: Apache-2.0
18  * ============LICENSE_END=========================================================
19  */
20
21 package org.onap.policy.clamp.acm.participant.intermediary.handler;
22
23 import io.opentelemetry.context.Context;
24 import java.io.Closeable;
25 import java.io.IOException;
26 import java.util.Map;
27 import java.util.UUID;
28 import java.util.concurrent.ConcurrentHashMap;
29 import java.util.concurrent.ExecutorService;
30 import java.util.concurrent.Executors;
31 import java.util.concurrent.Future;
32 import org.onap.policy.clamp.acm.participant.intermediary.api.AutomationCompositionElementListener;
33 import org.onap.policy.clamp.acm.participant.intermediary.api.CompositionDto;
34 import org.onap.policy.clamp.acm.participant.intermediary.api.CompositionElementDto;
35 import org.onap.policy.clamp.acm.participant.intermediary.api.InstanceElementDto;
36 import org.onap.policy.clamp.acm.participant.intermediary.api.ParticipantIntermediaryApi;
37 import org.onap.policy.clamp.acm.participant.intermediary.handler.cache.CacheProvider;
38 import org.onap.policy.clamp.acm.participant.intermediary.parameters.ParticipantParameters;
39 import org.onap.policy.clamp.models.acm.concepts.AcTypeState;
40 import org.onap.policy.clamp.models.acm.concepts.DeployState;
41 import org.onap.policy.clamp.models.acm.concepts.LockState;
42 import org.onap.policy.clamp.models.acm.concepts.StateChangeResult;
43 import org.onap.policy.models.base.PfModelException;
44 import org.slf4j.Logger;
45 import org.slf4j.LoggerFactory;
46 import org.springframework.stereotype.Component;
47
48 @Component
49 public class ThreadHandler implements Closeable {
50     private static final Logger LOGGER = LoggerFactory.getLogger(ThreadHandler.class);
51
52     private final AutomationCompositionElementListener listener;
53     private final ParticipantIntermediaryApi intermediaryApi;
54     private final CacheProvider cacheProvider;
55
56     private final Map<UUID, Future<?>> executionMap = new ConcurrentHashMap<>();
57
58     private final ExecutorService executor;
59
60     /**
61      * Constructor.
62      *
63      * @param listener the AutomationComposition ElementListener
64      * @param intermediaryApi the intermediaryApi
65      * @param cacheProvider the CacheProvider
66      * @param parameters the parameters
67      */
68     public ThreadHandler(AutomationCompositionElementListener listener, ParticipantIntermediaryApi intermediaryApi,
69             CacheProvider cacheProvider, ParticipantParameters parameters) {
70         this.listener = listener;
71         this.intermediaryApi = intermediaryApi;
72         this.cacheProvider = cacheProvider;
73         executor = Context.taskWrapping(Executors.newFixedThreadPool(
74                 parameters.getIntermediaryParameters().getThreadPoolSize()));
75         LOGGER.info("ThreadHandler started with thread pool size {}",
76                 parameters.getIntermediaryParameters().getThreadPoolSize());
77     }
78
79     /**
80      * Handle a deploy on a automation composition element.
81      *
82      * @param messageId the messageId
83      * @param compositionElement the information of the Automation Composition Definition Element
84      * @param instanceElement the information of the Automation Composition Instance Element
85      */
86     public void deploy(UUID messageId, CompositionElementDto compositionElement, InstanceElementDto instanceElement) {
87         cleanExecution(instanceElement.elementId(), messageId);
88         var result = executor.submit(() -> this.deployProcess(compositionElement, instanceElement));
89         executionMap.put(instanceElement.elementId(), result);
90     }
91
92     private void deployProcess(CompositionElementDto compositionElement, InstanceElementDto instanceElement) {
93         try {
94             listener.deploy(compositionElement, instanceElement);
95         } catch (PfModelException e) {
96             LOGGER.error("Automation composition element deploy failed {} {}", instanceElement.elementId(),
97                 e.getMessage());
98             intermediaryApi.updateAutomationCompositionElementState(instanceElement.instanceId(),
99                 instanceElement.elementId(), DeployState.UNDEPLOYED, null, StateChangeResult.FAILED,
100                 "Automation composition element deploy failed");
101         }
102         executionMap.remove(instanceElement.elementId());
103     }
104
105     /**
106      * Handle an udeploy on a automation composition element.
107      *
108      * @param messageId the messageId
109      * @param compositionElement the information of the Automation Composition Definition Element
110      * @param instanceElement the information of the Automation Composition Instance Element
111      */
112     public void undeploy(UUID messageId, CompositionElementDto compositionElement, InstanceElementDto instanceElement) {
113         cleanExecution(instanceElement.elementId(), messageId);
114         var result = executor.submit(() -> this.undeployProcess(compositionElement, instanceElement));
115         executionMap.put(instanceElement.elementId(), result);
116     }
117
118     private void undeployProcess(CompositionElementDto compositionElement, InstanceElementDto instanceElement) {
119         try {
120             listener.undeploy(compositionElement, instanceElement);
121         } catch (PfModelException e) {
122             LOGGER.error(
123                 "Automation composition element undeploy failed {} {}", instanceElement.elementId(), e.getMessage());
124             intermediaryApi.updateAutomationCompositionElementState(instanceElement.instanceId(),
125                 instanceElement.elementId(), DeployState.DEPLOYED, null,
126                 StateChangeResult.FAILED, "Automation composition element undeploy failed");
127         }
128         executionMap.remove(instanceElement.elementId());
129     }
130
131     /**
132      * Handle a automation composition element lock.
133      *
134      * @param messageId the messageId
135      * @param compositionElement the information of the Automation Composition Definition Element
136      * @param instanceElement the information of the Automation Composition Instance Element
137      */
138     public void lock(UUID messageId, CompositionElementDto compositionElement, InstanceElementDto instanceElement) {
139         cleanExecution(instanceElement.elementId(), messageId);
140         var result = executor.submit(() -> this.lockProcess(compositionElement, instanceElement));
141         executionMap.put(instanceElement.elementId(), result);
142     }
143
144     private void lockProcess(CompositionElementDto compositionElement, InstanceElementDto instanceElement) {
145         try {
146             listener.lock(compositionElement, instanceElement);
147         } catch (PfModelException e) {
148             LOGGER.error("Automation composition element lock failed {} {}",
149                 instanceElement.elementId(), e.getMessage());
150             intermediaryApi.updateAutomationCompositionElementState(instanceElement.instanceId(),
151                 instanceElement.elementId(), null, LockState.UNLOCKED, StateChangeResult.FAILED,
152                 "Automation composition element lock failed");
153         }
154         executionMap.remove(instanceElement.elementId());
155     }
156
157     /**
158      * Handle a automation composition element unlock.
159      *
160      * @param messageId the messageId
161      * @param compositionElement the information of the Automation Composition Definition Element
162      * @param instanceElement the information of the Automation Composition Instance Element
163      */
164     public void unlock(UUID messageId, CompositionElementDto compositionElement, InstanceElementDto instanceElement) {
165         cleanExecution(instanceElement.elementId(), messageId);
166         var result = executor.submit(() -> this.unlockProcess(compositionElement, instanceElement));
167         executionMap.put(instanceElement.elementId(), result);
168     }
169
170     private void unlockProcess(CompositionElementDto compositionElement, InstanceElementDto instanceElement) {
171         try {
172             listener.unlock(compositionElement, instanceElement);
173         } catch (PfModelException e) {
174             LOGGER.error("Automation composition element unlock failed {} {}",
175                 instanceElement.elementId(), e.getMessage());
176             intermediaryApi.updateAutomationCompositionElementState(instanceElement.instanceId(),
177                 instanceElement.elementId(), null, LockState.LOCKED, StateChangeResult.FAILED,
178                 "Automation composition element unlock failed");
179         }
180         executionMap.remove(instanceElement.elementId());
181     }
182
183     /**
184      * Handle a automation composition element delete.
185      *
186      * @param messageId the messageId
187      * @param compositionElement the information of the Automation Composition Definition Element
188      * @param instanceElement the information of the Automation Composition Instance Element
189      */
190     public void delete(UUID messageId, CompositionElementDto compositionElement, InstanceElementDto instanceElement) {
191         cleanExecution(instanceElement.elementId(), messageId);
192         var result = executor.submit(() -> this.deleteProcess(compositionElement, instanceElement));
193         executionMap.put(instanceElement.elementId(), result);
194     }
195
196     private void deleteProcess(CompositionElementDto compositionElement, InstanceElementDto instanceElement) {
197         try {
198             listener.delete(compositionElement, instanceElement);
199         } catch (PfModelException e) {
200             LOGGER.error("Automation composition element delete failed {} {}",
201                 instanceElement.elementId(), e.getMessage());
202             intermediaryApi.updateAutomationCompositionElementState(
203                 instanceElement.instanceId(), instanceElement.elementId(), DeployState.UNDEPLOYED, null,
204                 StateChangeResult.FAILED, "Automation composition element delete failed");
205         }
206         executionMap.remove(instanceElement.elementId());
207     }
208
209     /**
210      * Handle a automation composition element properties update.
211      *
212      * @param messageId the messageId
213      * @param compositionElement the information of the Automation Composition Definition Element
214      * @param instanceElement the information of the Automation Composition Instance Element
215      * @param instanceElementUpdated the information of the Automation Composition Instance Element updated
216      */
217     public void update(UUID messageId, CompositionElementDto compositionElement, InstanceElementDto instanceElement,
218                        InstanceElementDto instanceElementUpdated) {
219         cleanExecution(instanceElement.elementId(), messageId);
220         var result = executor.submit(() ->
221             this.updateProcess(compositionElement, instanceElement, instanceElementUpdated));
222         executionMap.put(instanceElement.elementId(), result);
223     }
224
225     private void updateProcess(CompositionElementDto compositionElement, InstanceElementDto instanceElement,
226                                InstanceElementDto instanceElementUpdated) {
227         try {
228             listener.update(compositionElement, instanceElement, instanceElementUpdated);
229         } catch (PfModelException e) {
230             LOGGER.error("Automation composition element update failed {} {}",
231                 instanceElement.elementId(), e.getMessage());
232             intermediaryApi.updateAutomationCompositionElementState(instanceElement.instanceId(),
233                 instanceElement.elementId(), DeployState.DEPLOYED, null,
234                 StateChangeResult.FAILED, "Automation composition element update failed");
235         }
236         executionMap.remove(instanceElement.elementId());
237     }
238
239     /**
240      * Clean Execution.
241      *
242      * @param execIdentificationId the identification Id
243      * @param messageId the messageId
244      */
245     public void cleanExecution(UUID execIdentificationId, UUID messageId) {
246         var process = executionMap.get(execIdentificationId);
247         if (process != null) {
248             if (!process.isDone()) {
249                 LOGGER.warn("ThreadHandler cancelling previous thread for execIdentificationId={} messageId={}",
250                         execIdentificationId, messageId);
251                 process.cancel(true);
252             }
253             executionMap.remove(execIdentificationId);
254         }
255         cacheProvider.getMsgIdentification().put(execIdentificationId, messageId);
256     }
257
258     /**
259      * Handles prime a Composition Definition.
260      *
261      * @param messageId the messageId
262      * @param composition the composition
263      */
264     public void prime(UUID messageId, CompositionDto composition) {
265         cleanExecution(composition.compositionId(), messageId);
266         var result = executor.submit(() -> this.primeProcess(composition));
267         executionMap.put(composition.compositionId(), result);
268     }
269
270     private void primeProcess(CompositionDto composition) {
271         try {
272             listener.prime(composition);
273             executionMap.remove(composition.compositionId());
274         } catch (PfModelException e) {
275             LOGGER.error("Composition Defintion prime failed {} {}", composition.compositionId(), e.getMessage());
276             intermediaryApi.updateCompositionState(composition.compositionId(), AcTypeState.COMMISSIONED,
277                 StateChangeResult.FAILED, "Composition Defintion prime failed");
278         }
279     }
280
281     /**
282      * Handles deprime a Composition Definition.
283      *
284      * @param messageId the messageId
285      * @param composition the composition
286      */
287     public void deprime(UUID messageId, CompositionDto composition) {
288         cleanExecution(composition.compositionId(), messageId);
289         var result = executor.submit(() -> this.deprimeProcess(composition));
290         executionMap.put(composition.compositionId(), result);
291     }
292
293     private void deprimeProcess(CompositionDto composition) {
294         try {
295             listener.deprime(composition);
296             executionMap.remove(composition.compositionId());
297         } catch (PfModelException e) {
298             LOGGER.error("Composition Defintion deprime failed {} {}", composition.compositionId(), e.getMessage());
299             intermediaryApi.updateCompositionState(composition.compositionId(), AcTypeState.PRIMED,
300                 StateChangeResult.FAILED, "Composition Defintion deprime failed");
301         }
302     }
303
304     /**
305      * Closes this stream and releases any system resources associated
306      * with it. If the stream is already closed then invoking this
307      * method has no effect.
308      *
309      * @throws IOException if an I/O error occurs
310      */
311     @Override
312     public void close() throws IOException {
313         executor.shutdown();
314     }
315
316     /**
317      * Handles AutomationComposition Migration.
318      *
319      * @param messageId the messageId
320      * @param compositionElement the information of the Automation Composition Definition Element
321      * @param compositionElementTarget the information of the Automation Composition Definition Element Target
322      * @param instanceElement the information of the Automation Composition Instance Element
323      * @param instanceElementMigrate the information of the Automation Composition Instance Element updated
324      * @param stage the stage
325      */
326     public void migrate(UUID messageId, CompositionElementDto compositionElement,
327         CompositionElementDto compositionElementTarget, InstanceElementDto instanceElement,
328         InstanceElementDto instanceElementMigrate, int stage) {
329         cleanExecution(instanceElement.elementId(), messageId);
330         var result = executor.submit(() ->
331             this.migrateProcess(compositionElement, compositionElementTarget,
332                 instanceElement, instanceElementMigrate, stage));
333         executionMap.put(instanceElement.elementId(), result);
334     }
335
336     private void migrateProcess(CompositionElementDto compositionElement,
337         CompositionElementDto compositionElementTarget, InstanceElementDto instanceElement,
338         InstanceElementDto instanceElementMigrate, int stage) {
339         try {
340             listener.migrate(compositionElement, compositionElementTarget,
341                 instanceElement, instanceElementMigrate, stage);
342         } catch (PfModelException e) {
343             LOGGER.error("Automation composition element migrate failed {} {}",
344                 instanceElement.elementId(), e.getMessage());
345             intermediaryApi.updateAutomationCompositionElementState(
346                 instanceElement.instanceId(), instanceElement.elementId(), DeployState.DEPLOYED,
347                 null, StateChangeResult.FAILED, "Automation composition element migrate failed");
348         }
349         executionMap.remove(instanceElement.elementId());
350     }
351
352     /**
353      * Handles AutomationComposition Migration Precheck.
354      *
355      * @param messageId the messageId
356      * @param compositionElement the information of the Automation Composition Definition Element
357      * @param compositionElementTarget the information of the Automation Composition Definition Element Target
358      * @param instanceElement the information of the Automation Composition Instance Element
359      * @param instanceElementMigrate the information of the Automation Composition Instance Element updated
360      */
361     public void migratePrecheck(UUID messageId, CompositionElementDto compositionElement,
362         CompositionElementDto compositionElementTarget, InstanceElementDto instanceElement,
363         InstanceElementDto instanceElementMigrate) {
364         cleanExecution(instanceElement.elementId(), messageId);
365         var result = executor.submit(() ->
366             this.migratePrecheckProcess(compositionElement, compositionElementTarget, instanceElement,
367                 instanceElementMigrate));
368         executionMap.put(instanceElement.elementId(), result);
369     }
370
371     private void migratePrecheckProcess(CompositionElementDto compositionElement,
372         CompositionElementDto compositionElementTarget, InstanceElementDto instanceElement,
373         InstanceElementDto instanceElementMigrate) {
374         try {
375             listener.migratePrecheck(compositionElement, compositionElementTarget, instanceElement,
376                 instanceElementMigrate);
377         } catch (PfModelException e) {
378             LOGGER.error("Automation composition element migrate precheck failed {} {}",
379                 instanceElement.elementId(), e.getMessage());
380             intermediaryApi.updateAutomationCompositionElementState(
381                 instanceElement.instanceId(), instanceElement.elementId(), DeployState.DEPLOYED,
382                 null, StateChangeResult.FAILED, "Automation composition element migrate precheck failed");
383         }
384         executionMap.remove(instanceElement.elementId());
385     }
386
387     /**
388      * Handles AutomationComposition Prepare Post Deploy.
389      *
390      * @param messageId the messageId
391      * @param compositionElement the information of the Automation Composition Definition Element
392      * @param instanceElement the information of the Automation Composition Instance Element
393      */
394     public void review(UUID messageId, CompositionElementDto compositionElement,
395         InstanceElementDto instanceElement) {
396         cleanExecution(instanceElement.elementId(), messageId);
397         var result = executor.submit(() -> this.reviewProcess(compositionElement, instanceElement));
398         executionMap.put(instanceElement.elementId(), result);
399     }
400
401     private void reviewProcess(CompositionElementDto compositionElement, InstanceElementDto instanceElement) {
402         try {
403             listener.review(compositionElement, instanceElement);
404         } catch (PfModelException e) {
405             LOGGER.error("Automation composition element Review failed {} {}",
406                 instanceElement.elementId(), e.getMessage());
407             intermediaryApi.updateAutomationCompositionElementState(
408                 instanceElement.instanceId(), instanceElement.elementId(), DeployState.DEPLOYED,
409                 null, StateChangeResult.FAILED, "Automation composition element Review failed");
410         }
411         executionMap.remove(instanceElement.elementId());
412     }
413
414     /**
415      * Handles AutomationComposition Prepare Pre Deploy.
416      *
417      * @param messageId the messageId
418      * @param compositionElement the information of the Automation Composition Definition Element
419      * @param instanceElement the information of the Automation Composition Instance Element
420      * @param stage the stage
421      */
422     public void prepare(UUID messageId, CompositionElementDto compositionElement,
423         InstanceElementDto instanceElement, int stage) {
424         cleanExecution(instanceElement.elementId(), messageId);
425         var result = executor.submit(() -> this.prepareProcess(compositionElement, instanceElement, stage));
426         executionMap.put(instanceElement.elementId(), result);
427     }
428
429     private void prepareProcess(CompositionElementDto compositionElement, InstanceElementDto instanceElement,
430         int stage) {
431         try {
432             listener.prepare(compositionElement, instanceElement, stage);
433         } catch (PfModelException e) {
434             LOGGER.error("Automation composition element prepare Pre Deploy failed {} {}",
435                 instanceElement.elementId(), e.getMessage());
436             intermediaryApi.updateAutomationCompositionElementState(
437                 instanceElement.instanceId(), instanceElement.elementId(), DeployState.UNDEPLOYED,
438                 null, StateChangeResult.FAILED, "Automation composition element prepare Pre Deploy failed");
439         }
440         executionMap.remove(instanceElement.elementId());
441     }
442
443     /**
444      * Handles AutomationComposition Rollback.
445      *
446      * @param compositionElement the information of the Automation Composition Definition Element
447      * @param compositionElementRollback the information of the Automation Composition Definition Element Target
448      * @param instanceElement the information of the Automation Composition Instance Element
449      * @param instanceElementRollback the information of the Automation Composition Instance Element updated
450      * @param stage the stage
451      */
452     public void rollback(UUID messageId, CompositionElementDto compositionElement,
453             CompositionElementDto compositionElementRollback, InstanceElementDto instanceElement,
454             InstanceElementDto instanceElementRollback, int stage) {
455         cleanExecution(instanceElement.elementId(), messageId);
456         var result = executor.submit(() ->
457                 this.rollbackProcess(compositionElement, compositionElementRollback, instanceElement,
458                         instanceElementRollback, stage));
459         executionMap.put(instanceElement.elementId(), result);
460     }
461
462     private void rollbackProcess(CompositionElementDto compositionElement,
463             CompositionElementDto compositionElementRollback, InstanceElementDto instanceElement,
464             InstanceElementDto instanceElementRollback, int stage) {
465         try {
466             listener.rollbackMigration(compositionElement, compositionElementRollback, instanceElement,
467                     instanceElementRollback, stage);
468         } catch (PfModelException e) {
469             LOGGER.error("Automation composition element rollback failed {} {}",
470                     instanceElement.elementId(), e.getMessage());
471             intermediaryApi.updateAutomationCompositionElementState(
472                     instanceElement.instanceId(), instanceElement.elementId(), DeployState.DEPLOYED,
473                     null, StateChangeResult.FAILED, "Automation composition rollback migrate failed");
474         }
475         executionMap.remove(instanceElement.elementId());
476     }
477 }