0aed919e52ce7a74314ecfa0201bca603721f2f8
[policy/clamp.git] /
1 /*-
2  * ============LICENSE_START=======================================================
3  *  Copyright (C) 2023-2024 Nordix Foundation.
4  * ================================================================================
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *      http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  * SPDX-License-Identifier: Apache-2.0
18  * ============LICENSE_END=========================================================
19  */
20
21 package org.onap.policy.clamp.acm.participant.intermediary.handler;
22
23 import io.opentelemetry.context.Context;
24 import java.io.Closeable;
25 import java.io.IOException;
26 import java.util.HashMap;
27 import java.util.List;
28 import java.util.Map;
29 import java.util.UUID;
30 import java.util.concurrent.ConcurrentHashMap;
31 import java.util.concurrent.ExecutorService;
32 import java.util.concurrent.Executors;
33 import java.util.concurrent.Future;
34 import lombok.RequiredArgsConstructor;
35 import org.onap.policy.clamp.acm.participant.intermediary.api.AutomationCompositionElementListener;
36 import org.onap.policy.clamp.acm.participant.intermediary.api.ParticipantIntermediaryApi;
37 import org.onap.policy.clamp.models.acm.concepts.AcElementDeploy;
38 import org.onap.policy.clamp.models.acm.concepts.AcElementRestart;
39 import org.onap.policy.clamp.models.acm.concepts.AcTypeState;
40 import org.onap.policy.clamp.models.acm.concepts.AutomationCompositionElementDefinition;
41 import org.onap.policy.clamp.models.acm.concepts.DeployState;
42 import org.onap.policy.clamp.models.acm.concepts.LockState;
43 import org.onap.policy.clamp.models.acm.concepts.ParticipantRestartAc;
44 import org.onap.policy.clamp.models.acm.concepts.StateChangeResult;
45 import org.onap.policy.models.base.PfModelException;
46 import org.slf4j.Logger;
47 import org.slf4j.LoggerFactory;
48 import org.springframework.stereotype.Component;
49
50 @Component
51 @RequiredArgsConstructor
52 public class ThreadHandler implements Closeable {
53     private static final Logger LOGGER = LoggerFactory.getLogger(ThreadHandler.class);
54
55     private final AutomationCompositionElementListener listener;
56     private final ParticipantIntermediaryApi intermediaryApi;
57     private final CacheProvider cacheProvider;
58
59     private final Map<UUID, Future> executionMap = new ConcurrentHashMap<>();
60
61     private final ExecutorService executor =
62             Context.taskWrapping(Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors()));
63
64     /**
65      * Handle an update on a automation composition element.
66      *
67      * @param messageId the messageId
68      * @param instanceId the automationComposition Id
69      * @param element the information on the automation composition element
70      * @param properties properties Map
71      */
72     public void deploy(UUID messageId, UUID instanceId, AcElementDeploy element, Map<String, Object> properties) {
73         cleanExecution(element.getId(), messageId);
74         var result = executor.submit(() -> this.deployProcess(instanceId, element, properties));
75         executionMap.put(element.getId(), result);
76     }
77
78     private void deployProcess(UUID instanceId, AcElementDeploy element, Map<String, Object> properties) {
79         try {
80             listener.deploy(instanceId, element, properties);
81         } catch (PfModelException e) {
82             LOGGER.error("Automation composition element deploy failed {} {}", instanceId, e.getMessage());
83             intermediaryApi.updateAutomationCompositionElementState(instanceId, element.getId(), DeployState.UNDEPLOYED,
84                     null, StateChangeResult.FAILED, "Automation composition element deploy failed");
85         }
86         executionMap.remove(element.getId());
87     }
88
89     /**
90      * Handle a automation composition element state change.
91      *
92      * @param messageId the messageId
93      * @param instanceId the automationComposition Id
94      * @param elementId the ID of the automation composition element
95      */
96     public void undeploy(UUID messageId, UUID instanceId, UUID elementId) {
97         cleanExecution(elementId, messageId);
98         var result = executor.submit(() -> this.undeployProcess(instanceId, elementId));
99         executionMap.put(elementId, result);
100     }
101
102     private void undeployProcess(UUID instanceId, UUID elementId) {
103         try {
104             listener.undeploy(instanceId, elementId);
105         } catch (PfModelException e) {
106             LOGGER.error("Automation composition element undeploy failed {} {}", instanceId, e.getMessage());
107             intermediaryApi.updateAutomationCompositionElementState(instanceId, elementId, DeployState.DEPLOYED, null,
108                     StateChangeResult.FAILED, "Automation composition element undeploy failed");
109         }
110         executionMap.remove(elementId);
111     }
112
113     /**
114      * Handle a automation composition element lock.
115      *
116      * @param messageId the messageId
117      * @param instanceId the automationComposition Id
118      * @param elementId the ID of the automation composition element
119      */
120     public void lock(UUID messageId, UUID instanceId, UUID elementId) {
121         cleanExecution(elementId, messageId);
122         var result = executor.submit(() -> this.lockProcess(instanceId, elementId));
123         executionMap.put(elementId, result);
124     }
125
126     private void lockProcess(UUID instanceId, UUID elementId) {
127         try {
128             listener.lock(instanceId, elementId);
129         } catch (PfModelException e) {
130             LOGGER.error("Automation composition element lock failed {} {}", instanceId, e.getMessage());
131             intermediaryApi.updateAutomationCompositionElementState(instanceId, elementId, null, LockState.UNLOCKED,
132                     StateChangeResult.FAILED, "Automation composition element lock failed");
133         }
134         executionMap.remove(elementId);
135     }
136
137     /**
138      * Handle a automation composition element unlock.
139      *
140      * @param messageId the messageId
141      * @param instanceId the automationComposition Id
142      * @param elementId the ID of the automation composition element
143      */
144     public void unlock(UUID messageId, UUID instanceId, UUID elementId) {
145         cleanExecution(elementId, messageId);
146         var result = executor.submit(() -> this.unlockProcess(instanceId, elementId));
147         executionMap.put(elementId, result);
148     }
149
150     private void unlockProcess(UUID instanceId, UUID elementId) {
151         try {
152             listener.unlock(instanceId, elementId);
153         } catch (PfModelException e) {
154             LOGGER.error("Automation composition element unlock failed {} {}", instanceId, e.getMessage());
155             intermediaryApi.updateAutomationCompositionElementState(instanceId, elementId, null, LockState.LOCKED,
156                     StateChangeResult.FAILED, "Automation composition element unlock failed");
157         }
158         executionMap.remove(elementId);
159     }
160
161     /**
162      * Handle a automation composition element delete.
163      *
164      * @param messageId the messageId
165      * @param instanceId the automationComposition Id
166      * @param elementId the ID of the automation composition element
167      */
168     public void delete(UUID messageId, UUID instanceId, UUID elementId) {
169         cleanExecution(elementId, messageId);
170         var result = executor.submit(() -> this.deleteProcess(instanceId, elementId));
171         executionMap.put(elementId, result);
172     }
173
174     private void deleteProcess(UUID instanceId, UUID elementId) {
175         try {
176             listener.delete(instanceId, elementId);
177         } catch (PfModelException e) {
178             LOGGER.error("Automation composition element delete failed {} {}", instanceId, e.getMessage());
179             intermediaryApi.updateAutomationCompositionElementState(instanceId, elementId, DeployState.UNDEPLOYED, null,
180                     StateChangeResult.FAILED, "Automation composition element delete failed");
181         }
182         executionMap.remove(elementId);
183     }
184
185     /**
186      * Handle a automation composition element properties update.
187      *
188      * @param messageId the messageId
189      * @param instanceId the automationComposition Id
190      * @param element the information on the automation composition element
191      * @param properties properties Map
192      */
193     public void update(UUID messageId, UUID instanceId, AcElementDeploy element, Map<String, Object> properties) {
194         cleanExecution(element.getId(), messageId);
195         var result = executor.submit(() -> this.updateProcess(instanceId, element, properties));
196         executionMap.put(element.getId(), result);
197     }
198
199     private void updateProcess(UUID instanceId, AcElementDeploy element, Map<String, Object> properties) {
200         try {
201             listener.update(instanceId, element, properties);
202         } catch (PfModelException e) {
203             LOGGER.error("Automation composition element update failed {} {}", instanceId, e.getMessage());
204             intermediaryApi.updateAutomationCompositionElementState(instanceId, element.getId(), DeployState.DEPLOYED,
205                     null, StateChangeResult.FAILED, "Automation composition element update failed");
206         }
207         executionMap.remove(element.getId());
208     }
209
210     private void cleanExecution(UUID execIdentificationId, UUID messageId) {
211         var process = executionMap.get(execIdentificationId);
212         if (process != null) {
213             if (!process.isDone()) {
214                 process.cancel(true);
215             }
216             executionMap.remove(execIdentificationId);
217         }
218         cacheProvider.getMsgIdentification().put(execIdentificationId, messageId);
219     }
220
221     /**
222      * Handles prime a Composition Definition.
223      *
224      * @param messageId the messageId
225      * @param compositionId the compositionId
226      * @param list the list of AutomationCompositionElementDefinition
227      */
228     public void prime(UUID messageId, UUID compositionId, List<AutomationCompositionElementDefinition> list) {
229         cleanExecution(compositionId, messageId);
230         var result = executor.submit(() -> this.primeProcess(compositionId, list));
231         executionMap.put(compositionId, result);
232     }
233
234     private void primeProcess(UUID compositionId, List<AutomationCompositionElementDefinition> list) {
235         try {
236             listener.prime(compositionId, list);
237             executionMap.remove(compositionId);
238         } catch (PfModelException e) {
239             LOGGER.error("Composition Defintion prime failed {} {}", compositionId, e.getMessage());
240             intermediaryApi.updateCompositionState(compositionId, AcTypeState.COMMISSIONED, StateChangeResult.FAILED,
241                     "Composition Defintion prime failed");
242         }
243     }
244
245     /**
246      * Handles deprime a Composition Definition.
247      *
248      * @param messageId the messageId
249      * @param compositionId the compositionId
250      */
251     public void deprime(UUID messageId, UUID compositionId) {
252         cleanExecution(compositionId, messageId);
253         var result = executor.submit(() -> this.deprimeProcess(compositionId));
254         executionMap.put(compositionId, result);
255     }
256
257     private void deprimeProcess(UUID compositionId) {
258         try {
259             listener.deprime(compositionId);
260             executionMap.remove(compositionId);
261         } catch (PfModelException e) {
262             LOGGER.error("Composition Defintion deprime failed {} {}", compositionId, e.getMessage());
263             intermediaryApi.updateCompositionState(compositionId, AcTypeState.PRIMED, StateChangeResult.FAILED,
264                     "Composition Defintion deprime failed");
265         }
266     }
267
268     /**
269      * Handles restarted scenario.
270      *
271      * @param messageId the messageId
272      * @param compositionId the compositionId
273      * @param list the list of AutomationCompositionElementDefinition
274      * @param state the state of the composition
275      * @param automationCompositionList list of ParticipantRestartAc
276      */
277     public void restarted(UUID messageId, UUID compositionId, List<AutomationCompositionElementDefinition> list,
278             AcTypeState state, List<ParticipantRestartAc> automationCompositionList) {
279         try {
280             listener.handleRestartComposition(compositionId, list, state);
281         } catch (PfModelException e) {
282             LOGGER.error("Composition Defintion restarted failed {} {}", compositionId, e.getMessage());
283             intermediaryApi.updateCompositionState(compositionId, state, StateChangeResult.FAILED,
284                     "Composition Defintion restarted failed");
285         }
286
287         for (var automationComposition : automationCompositionList) {
288             for (var element : automationComposition.getAcElementList()) {
289                 cleanExecution(element.getId(), messageId);
290                 var result = executor.submit(() -> this
291                         .restartedInstanceProcess(automationComposition.getAutomationCompositionId(), element));
292                 executionMap.put(element.getId(), result);
293             }
294         }
295     }
296
297     private void restartedInstanceProcess(UUID instanceId, AcElementRestart element) {
298         try {
299             var map = new HashMap<>(cacheProvider.getCommonProperties(instanceId, element.getId()));
300             map.putAll(element.getProperties());
301
302             listener.handleRestartInstance(instanceId, getAcElementDeploy(element), map, element.getDeployState(),
303                     element.getLockState());
304             executionMap.remove(element.getId());
305         } catch (PfModelException e) {
306             LOGGER.error("Automation composition element deploy failed {} {}", instanceId, e.getMessage());
307             intermediaryApi.updateAutomationCompositionElementState(instanceId, element.getId(),
308                     element.getDeployState(), element.getLockState(), StateChangeResult.FAILED,
309                     "Automation composition element restart failed");
310         }
311     }
312
313     private AcElementDeploy getAcElementDeploy(AcElementRestart element) {
314         var acElementDeploy = new AcElementDeploy();
315         acElementDeploy.setId(element.getId());
316         acElementDeploy.setDefinition(element.getDefinition());
317         acElementDeploy.setProperties(element.getProperties());
318         acElementDeploy.setToscaServiceTemplateFragment(element.getToscaServiceTemplateFragment());
319         return acElementDeploy;
320     }
321
322     /**
323      * Closes this stream and releases any system resources associated
324      * with it. If the stream is already closed then invoking this
325      * method has no effect.
326      *
327      * @throws IOException if an I/O error occurs
328      */
329     @Override
330     public void close() throws IOException {
331         executor.shutdown();
332     }
333
334     /**
335      * Handles AutomationComposition Migration.
336      *
337      * @param messageId the messageId
338      * @param instanceId the automationComposition Id
339      * @param element the information on the automation composition element
340      * @param compositionTargetId the composition to migrate
341      */
342     public void migrate(UUID messageId, UUID instanceId, AcElementDeploy element, UUID compositionTargetId,
343             Map<String, Object> properties) {
344         cleanExecution(element.getId(), messageId);
345         var result = executor.submit(() -> this.migrateProcess(instanceId, element, compositionTargetId, properties));
346         executionMap.put(element.getId(), result);
347     }
348
349     private void migrateProcess(UUID instanceId, AcElementDeploy element, UUID compositionTargetId,
350             Map<String, Object> properties) {
351         try {
352             listener.migrate(instanceId, element, compositionTargetId, properties);
353         } catch (PfModelException e) {
354             LOGGER.error("Automation composition element migrate failed {} {}", instanceId, e.getMessage());
355             intermediaryApi.updateAutomationCompositionElementState(instanceId, element.getId(), DeployState.DEPLOYED,
356                     null, StateChangeResult.FAILED, "Automation composition element migrate failed");
357         }
358         executionMap.remove(element.getId());
359     }
360 }