1ff4aabdc72eb2f574b0057d37b20e124fe300dd
[policy/clamp.git] /
1 /*-
2  * ============LICENSE_START=======================================================
3  *  Copyright (C) 2023-2025 OpenInfra Foundation Europe. All rights reserved.
4  * ================================================================================
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *      http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  * SPDX-License-Identifier: Apache-2.0
18  * ============LICENSE_END=========================================================
19  */
20
21 package org.onap.policy.clamp.acm.participant.intermediary.handler;
22
23 import io.opentelemetry.context.Context;
24 import java.util.concurrent.ExecutorService;
25 import java.util.concurrent.Executors;
26 import lombok.RequiredArgsConstructor;
27 import org.onap.policy.clamp.acm.participant.intermediary.comm.ParticipantMessagePublisher;
28 import org.onap.policy.clamp.acm.participant.intermediary.handler.cache.AutomationCompositionMsg;
29 import org.onap.policy.clamp.acm.participant.intermediary.handler.cache.CacheProvider;
30 import org.onap.policy.clamp.models.acm.messages.kafka.participant.ParticipantReqSync;
31 import org.slf4j.Logger;
32 import org.slf4j.LoggerFactory;
33 import org.springframework.stereotype.Component;
34
35 @Component
36 @RequiredArgsConstructor
37 public class MsgExecutor {
38
39     private final ExecutorService executor = Context.taskWrapping(Executors.newSingleThreadExecutor());
40     private static final Logger LOGGER = LoggerFactory.getLogger(MsgExecutor.class);
41
42     private final CacheProvider cacheProvider;
43     private final ParticipantMessagePublisher publisher;
44
45     /**
46      * Execute the message if all data are present or put on Hold if something is missing.
47      *
48      * @param message the message
49      */
50     public void execute(AutomationCompositionMsg<?> message) {
51         if (validExecution(message)) {
52             message.execute();
53         } else {
54             cacheProvider.getMessagesOnHold().put(message.getKey(), message);
55             var participantReqSync = new ParticipantReqSync();
56             participantReqSync.setParticipantId(cacheProvider.getParticipantId());
57             participantReqSync.setReplicaId(cacheProvider.getReplicaId());
58             participantReqSync.setCompositionId(message.getCompositionId());
59             participantReqSync.setAutomationCompositionId(message.getInstanceId());
60             participantReqSync.setCompositionTargetId(message.getCompositionTargetId());
61             publisher.sendParticipantReqSync(participantReqSync);
62         }
63     }
64
65     /**
66      * Check if messages on hold can be executed.
67      */
68     public void check() {
69         executor.submit(this::checkAndExecute);
70     }
71
72     private void checkAndExecute() {
73         var executable = cacheProvider.getMessagesOnHold().values().stream()
74                 .filter(this::validExecution).toList();
75         executable.forEach(AutomationCompositionMsg::execute);
76         executable.forEach(msg -> cacheProvider.getMessagesOnHold().remove(msg.getKey()));
77     }
78
79     private boolean validExecution(AutomationCompositionMsg<?> message) {
80         var result = true;
81         if (message.getCompositionId() != null) {
82             var valid = cacheProvider.isCompositionDefinitionUpdated(message.getCompositionId(),
83                     message.getRevisionIdComposition());
84             if (valid) {
85                 message.setCompositionId(null);
86                 message.setRevisionIdComposition(null);
87             } else {
88                 LOGGER.debug("Composition {} missing or outdated", message.getCompositionId());
89                 result = false;
90             }
91         }
92         if (message.getCompositionTargetId() != null) {
93             var valid = cacheProvider.isCompositionDefinitionUpdated(message.getCompositionTargetId(),
94                     message.getRevisionIdCompositionTarget());
95             if (valid) {
96                 message.setCompositionTargetId(null);
97                 message.setRevisionIdCompositionTarget(null);
98             } else {
99                 LOGGER.debug("Composition Target {} missing or outdated", message.getCompositionTargetId());
100                 result = false;
101             }
102         }
103         if (message.getInstanceId() != null) {
104             var valid = cacheProvider.isInstanceUpdated(message.getInstanceId(), message.getRevisionIdInstance());
105             if (valid) {
106                 message.setInstanceId(null);
107                 message.setRevisionIdInstance(null);
108             } else {
109                 LOGGER.debug("Instance {} missing or outdated", message.getInstanceId());
110                 result = false;
111             }
112         }
113         return result;
114     }
115 }