367d4ab9044738a5b53bc0ded95e4ec0c5b62318
[policy/common.git] /
1 /*
2  * ============LICENSE_START=======================================================
3  * ONAP
4  * ================================================================================
5  * Copyright (C) 2017-2019, 2021 AT&T Intellectual Property. All rights reserved.
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=========================================================
19  */
20
21 package org.onap.policy.common.endpoints.event.comm.bus;
22
23 import java.util.ArrayList;
24 import java.util.HashMap;
25 import java.util.List;
26 import java.util.Properties;
27 import java.util.regex.Pattern;
28 import org.apache.commons.lang3.StringUtils;
29 import org.onap.policy.common.endpoints.event.comm.bus.internal.BusTopicParams;
30 import org.onap.policy.common.endpoints.event.comm.bus.internal.SingleThreadedUebTopicSource;
31 import org.onap.policy.common.endpoints.properties.PolicyEndPointProperties;
32 import org.onap.policy.common.endpoints.utils.PropertyUtils;
33 import org.onap.policy.common.endpoints.utils.UebPropertyUtils;
34 import org.slf4j.Logger;
35 import org.slf4j.LoggerFactory;
36
37 /**
38  * Factory of UEB Source Topics indexed by topic name.
39  */
40 class IndexedUebTopicSourceFactory implements UebTopicSourceFactory {
41     private static final Pattern COMMA_SPACE_PAT = Pattern.compile("\\s*,\\s*");
42     private static final String MISSING_TOPIC = "A topic must be provided";
43
44     /**
45      * Logger.
46      */
47     private static Logger logger = LoggerFactory.getLogger(IndexedUebTopicSourceFactory.class);
48
49     /**
50      * UEB Topic Name Index.
51      */
52     protected HashMap<String, UebTopicSource> uebTopicSources = new HashMap<>();
53
54     @Override
55     public UebTopicSource build(BusTopicParams busTopicParams) {
56         if (busTopicParams.getServers() == null || busTopicParams.getServers().isEmpty()) {
57             throw new IllegalArgumentException("UEB Server(s) must be provided");
58         }
59
60         if (busTopicParams.getTopic() == null || busTopicParams.getTopic().isEmpty()) {
61             throw new IllegalArgumentException(MISSING_TOPIC);
62         }
63
64         synchronized (this) {
65             if (uebTopicSources.containsKey(busTopicParams.getTopic())) {
66                 return uebTopicSources.get(busTopicParams.getTopic());
67             }
68
69             var uebTopicSource = makeSource(busTopicParams);
70
71             if (busTopicParams.isManaged()) {
72                 uebTopicSources.put(busTopicParams.getTopic(), uebTopicSource);
73             }
74
75             return uebTopicSource;
76         }
77     }
78
79     @Override
80     public List<UebTopicSource> build(Properties properties) {
81
82         String readTopics = properties.getProperty(PolicyEndPointProperties.PROPERTY_UEB_SOURCE_TOPICS);
83         if (StringUtils.isBlank(readTopics)) {
84             logger.info("{}: no topic for UEB Source", this);
85             return new ArrayList<>();
86         }
87
88         List<UebTopicSource> newUebTopicSources = new ArrayList<>();
89         synchronized (this) {
90             for (String topic : COMMA_SPACE_PAT.split(readTopics)) {
91                 addTopic(newUebTopicSources, topic, properties);
92             }
93         }
94         return newUebTopicSources;
95     }
96
97     @Override
98     public UebTopicSource build(List<String> servers, String topic, String apiKey, String apiSecret) {
99
100         return this.build(BusTopicParams.builder()
101                 .servers(servers)
102                 .topic(topic)
103                 .apiKey(apiKey)
104                 .apiSecret(apiSecret)
105                 .fetchTimeout(PolicyEndPointProperties.DEFAULT_TIMEOUT_MS_FETCH)
106                 .fetchLimit(PolicyEndPointProperties.DEFAULT_LIMIT_FETCH)
107                 .managed(true)
108                 .useHttps(false)
109                 .allowSelfSignedCerts(true).build());
110     }
111
112     @Override
113     public UebTopicSource build(List<String> servers, String topic) {
114         return this.build(servers, topic, null, null);
115     }
116
117     private void addTopic(List<UebTopicSource> newUebTopicSources, String topic, Properties properties) {
118         if (this.uebTopicSources.containsKey(topic)) {
119             newUebTopicSources.add(this.uebTopicSources.get(topic));
120             return;
121         }
122
123         String topicPrefix = PolicyEndPointProperties.PROPERTY_UEB_SOURCE_TOPICS + "." + topic;
124
125         var props = new PropertyUtils(properties, topicPrefix,
126             (name, value, ex) -> logger.warn("{}: {} {} is in invalid format for topic {} ", this, name, value, topic));
127
128         String servers = properties.getProperty(topicPrefix + PolicyEndPointProperties.PROPERTY_TOPIC_SERVERS_SUFFIX);
129         if (StringUtils.isBlank(servers)) {
130             logger.error("{}: no UEB servers configured for sink {}", this, topic);
131             return;
132         }
133
134         var uebTopicSource = this.build(UebPropertyUtils.makeBuilder(props, topic, servers)
135                 .consumerGroup(props.getString(
136                                 PolicyEndPointProperties.PROPERTY_TOPIC_SOURCE_CONSUMER_GROUP_SUFFIX, null))
137                 .consumerInstance(props.getString(
138                                 PolicyEndPointProperties.PROPERTY_TOPIC_SOURCE_CONSUMER_INSTANCE_SUFFIX, null))
139                 .fetchTimeout(props.getInteger(
140                                 PolicyEndPointProperties.PROPERTY_TOPIC_SOURCE_FETCH_TIMEOUT_SUFFIX,
141                                 PolicyEndPointProperties.DEFAULT_TIMEOUT_MS_FETCH))
142                 .fetchLimit(props.getInteger(PolicyEndPointProperties.PROPERTY_TOPIC_SOURCE_FETCH_LIMIT_SUFFIX,
143                                 PolicyEndPointProperties.DEFAULT_LIMIT_FETCH))
144                 .build());
145
146         newUebTopicSources.add(uebTopicSource);
147     }
148
149     /**
150      * Makes a new source.
151      *
152      * @param busTopicParams parameters to use to configure the source
153      * @return a new source
154      */
155     protected UebTopicSource makeSource(BusTopicParams busTopicParams) {
156         return new SingleThreadedUebTopicSource(busTopicParams);
157     }
158
159     @Override
160     public void destroy(String topic) {
161
162         if (topic == null || topic.isEmpty()) {
163             throw new IllegalArgumentException(MISSING_TOPIC);
164         }
165
166         UebTopicSource uebTopicSource;
167
168         synchronized (this) {
169             if (!uebTopicSources.containsKey(topic)) {
170                 return;
171             }
172
173             uebTopicSource = uebTopicSources.remove(topic);
174         }
175
176         uebTopicSource.shutdown();
177     }
178
179     @Override
180     public void destroy() {
181         List<UebTopicSource> readers = this.inventory();
182         for (UebTopicSource reader : readers) {
183             reader.shutdown();
184         }
185
186         synchronized (this) {
187             this.uebTopicSources.clear();
188         }
189     }
190
191     @Override
192     public UebTopicSource get(String topic) {
193
194         if (topic == null || topic.isEmpty()) {
195             throw new IllegalArgumentException(MISSING_TOPIC);
196         }
197
198         synchronized (this) {
199             if (uebTopicSources.containsKey(topic)) {
200                 return uebTopicSources.get(topic);
201             } else {
202                 throw new IllegalStateException("UebTopiceSource for " + topic + " not found");
203             }
204         }
205     }
206
207     @Override
208     public synchronized List<UebTopicSource> inventory() {
209         return new ArrayList<>(this.uebTopicSources.values());
210     }
211
212     @Override
213     public String toString() {
214         return "IndexedUebTopicSourceFactory []";
215     }
216 }