d196dd193d23c11cf6151ed0209a6e5d95531a9c
[policy/clamp.git] /
1 /*-
2  * ============LICENSE_START=======================================================
3  *  Copyright (C) 2021 Nordix Foundation.
4  * ================================================================================
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *      http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  * SPDX-License-Identifier: Apache-2.0
18  * ============LICENSE_END=========================================================
19  */
20
21 package org.onap.policy.clamp.controlloop.runtime.config.messaging;
22
23 import java.io.Closeable;
24 import java.io.IOException;
25 import java.util.List;
26 import lombok.Getter;
27 import org.onap.policy.clamp.controlloop.common.exception.ControlLoopRuntimeException;
28 import org.onap.policy.clamp.controlloop.runtime.main.parameters.ClRuntimeParameterGroup;
29 import org.onap.policy.common.endpoints.event.comm.TopicEndpointManager;
30 import org.onap.policy.common.endpoints.event.comm.TopicSink;
31 import org.onap.policy.common.endpoints.event.comm.TopicSource;
32 import org.onap.policy.common.endpoints.listeners.MessageTypeDispatcher;
33 import org.onap.policy.common.utils.services.ServiceManagerContainer;
34 import org.springframework.context.event.ContextClosedEvent;
35 import org.springframework.context.event.ContextRefreshedEvent;
36 import org.springframework.context.event.EventListener;
37 import org.springframework.stereotype.Component;
38
39 @Component
40 public class MessageDispatcherActivator extends ServiceManagerContainer implements Closeable {
41
42     private static final String[] MSG_TYPE_NAMES = {"messageType"};
43
44     // Topics from which the application receives and to which the application sends messages
45     private List<TopicSink> topicSinks;
46     private List<TopicSource> topicSources;
47
48     @Getter
49     private final MessageTypeDispatcher msgDispatcher;
50
51     /**
52      * Constructor.
53      *
54      * @param clRuntimeParameterGroup the parameters for the control loop runtime service
55      * @param publishers list of Publishers
56      * @param listeners list of Listeners
57      * @throws ControlLoopRuntimeException if the activator does not start
58      */
59     public MessageDispatcherActivator(final ClRuntimeParameterGroup clRuntimeParameterGroup, List<Publisher> publishers,
60             List<Listener> listeners) {
61         topicSinks = TopicEndpointManager.getManager()
62                 .addTopicSinks(clRuntimeParameterGroup.getTopicParameterGroup().getTopicSinks());
63
64         topicSources = TopicEndpointManager.getManager()
65                 .addTopicSources(clRuntimeParameterGroup.getTopicParameterGroup().getTopicSources());
66
67         msgDispatcher = new MessageTypeDispatcher(MSG_TYPE_NAMES);
68
69         // @formatter:off
70         addAction("Topic endpoint management",
71                 () -> TopicEndpointManager.getManager().start(),
72                 () -> TopicEndpointManager.getManager().shutdown());
73
74         publishers.forEach(publisher ->
75             addAction("Publisher " + publisher.getClass().getSimpleName(),
76                 () -> publisher.active(topicSinks),
77                 publisher::stop));
78
79         listeners.forEach(listener ->
80             addAction("Listener " + listener.getClass().getSimpleName(),
81                     () -> msgDispatcher.register(listener.getType(), listener.getScoListener()),
82                     () -> msgDispatcher.unregister(listener.getType())));
83
84         addAction("Topic Message Dispatcher", this::registerMsgDispatcher, this::unregisterMsgDispatcher);
85         // @formatter:on
86     }
87
88     /**
89      * Registers the dispatcher with the topic source(s).
90      */
91     private void registerMsgDispatcher() {
92         for (final TopicSource source : topicSources) {
93             source.register(msgDispatcher);
94         }
95     }
96
97     /**
98      * Unregisters the dispatcher from the topic source(s).
99      */
100     private void unregisterMsgDispatcher() {
101         for (final TopicSource source : topicSources) {
102             source.unregister(msgDispatcher);
103         }
104     }
105
106     /**
107      * Start Manager after the application is Started.
108      *
109      * @param cre Refreshed Event
110      */
111     @EventListener
112     public void handleContextStart(ContextRefreshedEvent cre) {
113         if (!isAlive()) {
114             start();
115         }
116     }
117
118     /**
119      * Handle ContextClosedEvent.
120      *
121      * @param ctxClosedEvent ContextClosedEvent
122      */
123     @EventListener
124     public void handleContextClosedEvent(ContextClosedEvent ctxClosedEvent) {
125         if (isAlive()) {
126             stop();
127         }
128     }
129
130     @Override
131     public void close() throws IOException {
132         if (isAlive()) {
133             super.shutdown();
134         }
135     }
136 }