61f05db510eebb8d509d9a8adda139372c344d15
[policy/clamp.git] /
1 /*-
2  * ============LICENSE_START=======================================================
3  * Copyright (C) 2022,2024 Nordix Foundation.
4  * ================================================================================
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *      http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  * SPDX-License-Identifier: Apache-2.0
18  * ============LICENSE_END=========================================================
19  */
20
21 package org.onap.policy.clamp.acm.element.handler;
22
23 import java.io.IOException;
24 import java.util.List;
25 import org.onap.policy.clamp.acm.element.handler.messages.ElementMessageType;
26 import org.onap.policy.common.endpoints.event.comm.TopicEndpointManager;
27 import org.onap.policy.common.endpoints.event.comm.TopicSource;
28 import org.onap.policy.common.endpoints.listeners.MessageTypeDispatcher;
29 import org.onap.policy.common.endpoints.parameters.TopicParameterGroup;
30 import org.onap.policy.common.utils.services.ServiceManagerContainer;
31 import org.slf4j.Logger;
32 import org.slf4j.LoggerFactory;
33 import org.springframework.context.event.ContextClosedEvent;
34 import org.springframework.context.event.EventListener;
35 import org.springframework.stereotype.Component;
36
37 /**
38  * This class activates the Kafka together with all its handlers.
39  */
40 @Component
41 public class MessageActivator extends ServiceManagerContainer implements AutoCloseable {
42
43     private static final Logger LOGGER = LoggerFactory.getLogger(MessageActivator.class);
44
45     private static final String[] MSG_TYPE_NAMES = { "messageType" };
46
47     // Topics from which the AC element receives and sends messages
48     private List<TopicSource> topicSources;
49
50     private final MessageListener listener;
51     private final MessagePublisher publisher;
52
53     private final MessageTypeDispatcher msgDispatcher;
54
55     /**
56      * Constructor.
57      *
58      * @param listener  MessageListener
59      * @param publisher MessagePublisher
60      */
61     public MessageActivator(MessageListener listener, MessagePublisher publisher) {
62         super();
63         this.listener = listener;
64         this.publisher = publisher;
65         msgDispatcher = new MessageTypeDispatcher(MSG_TYPE_NAMES);
66     }
67
68     /**
69      * Activate publisher and listener messages.
70      *
71      * @param parameters TopicParameterGroup
72      */
73     public void activate(final TopicParameterGroup parameters) {
74         var topicSinks = TopicEndpointManager.getManager().addTopicSinks(parameters.getTopicSinks());
75         topicSources = TopicEndpointManager.getManager().addTopicSources(parameters.getTopicSources());
76
77         // @formatter:off
78         addAction("Topic endpoint management",
79             () -> TopicEndpointManager.getManager().start(),
80             () -> TopicEndpointManager.getManager().shutdown());
81
82         addAction("Message Publisher",
83             () -> publisher.active(topicSinks), publisher::stop);
84
85
86         addAction("Message Listener",
87             () -> msgDispatcher.register(ElementMessageType.STATUS.name(), listener),
88             () -> msgDispatcher.unregister(ElementMessageType.STATUS.name()));
89
90         addAction("Topic Message Dispatcher", this::registerMsgDispatcher, this::unregisterMsgDispatcher);
91         // @formatter:on
92
93         start();
94         LOGGER.info("Kafka configuration initialised successfully");
95     }
96
97     /**
98      * Handle ContextClosedEvent.
99      *
100      * @param ctxClosedEvent ContextClosedEvent
101      */
102     @EventListener
103     public void handleContextClosedEvent(ContextClosedEvent ctxClosedEvent) {
104         deactivate();
105     }
106
107     /**
108      * Deactivate publisher and listener messages.
109      */
110     public void deactivate() {
111         if (isAlive()) {
112             stop();
113         }
114     }
115
116     /**
117      * Registers the dispatcher with the topic source(s).
118      */
119     private void registerMsgDispatcher() {
120         for (final TopicSource source : topicSources) {
121             source.register(msgDispatcher);
122         }
123     }
124
125     /**
126      * Unregisters the dispatcher from the topic source(s).
127      */
128     private void unregisterMsgDispatcher() {
129         for (final TopicSource source : topicSources) {
130             source.unregister(msgDispatcher);
131         }
132     }
133
134     @Override
135     public void close() throws IOException {
136         if (isAlive()) {
137             super.shutdown();
138             LOGGER.info("Kafka configuration is uninitialised.");
139         }
140     }
141 }