88a472c29d969fe02920e07eb5ba6502899a390f
[policy/common.git] /
1 /*
2  * ============LICENSE_START=======================================================
3  * ONAP Policy Engine - Common Modules
4  * ================================================================================
5  * Copyright (C) 2017-2019 AT&T Intellectual Property. All rights reserved.
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  * 
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  * 
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=========================================================
19  */
20
21 package org.onap.policy.common.endpoints.event.comm.bus;
22
23 import java.util.ArrayList;
24 import java.util.Arrays;
25 import java.util.HashMap;
26 import java.util.List;
27 import java.util.Properties;
28
29 import org.onap.policy.common.endpoints.event.comm.bus.internal.BusTopicParams;
30 import org.onap.policy.common.endpoints.event.comm.bus.internal.SingleThreadedUebTopicSource;
31 import org.onap.policy.common.endpoints.properties.PolicyEndPointProperties;
32 import org.slf4j.Logger;
33 import org.slf4j.LoggerFactory;
34
35 /**
36  * Factory of UEB Source Topics indexed by topic name.
37  */
38 class IndexedUebTopicSourceFactory implements UebTopicSourceFactory {
39     private static final String MISSING_TOPIC = "A topic must be provided";
40
41     /**
42      * Logger.
43      */
44     private static Logger logger = LoggerFactory.getLogger(IndexedUebTopicSourceFactory.class);
45
46     /**
47      * UEB Topic Name Index.
48      */
49     protected HashMap<String, UebTopicSource> uebTopicSources = new HashMap<>();
50
51     @Override
52     public UebTopicSource build(BusTopicParams busTopicParams) {
53         if (busTopicParams.getServers() == null || busTopicParams.getServers().isEmpty()) {
54             throw new IllegalArgumentException("UEB Server(s) must be provided");
55         }
56
57         if (busTopicParams.getTopic() == null || busTopicParams.getTopic().isEmpty()) {
58             throw new IllegalArgumentException(MISSING_TOPIC);
59         }
60
61         synchronized (this) {
62             if (uebTopicSources.containsKey(busTopicParams.getTopic())) {
63                 return uebTopicSources.get(busTopicParams.getTopic());
64             }
65
66             UebTopicSource uebTopicSource = makeSource(busTopicParams);
67
68             if (busTopicParams.isManaged()) {
69                 uebTopicSources.put(busTopicParams.getTopic(), uebTopicSource);
70             }
71
72             return uebTopicSource;
73         }
74     }
75
76     @Override
77     public List<UebTopicSource> build(Properties properties) {
78
79         String readTopics = properties.getProperty(PolicyEndPointProperties.PROPERTY_UEB_SOURCE_TOPICS);
80         if (readTopics == null || readTopics.isEmpty()) {
81             logger.info("{}: no topic for UEB Source", this);
82             return new ArrayList<>();
83         }
84         List<String> readTopicList = new ArrayList<>(Arrays.asList(readTopics.split("\\s*,\\s*")));
85
86         List<UebTopicSource> newUebTopicSources = new ArrayList<>();
87         synchronized (this) {
88             for (String topic : readTopicList) {
89                 if (this.uebTopicSources.containsKey(topic)) {
90                     newUebTopicSources.add(this.uebTopicSources.get(topic));
91                     continue;
92                 }
93
94                 String servers = properties.getProperty(PolicyEndPointProperties.PROPERTY_UEB_SOURCE_TOPICS + "."
95                         + topic + PolicyEndPointProperties.PROPERTY_TOPIC_SERVERS_SUFFIX);
96
97                 if (servers == null || servers.isEmpty()) {
98                     logger.error("{}: no UEB servers configured for sink {}", this, topic);
99                     continue;
100                 }
101
102                 final List<String> serverList = new ArrayList<>(Arrays.asList(servers.split("\\s*,\\s*")));
103
104                 final String apiKey = properties.getProperty(PolicyEndPointProperties.PROPERTY_UEB_SOURCE_TOPICS 
105                         + "." + topic + PolicyEndPointProperties.PROPERTY_TOPIC_API_KEY_SUFFIX);
106
107                 final String apiSecret = properties.getProperty(PolicyEndPointProperties.PROPERTY_UEB_SOURCE_TOPICS 
108                         + "." + topic + PolicyEndPointProperties.PROPERTY_TOPIC_API_SECRET_SUFFIX);
109
110                 final String consumerGroup = properties.getProperty(PolicyEndPointProperties.PROPERTY_UEB_SOURCE_TOPICS 
111                         + "." + topic + PolicyEndPointProperties.PROPERTY_TOPIC_SOURCE_CONSUMER_GROUP_SUFFIX);
112
113                 final String consumerInstance = properties.getProperty(
114                         PolicyEndPointProperties.PROPERTY_UEB_SOURCE_TOPICS
115                         + "." + topic + PolicyEndPointProperties.PROPERTY_TOPIC_SOURCE_CONSUMER_INSTANCE_SUFFIX);
116
117                 String fetchTimeoutString = properties.getProperty(PolicyEndPointProperties.PROPERTY_UEB_SOURCE_TOPICS
118                         + "." + topic + PolicyEndPointProperties.PROPERTY_TOPIC_SOURCE_FETCH_TIMEOUT_SUFFIX);
119                 int fetchTimeout = UebTopicSource.DEFAULT_TIMEOUT_MS_FETCH;
120                 if (fetchTimeoutString != null && !fetchTimeoutString.isEmpty()) {
121                     try {
122                         fetchTimeout = Integer.parseInt(fetchTimeoutString);
123                     } catch (NumberFormatException nfe) {
124                         logger.warn("{}: fetch timeout {} is in invalid format for topic {} ", this, fetchTimeoutString,
125                                 topic);
126                     }
127                 }
128
129                 String fetchLimitString = properties.getProperty(PolicyEndPointProperties.PROPERTY_UEB_SOURCE_TOPICS
130                         + "." + topic + PolicyEndPointProperties.PROPERTY_TOPIC_SOURCE_FETCH_LIMIT_SUFFIX);
131                 int fetchLimit = UebTopicSource.DEFAULT_LIMIT_FETCH;
132                 if (fetchLimitString != null && !fetchLimitString.isEmpty()) {
133                     try {
134                         fetchLimit = Integer.parseInt(fetchLimitString);
135                     } catch (NumberFormatException nfe) {
136                         logger.warn("{}: fetch limit {} is in invalid format for topic {} ", this, fetchLimitString,
137                                 topic);
138                     }
139                 }
140
141                 String managedString = properties.getProperty(PolicyEndPointProperties.PROPERTY_UEB_SOURCE_TOPICS + "."
142                         + topic + PolicyEndPointProperties.PROPERTY_MANAGED_SUFFIX);
143                 boolean managed = true;
144                 if (managedString != null && !managedString.isEmpty()) {
145                     managed = Boolean.parseBoolean(managedString);
146                 }
147
148                 String useHttpsString = properties.getProperty(PolicyEndPointProperties.PROPERTY_UEB_SOURCE_TOPICS + "."
149                         + topic + PolicyEndPointProperties.PROPERTY_HTTP_HTTPS_SUFFIX);
150
151                 // default is to use HTTP if no https property exists
152                 boolean useHttps = false;
153                 if (useHttpsString != null && !useHttpsString.isEmpty()) {
154                     useHttps = Boolean.parseBoolean(useHttpsString);
155                 }
156
157                 String allowSelfSignedCertsString =
158                         properties.getProperty(PolicyEndPointProperties.PROPERTY_UEB_SOURCE_TOPICS + "." + topic
159                                 + PolicyEndPointProperties.PROPERTY_ALLOW_SELF_SIGNED_CERTIFICATES_SUFFIX);
160
161                 // default is to disallow self-signed certs
162                 boolean allowSelfSignedCerts = false;
163                 if (allowSelfSignedCertsString != null && !allowSelfSignedCertsString.isEmpty()) {
164                     allowSelfSignedCerts = Boolean.parseBoolean(allowSelfSignedCertsString);
165                 }
166
167                 UebTopicSource uebTopicSource = this.build(BusTopicParams.builder()
168                         .servers(serverList)
169                         .topic(topic)
170                         .apiKey(apiKey)
171                         .apiSecret(apiSecret)
172                         .consumerGroup(consumerGroup)
173                         .consumerInstance(consumerInstance)
174                         .fetchTimeout(fetchTimeout)
175                         .fetchLimit(fetchLimit)
176                         .managed(managed)
177                         .useHttps(useHttps)
178                         .allowSelfSignedCerts(allowSelfSignedCerts).build());
179                 newUebTopicSources.add(uebTopicSource);
180             }
181         }
182         return newUebTopicSources;
183     }
184
185     @Override
186     public UebTopicSource build(List<String> servers, String topic, String apiKey, String apiSecret) {
187
188         return this.build(BusTopicParams.builder()
189                 .servers(servers)
190                 .topic(topic)
191                 .apiKey(apiKey)
192                 .apiSecret(apiSecret)
193                 .fetchTimeout(UebTopicSource.DEFAULT_TIMEOUT_MS_FETCH)
194                 .fetchLimit(UebTopicSource.DEFAULT_LIMIT_FETCH)
195                 .managed(true)
196                 .useHttps(false)
197                 .allowSelfSignedCerts(true).build());
198     }
199
200     @Override
201     public UebTopicSource build(List<String> servers, String topic) {
202         return this.build(servers, topic, null, null);
203     }
204
205     /**
206      * Makes a new source.
207      * 
208      * @param busTopicParams parameters to use to configure the source
209      * @return a new source
210      */
211     protected UebTopicSource makeSource(BusTopicParams busTopicParams) {
212         return new SingleThreadedUebTopicSource(busTopicParams);
213     }
214
215     @Override
216     public void destroy(String topic) {
217
218         if (topic == null || topic.isEmpty()) {
219             throw new IllegalArgumentException(MISSING_TOPIC);
220         }
221
222         UebTopicSource uebTopicSource;
223
224         synchronized (this) {
225             if (!uebTopicSources.containsKey(topic)) {
226                 return;
227             }
228
229             uebTopicSource = uebTopicSources.remove(topic);
230         }
231
232         uebTopicSource.shutdown();
233     }
234
235     @Override
236     public void destroy() {
237         List<UebTopicSource> readers = this.inventory();
238         for (UebTopicSource reader : readers) {
239             reader.shutdown();
240         }
241
242         synchronized (this) {
243             this.uebTopicSources.clear();
244         }
245     }
246
247     @Override
248     public UebTopicSource get(String topic) {
249
250         if (topic == null || topic.isEmpty()) {
251             throw new IllegalArgumentException(MISSING_TOPIC);
252         }
253
254         synchronized (this) {
255             if (uebTopicSources.containsKey(topic)) {
256                 return uebTopicSources.get(topic);
257             } else {
258                 throw new IllegalStateException("UebTopiceSource for " + topic + " not found");
259             }
260         }
261     }
262
263     @Override
264     public synchronized List<UebTopicSource> inventory() {
265         return new ArrayList<>(this.uebTopicSources.values());
266     }
267
268     @Override
269     public String toString() {
270         StringBuilder builder = new StringBuilder();
271         builder.append("IndexedUebTopicSourceFactory []");
272         return builder.toString();
273     }
274 }