f913926e0c8a29d19b60c8864e732974f22bbb43
[policy/common.git] /
1 /*
2  * ============LICENSE_START=======================================================
3  * Copyright (C) 2022-2023 Nordix Foundation.
4  * ================================================================================
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *      http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  * ============LICENSE_END=========================================================
17  */
18
19 package org.onap.policy.common.endpoints.event.comm.bus;
20
import com.google.re2j.Pattern;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Properties;
import org.apache.commons.lang3.StringUtils;
import org.onap.policy.common.endpoints.event.comm.bus.internal.BusTopicParams;
import org.onap.policy.common.endpoints.event.comm.bus.internal.InlineKafkaTopicSink;
import org.onap.policy.common.endpoints.properties.PolicyEndPointProperties;
import org.onap.policy.common.endpoints.utils.KafkaPropertyUtils;
import org.onap.policy.common.endpoints.utils.PropertyUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
34
35 /**
36  * Factory of KAFKA Reader Topics indexed by topic name.
37  */
38 class IndexedKafkaTopicSinkFactory implements KafkaTopicSinkFactory {
39     private static final Pattern COMMA_SPACE_PAT = Pattern.compile("\\s*,\\s*");
40     private static final String MISSING_TOPIC = "A topic must be provided";
41
42     /**
43      * Logger.
44      */
45     private static final Logger logger = LoggerFactory.getLogger(IndexedKafkaTopicSinkFactory.class);
46
47     /**
48      * KAFKA Topic Name Index.
49      */
50     protected HashMap<String, KafkaTopicSink> kafkaTopicSinks = new HashMap<>();
51
52     @Override
53     public KafkaTopicSink build(BusTopicParams busTopicParams) {
54
55         if (busTopicParams.getServers() == null || busTopicParams.getServers().isEmpty()) {
56             throw new IllegalArgumentException("KAFKA Server(s) must be provided");
57         }
58
59         if (StringUtils.isBlank(busTopicParams.getTopic())) {
60             throw new IllegalArgumentException(MISSING_TOPIC);
61         }
62
63         synchronized (this) {
64             if (kafkaTopicSinks.containsKey(busTopicParams.getTopic())) {
65                 return kafkaTopicSinks.get(busTopicParams.getTopic());
66             }
67
68             KafkaTopicSink kafkaTopicWriter = makeSink(busTopicParams);
69             if (busTopicParams.isManaged()) {
70                 kafkaTopicSinks.put(busTopicParams.getTopic(), kafkaTopicWriter);
71             }
72
73             return kafkaTopicWriter;
74         }
75     }
76
77
78     @Override
79     public KafkaTopicSink build(List<String> servers, String topic) {
80         return this.build(BusTopicParams.builder()
81                 .servers(servers)
82                 .topic(topic)
83                 .managed(true)
84                 .useHttps(false)
85                 .build());
86     }
87
88
89     @Override
90     public List<KafkaTopicSink> build(Properties properties) {
91
92         String writeTopics = properties.getProperty(PolicyEndPointProperties.PROPERTY_KAFKA_SINK_TOPICS);
93         if (StringUtils.isBlank(writeTopics)) {
94             logger.info("{}: no topic for KAFKA Sink", this);
95             return new ArrayList<>();
96         }
97
98         List<KafkaTopicSink> newKafkaTopicSinks = new ArrayList<>();
99         synchronized (this) {
100             for (String topic : COMMA_SPACE_PAT.split(writeTopics)) {
101                 addTopic(newKafkaTopicSinks, topic.toLowerCase(), properties);
102             }
103             return newKafkaTopicSinks;
104         }
105     }
106
107     private void addTopic(List<KafkaTopicSink> newKafkaTopicSinks, String topic, Properties properties) {
108         if (this.kafkaTopicSinks.containsKey(topic)) {
109             newKafkaTopicSinks.add(this.kafkaTopicSinks.get(topic));
110             return;
111         }
112
113         String topicPrefix = PolicyEndPointProperties.PROPERTY_KAFKA_SINK_TOPICS + "." + topic;
114
115         var props = new PropertyUtils(properties, topicPrefix,
116             (name, value, ex) -> logger.warn("{}: {} {} is in invalid format for topic sink {} ",
117                 this, name, value, topic));
118
119         String servers = properties.getProperty(topicPrefix + PolicyEndPointProperties.PROPERTY_TOPIC_SERVERS_SUFFIX);
120         if (StringUtils.isBlank(servers)) {
121             logger.error("{}: no KAFKA servers configured for sink {}", this, topic);
122             return;
123         }
124
125         KafkaTopicSink kafkaTopicWriter = this.build(KafkaPropertyUtils.makeBuilder(props, topic, servers)
126                 .partitionId(props.getString(PolicyEndPointProperties.PROPERTY_TOPIC_SINK_PARTITION_KEY_SUFFIX, null))
127                 .build());
128         newKafkaTopicSinks.add(kafkaTopicWriter);
129     }
130
131     @Override
132     public void destroy(String topic) {
133
134         if (topic == null || topic.isEmpty()) {
135             throw new IllegalArgumentException(MISSING_TOPIC);
136         }
137
138         KafkaTopicSink kafkaTopicWriter;
139         synchronized (this) {
140             if (!kafkaTopicSinks.containsKey(topic)) {
141                 return;
142             }
143
144             kafkaTopicWriter = kafkaTopicSinks.remove(topic);
145         }
146
147         kafkaTopicWriter.shutdown();
148     }
149
150     @Override
151     public void destroy() {
152         List<KafkaTopicSink> writers = this.inventory();
153         for (KafkaTopicSink writer : writers) {
154             writer.shutdown();
155         }
156
157         synchronized (this) {
158             this.kafkaTopicSinks.clear();
159         }
160     }
161
162     @Override
163     public KafkaTopicSink get(String topic) {
164
165         if (topic == null || topic.isEmpty()) {
166             throw new IllegalArgumentException(MISSING_TOPIC);
167         }
168
169         synchronized (this) {
170             if (kafkaTopicSinks.containsKey(topic)) {
171                 return kafkaTopicSinks.get(topic);
172             } else {
173                 throw new IllegalStateException("KafkaTopicSink for " + topic + " not found");
174             }
175         }
176     }
177
178     @Override
179     public synchronized List<KafkaTopicSink> inventory() {
180         return new ArrayList<>(this.kafkaTopicSinks.values());
181     }
182
183     /**
184      * Makes a new sink.
185      *
186      * @param busTopicParams parameters to use to configure the sink
187      * @return a new sink
188      */
189     protected KafkaTopicSink makeSink(BusTopicParams busTopicParams) {
190         return new InlineKafkaTopicSink(busTopicParams);
191     }
192
193
194     @Override
195     public String toString() {
196         return "IndexedKafkaTopicSinkFactory " + kafkaTopicSinks.keySet();
197     }
198
199 }