f3ef8fdc5c61a59572bba771cc4ec829ae9b6f25
[policy/common.git] /
1 /*
2  * ============LICENSE_START=======================================================
3  * ONAP
4  * ================================================================================
5  * Copyright (C) 2017-2019 AT&T Intellectual Property. All rights reserved.
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  * 
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  * 
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=========================================================
19  */
20
21 package org.onap.policy.common.endpoints.event.comm.bus;
22
23 import java.util.ArrayList;
24 import java.util.Arrays;
25 import java.util.HashMap;
26 import java.util.List;
27 import java.util.Properties;
28
29 import org.onap.policy.common.endpoints.event.comm.bus.internal.BusTopicParams;
30 import org.onap.policy.common.endpoints.event.comm.bus.internal.SingleThreadedUebTopicSource;
31 import org.onap.policy.common.endpoints.properties.PolicyEndPointProperties;
32 import org.slf4j.Logger;
33 import org.slf4j.LoggerFactory;
34
35 /**
36  * Factory of UEB Source Topics indexed by topic name.
37  */
38 class IndexedUebTopicSourceFactory implements UebTopicSourceFactory {
39     private static final String MISSING_TOPIC = "A topic must be provided";
40
41     /**
42      * Logger.
43      */
44     private static Logger logger = LoggerFactory.getLogger(IndexedUebTopicSourceFactory.class);
45
46     /**
47      * UEB Topic Name Index.
48      */
49     protected HashMap<String, UebTopicSource> uebTopicSources = new HashMap<>();
50
51     @Override
52     public UebTopicSource build(BusTopicParams busTopicParams) {
53         if (busTopicParams.getServers() == null || busTopicParams.getServers().isEmpty()) {
54             throw new IllegalArgumentException("UEB Server(s) must be provided");
55         }
56
57         if (busTopicParams.getTopic() == null || busTopicParams.getTopic().isEmpty()) {
58             throw new IllegalArgumentException(MISSING_TOPIC);
59         }
60
61         synchronized (this) {
62             if (uebTopicSources.containsKey(busTopicParams.getTopic())) {
63                 return uebTopicSources.get(busTopicParams.getTopic());
64             }
65
66             UebTopicSource uebTopicSource = makeSource(busTopicParams);
67
68             if (busTopicParams.isManaged()) {
69                 uebTopicSources.put(busTopicParams.getTopic(), uebTopicSource);
70             }
71
72             return uebTopicSource;
73         }
74     }
75
76     @Override
77     public List<UebTopicSource> build(Properties properties) {
78
79         String readTopics = properties.getProperty(PolicyEndPointProperties.PROPERTY_UEB_SOURCE_TOPICS);
80         if (readTopics == null || readTopics.isEmpty()) {
81             logger.info("{}: no topic for UEB Source", this);
82             return new ArrayList<>();
83         }
84         List<String> readTopicList = new ArrayList<>(Arrays.asList(readTopics.split("\\s*,\\s*")));
85
86         List<UebTopicSource> newUebTopicSources = new ArrayList<>();
87         synchronized (this) {
88             for (String topic : readTopicList) {
89                 if (this.uebTopicSources.containsKey(topic)) {
90                     newUebTopicSources.add(this.uebTopicSources.get(topic));
91                     continue;
92                 }
93
94                 String servers = properties.getProperty(PolicyEndPointProperties.PROPERTY_UEB_SOURCE_TOPICS + "."
95                         + topic + PolicyEndPointProperties.PROPERTY_TOPIC_SERVERS_SUFFIX);
96
97                 if (servers == null || servers.isEmpty()) {
98                     logger.error("{}: no UEB servers configured for sink {}", this, topic);
99                     continue;
100                 }
101
102                 final List<String> serverList = new ArrayList<>(Arrays.asList(servers.split("\\s*,\\s*")));
103
104                 final String effectiveTopic = properties.getProperty(PolicyEndPointProperties.PROPERTY_UEB_SOURCE_TOPICS
105                     + "." + topic + PolicyEndPointProperties.PROPERTY_TOPIC_EFFECTIVE_TOPIC_SUFFIX, topic);
106
107                 final String apiKey = properties.getProperty(PolicyEndPointProperties.PROPERTY_UEB_SOURCE_TOPICS
108                         + "." + topic + PolicyEndPointProperties.PROPERTY_TOPIC_API_KEY_SUFFIX);
109
110                 final String apiSecret = properties.getProperty(PolicyEndPointProperties.PROPERTY_UEB_SOURCE_TOPICS 
111                         + "." + topic + PolicyEndPointProperties.PROPERTY_TOPIC_API_SECRET_SUFFIX);
112
113                 final String consumerGroup = properties.getProperty(PolicyEndPointProperties.PROPERTY_UEB_SOURCE_TOPICS 
114                         + "." + topic + PolicyEndPointProperties.PROPERTY_TOPIC_SOURCE_CONSUMER_GROUP_SUFFIX);
115
116                 final String consumerInstance = properties.getProperty(
117                         PolicyEndPointProperties.PROPERTY_UEB_SOURCE_TOPICS
118                         + "." + topic + PolicyEndPointProperties.PROPERTY_TOPIC_SOURCE_CONSUMER_INSTANCE_SUFFIX);
119
120                 String fetchTimeoutString = properties.getProperty(PolicyEndPointProperties.PROPERTY_UEB_SOURCE_TOPICS
121                         + "." + topic + PolicyEndPointProperties.PROPERTY_TOPIC_SOURCE_FETCH_TIMEOUT_SUFFIX);
122                 int fetchTimeout = UebTopicSource.DEFAULT_TIMEOUT_MS_FETCH;
123                 if (fetchTimeoutString != null && !fetchTimeoutString.isEmpty()) {
124                     try {
125                         fetchTimeout = Integer.parseInt(fetchTimeoutString);
126                     } catch (NumberFormatException nfe) {
127                         logger.warn("{}: fetch timeout {} is in invalid format for topic {} ", this, fetchTimeoutString,
128                                 topic);
129                     }
130                 }
131
132                 String fetchLimitString = properties.getProperty(PolicyEndPointProperties.PROPERTY_UEB_SOURCE_TOPICS
133                         + "." + topic + PolicyEndPointProperties.PROPERTY_TOPIC_SOURCE_FETCH_LIMIT_SUFFIX);
134                 int fetchLimit = UebTopicSource.DEFAULT_LIMIT_FETCH;
135                 if (fetchLimitString != null && !fetchLimitString.isEmpty()) {
136                     try {
137                         fetchLimit = Integer.parseInt(fetchLimitString);
138                     } catch (NumberFormatException nfe) {
139                         logger.warn("{}: fetch limit {} is in invalid format for topic {} ", this, fetchLimitString,
140                                 topic);
141                     }
142                 }
143
144                 String managedString = properties.getProperty(PolicyEndPointProperties.PROPERTY_UEB_SOURCE_TOPICS + "."
145                         + topic + PolicyEndPointProperties.PROPERTY_MANAGED_SUFFIX);
146                 boolean managed = true;
147                 if (managedString != null && !managedString.isEmpty()) {
148                     managed = Boolean.parseBoolean(managedString);
149                 }
150
151                 String useHttpsString = properties.getProperty(PolicyEndPointProperties.PROPERTY_UEB_SOURCE_TOPICS + "."
152                         + topic + PolicyEndPointProperties.PROPERTY_HTTP_HTTPS_SUFFIX);
153
154                 // default is to use HTTP if no https property exists
155                 boolean useHttps = false;
156                 if (useHttpsString != null && !useHttpsString.isEmpty()) {
157                     useHttps = Boolean.parseBoolean(useHttpsString);
158                 }
159
160                 String allowSelfSignedCertsString =
161                         properties.getProperty(PolicyEndPointProperties.PROPERTY_UEB_SOURCE_TOPICS + "." + topic
162                                 + PolicyEndPointProperties.PROPERTY_ALLOW_SELF_SIGNED_CERTIFICATES_SUFFIX);
163
164                 // default is to disallow self-signed certs
165                 boolean allowSelfSignedCerts = false;
166                 if (allowSelfSignedCertsString != null && !allowSelfSignedCertsString.isEmpty()) {
167                     allowSelfSignedCerts = Boolean.parseBoolean(allowSelfSignedCertsString);
168                 }
169
170                 UebTopicSource uebTopicSource = this.build(BusTopicParams.builder()
171                         .servers(serverList)
172                         .topic(topic)
173                         .effectiveTopic(effectiveTopic)
174                         .apiKey(apiKey)
175                         .apiSecret(apiSecret)
176                         .consumerGroup(consumerGroup)
177                         .consumerInstance(consumerInstance)
178                         .fetchTimeout(fetchTimeout)
179                         .fetchLimit(fetchLimit)
180                         .managed(managed)
181                         .useHttps(useHttps)
182                         .allowSelfSignedCerts(allowSelfSignedCerts).build());
183                 newUebTopicSources.add(uebTopicSource);
184             }
185         }
186         return newUebTopicSources;
187     }
188
189     @Override
190     public UebTopicSource build(List<String> servers, String topic, String apiKey, String apiSecret) {
191
192         return this.build(BusTopicParams.builder()
193                 .servers(servers)
194                 .topic(topic)
195                 .apiKey(apiKey)
196                 .apiSecret(apiSecret)
197                 .fetchTimeout(UebTopicSource.DEFAULT_TIMEOUT_MS_FETCH)
198                 .fetchLimit(UebTopicSource.DEFAULT_LIMIT_FETCH)
199                 .managed(true)
200                 .useHttps(false)
201                 .allowSelfSignedCerts(true).build());
202     }
203
204     @Override
205     public UebTopicSource build(List<String> servers, String topic) {
206         return this.build(servers, topic, null, null);
207     }
208
209     /**
210      * Makes a new source.
211      * 
212      * @param busTopicParams parameters to use to configure the source
213      * @return a new source
214      */
215     protected UebTopicSource makeSource(BusTopicParams busTopicParams) {
216         return new SingleThreadedUebTopicSource(busTopicParams);
217     }
218
219     @Override
220     public void destroy(String topic) {
221
222         if (topic == null || topic.isEmpty()) {
223             throw new IllegalArgumentException(MISSING_TOPIC);
224         }
225
226         UebTopicSource uebTopicSource;
227
228         synchronized (this) {
229             if (!uebTopicSources.containsKey(topic)) {
230                 return;
231             }
232
233             uebTopicSource = uebTopicSources.remove(topic);
234         }
235
236         uebTopicSource.shutdown();
237     }
238
239     @Override
240     public void destroy() {
241         List<UebTopicSource> readers = this.inventory();
242         for (UebTopicSource reader : readers) {
243             reader.shutdown();
244         }
245
246         synchronized (this) {
247             this.uebTopicSources.clear();
248         }
249     }
250
251     @Override
252     public UebTopicSource get(String topic) {
253
254         if (topic == null || topic.isEmpty()) {
255             throw new IllegalArgumentException(MISSING_TOPIC);
256         }
257
258         synchronized (this) {
259             if (uebTopicSources.containsKey(topic)) {
260                 return uebTopicSources.get(topic);
261             } else {
262                 throw new IllegalStateException("UebTopiceSource for " + topic + " not found");
263             }
264         }
265     }
266
267     @Override
268     public synchronized List<UebTopicSource> inventory() {
269         return new ArrayList<>(this.uebTopicSources.values());
270     }
271
272     @Override
273     public String toString() {
274         StringBuilder builder = new StringBuilder();
275         builder.append("IndexedUebTopicSourceFactory []");
276         return builder.toString();
277     }
278 }