/*-
 * ============LICENSE_START=======================================================
 *  Copyright (C) 2025 Nordix Foundation.
 * ================================================================================
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 * SPDX-License-Identifier: Apache-2.0
 * ============LICENSE_END=========================================================
 */
21 package org.onap.policy.clamp.acm.participant.intermediary.handler;
23 import java.util.List;
24 import java.util.Timer;
25 import java.util.TimerTask;
26 import org.onap.policy.clamp.acm.participant.intermediary.parameters.ParticipantParameters;
27 import org.onap.policy.common.message.bus.event.Topic;
28 import org.onap.policy.common.message.bus.healthcheck.TopicHealthCheck;
29 import org.onap.policy.common.message.bus.healthcheck.TopicHealthCheckFactory;
30 import org.onap.policy.common.parameters.topic.TopicParameters;
31 import org.slf4j.Logger;
32 import org.slf4j.LoggerFactory;
33 import org.springframework.context.event.ContextClosedEvent;
34 import org.springframework.context.event.ContextRefreshedEvent;
35 import org.springframework.context.event.EventListener;
36 import org.springframework.stereotype.Component;
39 public class BrokerStarter<T> {
40 private static final Logger LOGGER = LoggerFactory.getLogger(BrokerStarter.class);
41 private final IntermediaryActivator activator;
42 private final ParticipantHandler participantHandler;
43 private final TopicHealthCheck topicHealthCheck;
45 private final ParticipantParameters parameters;
46 private final List<Publisher> publishers;
47 private final List<Listener<T>> listeners;
52 * @param parameters ParticipantParameters
53 * @param activator IntermediaryActivator
54 * @param participantHandler participantHandler
56 public BrokerStarter(ParticipantParameters parameters,
57 List<Publisher> publishers, List<Listener<T>> listeners, IntermediaryActivator activator,
58 ParticipantHandler participantHandler) {
59 this.parameters = parameters;
60 this.listeners = listeners;
61 this.publishers = publishers;
62 this.activator = activator;
63 this.participantHandler = participantHandler;
64 var topic = parameters.getIntermediaryParameters().getClampAdminTopics();
66 topic = new TopicParameters();
67 topic.setTopicCommInfrastructure(Topic.CommInfrastructure.NOOP.name());
69 this.topicHealthCheck = createTopicHealthCheck(topic);
72 protected TopicHealthCheck createTopicHealthCheck(TopicParameters topic) {
73 return new TopicHealthCheckFactory().getTopicHealthCheck(topic);
77 * Handle ContextRefreshEvent.
79 * @param ctxRefreshedEvent ContextRefreshedEvent
82 public void handleContextRefreshEvent(ContextRefreshedEvent ctxRefreshedEvent) {
83 if (!activator.isAlive()) {
84 runTopicHealthCheck();
89 private void runTopicHealthCheck() {
90 var fetchTimeout = getFetchTimeout();
91 while (!topicHealthCheck.healthCheck(getTopics())) {
92 LOGGER.debug(" Broker not up yet!");
94 Thread.sleep(fetchTimeout);
95 } catch (InterruptedException e) {
96 LOGGER.error(e.getMessage());
97 Thread.currentThread().interrupt();
102 private List<String> getTopics() {
103 var opTopic = parameters.getIntermediaryParameters().getTopics().getOperationTopic();
104 var syncTopic = parameters.getIntermediaryParameters().getTopics().getSyncTopic();
105 return Boolean.TRUE.equals(parameters.getIntermediaryParameters().getTopicValidation())
106 ? List.of(opTopic, syncTopic) : List.<String>of();
109 private int getFetchTimeout() {
110 int fetchTimeout = parameters.getIntermediaryParameters().getClampAdminTopics() == null
111 ? 0 : parameters.getIntermediaryParameters().getClampAdminTopics().getFetchTimeout();
112 return Math.max(fetchTimeout, 5000);
115 private void start() {
116 activator.config(parameters, publishers, listeners);
118 var task = new TimerTask() {
121 new Thread(participantHandler::sendParticipantRegister).start();
124 new Timer().schedule(task, 5000);
129 * Handle ContextClosedEvent.
131 * @param ctxClosedEvent ContextClosedEvent
134 public void handleContextClosedEvent(ContextClosedEvent ctxClosedEvent) {
135 if (activator.isAlive()) {
136 participantHandler.sendParticipantDeregister();