2 * ============LICENSE_START=======================================================
3 * Copyright (C) 2023-2025 OpenInfra Foundation Europe. All rights reserved.
4 * ================================================================================
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
9 * http://www.apache.org/licenses/LICENSE-2.0
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
17 * SPDX-License-Identifier: Apache-2.0
18 * ============LICENSE_END=========================================================
21 package org.onap.policy.clamp.acm.participant.intermediary.handler;
23 import io.opentelemetry.context.Context;
24 import java.util.concurrent.ExecutorService;
25 import java.util.concurrent.Executors;
26 import lombok.RequiredArgsConstructor;
27 import org.onap.policy.clamp.acm.participant.intermediary.comm.ParticipantMessagePublisher;
28 import org.onap.policy.clamp.acm.participant.intermediary.handler.cache.AutomationCompositionMsg;
29 import org.onap.policy.clamp.acm.participant.intermediary.handler.cache.CacheProvider;
30 import org.onap.policy.clamp.models.acm.messages.kafka.participant.ParticipantReqSync;
31 import org.slf4j.Logger;
32 import org.slf4j.LoggerFactory;
33 import org.springframework.stereotype.Component;
36 @RequiredArgsConstructor
37 public class MsgExecutor {
39 private final ExecutorService executor = Context.taskWrapping(Executors.newSingleThreadExecutor());
40 private static final Logger LOGGER = LoggerFactory.getLogger(MsgExecutor.class);
42 private final CacheProvider cacheProvider;
43 private final ParticipantMessagePublisher publisher;
46 * Execute the message if all data are present or put on Hold if something is missing.
48 * @param message the message
50 public void execute(AutomationCompositionMsg<?> message) {
51 if (validExecution(message)) {
54 cacheProvider.getMessagesOnHold().put(message.getKey(), message);
55 var participantReqSync = new ParticipantReqSync();
56 participantReqSync.setParticipantId(cacheProvider.getParticipantId());
57 participantReqSync.setReplicaId(cacheProvider.getReplicaId());
58 participantReqSync.setCompositionId(message.getCompositionId());
59 participantReqSync.setAutomationCompositionId(message.getInstanceId());
60 participantReqSync.setCompositionTargetId(message.getCompositionTargetId());
61 publisher.sendParticipantReqSync(participantReqSync);
66 * Check if messages on hold can be executed.
69 executor.submit(this::checkAndExecute);
72 private void checkAndExecute() {
73 var executable = cacheProvider.getMessagesOnHold().values().stream()
74 .filter(this::validExecution).toList();
75 executable.forEach(AutomationCompositionMsg::execute);
76 executable.forEach(msg -> cacheProvider.getMessagesOnHold().remove(msg.getKey()));
79 private boolean validExecution(AutomationCompositionMsg<?> message) {
81 if (message.getCompositionId() != null) {
82 var valid = cacheProvider.isCompositionDefinitionUpdated(message.getCompositionId(),
83 message.getRevisionIdComposition());
85 message.setCompositionId(null);
86 message.setRevisionIdComposition(null);
88 LOGGER.debug("Composition {} missing or outdated", message.getCompositionId());
92 if (message.getCompositionTargetId() != null) {
93 var valid = cacheProvider.isCompositionDefinitionUpdated(message.getCompositionTargetId(),
94 message.getRevisionIdCompositionTarget());
96 message.setCompositionTargetId(null);
97 message.setRevisionIdCompositionTarget(null);
99 LOGGER.debug("Composition Target {} missing or outdated", message.getCompositionTargetId());
103 if (message.getInstanceId() != null) {
104 var valid = cacheProvider.isInstanceUpdated(message.getInstanceId(), message.getRevisionIdInstance());
106 message.setInstanceId(null);
107 message.setRevisionIdInstance(null);
109 LOGGER.debug("Instance {} missing or outdated", message.getInstanceId());