9d1bd8ad0fc5ceb74f5b86daef58cb51403e18f6
[policy/common.git] /
1 /*
2  * ============LICENSE_START=======================================================
3  * policy-endpoints
4  * ================================================================================
5  * Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved.
6  * Modified Copyright (C) 2018 Samsung Electronics Co., Ltd.
7  * ================================================================================
8  * Licensed under the Apache License, Version 2.0 (the "License");
9  * you may not use this file except in compliance with the License.
10  * You may obtain a copy of the License at
11  * 
12  *      http://www.apache.org/licenses/LICENSE-2.0
13  * 
14  * Unless required by applicable law or agreed to in writing, software
15  * distributed under the License is distributed on an "AS IS" BASIS,
16  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17  * See the License for the specific language governing permissions and
18  * limitations under the License.
19  * ============LICENSE_END=========================================================
20  */
21
22 package org.onap.policy.common.endpoints.event.comm.bus;
23
24 import java.util.ArrayList;
25 import java.util.Arrays;
26 import java.util.HashMap;
27 import java.util.List;
28 import java.util.Properties;
29
30 import org.onap.policy.common.endpoints.event.comm.bus.internal.BusTopicParams;
31 import org.onap.policy.common.endpoints.event.comm.bus.internal.InlineUebTopicSink;
32 import org.onap.policy.common.endpoints.properties.PolicyEndPointProperties;
33 import org.slf4j.Logger;
34 import org.slf4j.LoggerFactory;
35
36 /**
37  * UEB Topic Sink Factory
38  */
39 public interface UebTopicSinkFactory {
40
41     /**
42      * Instantiates a new UEB Topic Writer
43      * 
44      * @param servers list of servers
45      * @param topic topic name
46      * @param apiKey API Key
47      * @param apiSecret API Secret
48      * @param partitionKey Consumer Group
49      * @param managed is this sink endpoint managed?
50      * 
51      * @return an UEB Topic Sink
52      * @throws IllegalArgumentException if invalid parameters are present
53      */
54     public UebTopicSink build(List<String> servers, String topic, String apiKey, String apiSecret, String partitionKey,
55             boolean managed, boolean useHttps, boolean allowSelfSignedCerts);
56
57     /**
58      * Creates an UEB Topic Writer based on properties files
59      * 
60      * @param properties Properties containing initialization values
61      * 
62      * @return an UEB Topic Writer
63      * @throws IllegalArgumentException if invalid parameters are present
64      */
65     public List<UebTopicSink> build(Properties properties);
66
67     /**
68      * Instantiates a new UEB Topic Writer
69      * 
70      * @param servers list of servers
71      * @param topic topic name
72      * 
73      * @return an UEB Topic Writer
74      * @throws IllegalArgumentException if invalid parameters are present
75      */
76     public UebTopicSink build(List<String> servers, String topic);
77
78     /**
79      * Destroys an UEB Topic Writer based on a topic
80      * 
81      * @param topic topic name
82      * @throws IllegalArgumentException if invalid parameters are present
83      */
84     public void destroy(String topic);
85
86     /**
87      * gets an UEB Topic Writer based on topic name
88      * 
89      * @param topic the topic name
90      * 
91      * @return an UEB Topic Writer with topic name
92      * @throws IllegalArgumentException if an invalid topic is provided
93      * @throws IllegalStateException if the UEB Topic Reader is an incorrect state
94      */
95     public UebTopicSink get(String topic);
96
97     /**
98      * Provides a snapshot of the UEB Topic Writers
99      * 
100      * @return a list of the UEB Topic Writers
101      */
102     public List<UebTopicSink> inventory();
103
104     /**
105      * Destroys all UEB Topic Writers
106      */
107     public void destroy();
108 }
109
110
111 /* ------------- implementation ----------------- */
112
113 /**
114  * Factory of UEB Reader Topics indexed by topic name
115  */
116 class IndexedUebTopicSinkFactory implements UebTopicSinkFactory {
117     private static final String MISSING_TOPIC = "A topic must be provided";
118
119     /**
120      * Logger
121      */
122     private static Logger logger = LoggerFactory.getLogger(IndexedUebTopicSinkFactory.class);
123
124     /**
125      * UEB Topic Name Index
126      */
127     protected HashMap<String, UebTopicSink> uebTopicSinks = new HashMap<>();
128
129     @Override
130     public UebTopicSink build(List<String> servers, String topic, String apiKey, String apiSecret, String partitionKey,
131             boolean managed, boolean useHttps, boolean allowSelfSignedCerts) {
132
133         if (servers == null || servers.isEmpty()) {
134             throw new IllegalArgumentException("UEB Server(s) must be provided");
135         }
136
137         if (topic == null || topic.isEmpty()) {
138             throw new IllegalArgumentException(MISSING_TOPIC);
139         }
140
141         synchronized (this) {
142             if (uebTopicSinks.containsKey(topic)) {
143                 return uebTopicSinks.get(topic);
144             }
145
146             UebTopicSink uebTopicWriter = new InlineUebTopicSink(BusTopicParams.builder()
147                     .servers(servers)
148                     .topic(topic)
149                     .apiKey(apiKey)
150                     .apiSecret(apiSecret)
151                     .partitionId(partitionKey)
152                     .useHttps(useHttps)
153                     .allowSelfSignedCerts(allowSelfSignedCerts)
154                     .build());
155
156             if (managed) {
157                 uebTopicSinks.put(topic, uebTopicWriter);
158             }
159
160             return uebTopicWriter;
161         }
162     }
163
164
165     @Override
166     public UebTopicSink build(List<String> servers, String topic) {
167         return this.build(servers, topic, null, null, null, true, false, false);
168     }
169
170
171     @Override
172     public List<UebTopicSink> build(Properties properties) {
173
174         String writeTopics = properties.getProperty(PolicyEndPointProperties.PROPERTY_UEB_SINK_TOPICS);
175         if (writeTopics == null || writeTopics.isEmpty()) {
176             logger.info("{}: no topic for UEB Sink", this);
177             return new ArrayList<>();
178         }
179
180         List<String> writeTopicList = new ArrayList<>(Arrays.asList(writeTopics.split("\\s*,\\s*")));
181         List<UebTopicSink> newUebTopicSinks = new ArrayList<>();
182         synchronized (this) {
183             for (String topic : writeTopicList) {
184                 if (this.uebTopicSinks.containsKey(topic)) {
185                     newUebTopicSinks.add(this.uebTopicSinks.get(topic));
186                     continue;
187                 }
188
189                 String servers = properties.getProperty(PolicyEndPointProperties.PROPERTY_UEB_SINK_TOPICS + "." + topic
190                         + PolicyEndPointProperties.PROPERTY_TOPIC_SERVERS_SUFFIX);
191                 if (servers == null || servers.isEmpty()) {
192                     logger.error("{}: no UEB servers configured for sink {}", this, topic);
193                     continue;
194                 }
195
196                 List<String> serverList = new ArrayList<>(Arrays.asList(servers.split("\\s*,\\s*")));
197
198                 String apiKey = properties.getProperty(PolicyEndPointProperties.PROPERTY_UEB_SINK_TOPICS + "." + topic
199                         + PolicyEndPointProperties.PROPERTY_TOPIC_API_KEY_SUFFIX);
200                 String apiSecret = properties.getProperty(PolicyEndPointProperties.PROPERTY_UEB_SINK_TOPICS + "."
201                         + topic + PolicyEndPointProperties.PROPERTY_TOPIC_API_SECRET_SUFFIX);
202                 String partitionKey = properties.getProperty(PolicyEndPointProperties.PROPERTY_UEB_SINK_TOPICS + "."
203                         + topic + PolicyEndPointProperties.PROPERTY_TOPIC_SINK_PARTITION_KEY_SUFFIX);
204
205                 String managedString = properties.getProperty(PolicyEndPointProperties.PROPERTY_UEB_SINK_TOPICS + "."
206                         + topic + PolicyEndPointProperties.PROPERTY_MANAGED_SUFFIX);
207                 boolean managed = true;
208                 if (managedString != null && !managedString.isEmpty()) {
209                     managed = Boolean.parseBoolean(managedString);
210                 }
211
212                 String useHttpsString = properties.getProperty(PolicyEndPointProperties.PROPERTY_UEB_SINK_TOPICS + "."
213                         + topic + PolicyEndPointProperties.PROPERTY_HTTP_HTTPS_SUFFIX);
214
215                 // default is to use HTTP if no https property exists
216                 boolean useHttps = false;
217                 if (useHttpsString != null && !useHttpsString.isEmpty()) {
218                     useHttps = Boolean.parseBoolean(useHttpsString);
219                 }
220
221
222                 String allowSelfSignedCertsString =
223                         properties.getProperty(PolicyEndPointProperties.PROPERTY_UEB_SINK_TOPICS + "." + topic
224                                 + PolicyEndPointProperties.PROPERTY_ALLOW_SELF_SIGNED_CERTIFICATES_SUFFIX);
225
226                 // default is to disallow self-signed certs
227                 boolean allowSelfSignedCerts = false;
228                 if (allowSelfSignedCertsString != null && !allowSelfSignedCertsString.isEmpty()) {
229                     allowSelfSignedCerts = Boolean.parseBoolean(allowSelfSignedCertsString);
230                 }
231
232                 UebTopicSink uebTopicWriter = this.build(serverList, topic, apiKey, apiSecret, partitionKey, managed,
233                         useHttps, allowSelfSignedCerts);
234                 newUebTopicSinks.add(uebTopicWriter);
235             }
236             return newUebTopicSinks;
237         }
238     }
239
240     @Override
241     public void destroy(String topic) {
242
243         if (topic == null || topic.isEmpty()) {
244             throw new IllegalArgumentException(MISSING_TOPIC);
245         }
246
247         UebTopicSink uebTopicWriter;
248         synchronized (this) {
249             if (!uebTopicSinks.containsKey(topic)) {
250                 return;
251             }
252
253             uebTopicWriter = uebTopicSinks.remove(topic);
254         }
255
256         uebTopicWriter.shutdown();
257     }
258
259     @Override
260     public void destroy() {
261         List<UebTopicSink> writers = this.inventory();
262         for (UebTopicSink writer : writers) {
263             writer.shutdown();
264         }
265
266         synchronized (this) {
267             this.uebTopicSinks.clear();
268         }
269     }
270
271     @Override
272     public UebTopicSink get(String topic) {
273
274         if (topic == null || topic.isEmpty()) {
275             throw new IllegalArgumentException(MISSING_TOPIC);
276         }
277
278         synchronized (this) {
279             if (uebTopicSinks.containsKey(topic)) {
280                 return uebTopicSinks.get(topic);
281             } else {
282                 throw new IllegalStateException("UebTopicSink for " + topic + " not found");
283             }
284         }
285     }
286
287     @Override
288     public synchronized List<UebTopicSink> inventory() {
289         return new ArrayList<>(this.uebTopicSinks.values());
290     }
291
292
293     @Override
294     public String toString() {
295         StringBuilder builder = new StringBuilder();
296         builder.append("IndexedUebTopicSinkFactory []");
297         return builder.toString();
298     }
299
300 }