2 * ============LICENSE_START=======================================================
3 * Copyright (C) 2021,2024 Nordix Foundation.
4 * Modifications Copyright (C) 2021 AT&T Intellectual Property. All rights reserved.
5 * ================================================================================
6 * Licensed under the Apache License, Version 2.0 (the "License");
7 * you may not use this file except in compliance with the License.
8 * You may obtain a copy of the License at
10 * http://www.apache.org/licenses/LICENSE-2.0
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
18 * SPDX-License-Identifier: Apache-2.0
19 * ============LICENSE_END=========================================================
22 package org.onap.policy.clamp.acm.participant.intermediary.handler;
24 import java.io.Closeable;
25 import java.io.IOException;
26 import java.util.List;
27 import java.util.Timer;
28 import java.util.TimerTask;
30 import org.onap.policy.clamp.acm.participant.intermediary.parameters.ParticipantParameters;
31 import org.onap.policy.clamp.acm.participant.intermediary.parameters.Topics;
32 import org.onap.policy.common.endpoints.listeners.MessageTypeDispatcher;
33 import org.onap.policy.common.message.bus.event.TopicEndpointManager;
34 import org.onap.policy.common.message.bus.event.TopicSink;
35 import org.onap.policy.common.message.bus.event.TopicSource;
36 import org.onap.policy.common.utils.services.ServiceManagerContainer;
37 import org.springframework.context.event.ContextClosedEvent;
38 import org.springframework.context.event.ContextRefreshedEvent;
39 import org.springframework.context.event.EventListener;
40 import org.springframework.stereotype.Component;
43 * This class activates the Participant Intermediary together with all its handlers.
46 public class IntermediaryActivator extends ServiceManagerContainer implements Closeable {
48 private static final String[] MSG_TYPE_NAMES = {"messageType"};
50 // Topics from which the participant receives and to which the participant sends messages
51 private final List<TopicSink> topicSinks;
52 private final List<TopicSource> topicSources;
54 private final ParticipantHandler participantHandler;
57 private final MessageTypeDispatcher msgDispatcher;
60 private final MessageTypeDispatcher syncMsgDispatcher;
63 * Instantiate the activator for participant.
65 * @param parameters the ParticipantParameters
66 * @param participantHandler the ParticipantHandler
67 * @param publishers list of Publishers
68 * @param listeners list of Listeners
70 public <T> IntermediaryActivator(final ParticipantParameters parameters, ParticipantHandler participantHandler,
71 List<Publisher> publishers, List<Listener<T>> listeners) {
72 this.participantHandler = participantHandler;
74 topicSinks = TopicEndpointManager.getManager().addTopicSinks(
75 parameters.getIntermediaryParameters().getClampAutomationCompositionTopics().getTopicSinks());
77 topicSources = TopicEndpointManager.getManager().addTopicSources(
78 parameters.getIntermediaryParameters().getClampAutomationCompositionTopics().getTopicSources());
80 msgDispatcher = new MessageTypeDispatcher(MSG_TYPE_NAMES);
82 syncMsgDispatcher = new MessageTypeDispatcher(MSG_TYPE_NAMES);
85 addAction("Topic endpoint management",
86 () -> TopicEndpointManager.getManager().start(),
87 () -> TopicEndpointManager.getManager().shutdown());
89 listeners.stream().filter(Listener::isDefaultTopic)
90 .forEach(listener -> addAction("Listener " + listener.getClass().getSimpleName(),
91 () -> msgDispatcher.register(listener.getType(), listener.getScoListener()),
92 () -> msgDispatcher.unregister(listener.getType())));
94 listeners.stream().filter(l -> ! l.isDefaultTopic())
95 .forEach(listener -> addAction("Listener " + listener.getClass().getSimpleName(),
96 () -> syncMsgDispatcher.register(listener.getType(), listener.getScoListener()),
97 () -> syncMsgDispatcher.unregister(listener.getType())));
99 publishers.forEach(publisher ->
100 addAction("Publisher " + publisher.getClass().getSimpleName(),
101 () -> publisher.active(topicSinks),
104 var topics = parameters.getIntermediaryParameters().getTopics();
106 addAction("Topic Message Dispatcher", () -> this.registerMsgDispatcher(topics),
107 () -> this.unregisterMsgDispatcher(topics));
112 * Handle ContextRefreshEvent.
114 * @param ctxRefreshedEvent ContextRefreshedEvent
117 public void handleContextRefreshEvent(ContextRefreshedEvent ctxRefreshedEvent) {
120 var task = new TimerTask() {
123 new Thread(participantHandler::sendParticipantRegister).start();
126 new Timer().schedule(task, 5000);
131 * Handle ContextClosedEvent.
133 * @param ctxClosedEvent ContextClosedEvent
136 public void handleContextClosedEvent(ContextClosedEvent ctxClosedEvent) {
138 sendParticipantDeregister();
143 private void sendParticipantDeregister() {
144 participantHandler.sendParticipantDeregister();
148 * Registers the dispatcher with the topic source(s).
150 private void registerMsgDispatcher(Topics topics) {
151 for (final var source : topicSources) {
152 if (source.getTopic().equals(topics.getOperationTopic())) {
153 source.register(msgDispatcher);
154 } else if (source.getTopic().equals(topics.getSyncTopic())) {
155 source.register(syncMsgDispatcher);
161 * Unregisters the dispatcher from the topic source(s).
163 private void unregisterMsgDispatcher(Topics topics) {
164 for (final var source : topicSources) {
165 if (source.getTopic().equals(topics.getOperationTopic())) {
166 source.unregister(msgDispatcher);
167 } else if (source.getTopic().equals(topics.getSyncTopic())) {
168 source.unregister(syncMsgDispatcher);
174 public void close() throws IOException {