19bdc7b22999da649d95c52b009796c709168038
[policy/common.git] /
1 /*
2  * ============LICENSE_START=======================================================
3  * policy-endpoints
4  * ================================================================================
5  * Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved.
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  * 
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  * 
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=========================================================
19  */
20
21 package org.onap.policy.common.endpoints.event.comm.bus.impl;
22
23 import java.util.ArrayList;
24 import java.util.Arrays;
25 import java.util.HashMap;
26 import java.util.List;
27 import java.util.Properties;
28
29 import org.onap.policy.common.endpoints.event.comm.bus.UebTopicSink;
30 import org.onap.policy.common.endpoints.event.comm.bus.UebTopicSinkFactory;
31 import org.onap.policy.common.endpoints.event.comm.bus.internal.InlineUebTopicSink;
32 import org.onap.policy.common.endpoints.properties.PolicyEndPointProperties;
33 import org.slf4j.Logger;
34 import org.slf4j.LoggerFactory;
35
36 /**
37  * Factory of UEB Reader Topics indexed by topic name
38  */
39 public class IndexedUebTopicSinkFactory implements UebTopicSinkFactory {
40
41     private static final IndexedUebTopicSinkFactory instance = new IndexedUebTopicSinkFactory();
42
43     private static final String MISSING_TOPIC = "A topic must be provided";
44
45     /**
46      * Logger
47      */
48     private static Logger logger = LoggerFactory.getLogger(IndexedUebTopicSinkFactory.class);
49
50     /**
51      * UEB Topic Name Index
52      */
53     protected HashMap<String, UebTopicSink> uebTopicSinks = new HashMap<>();
54
55     /**
56      * Get the singleton instance.
57      * 
58      * @return the instance
59      */
60     public static IndexedUebTopicSinkFactory getInstance() {
61         return instance;
62     }
63
64     private IndexedUebTopicSinkFactory() {}
65
66     @Override
67     public UebTopicSink build(List<String> servers, String topic, String apiKey, String apiSecret, String partitionKey,
68             boolean managed, boolean useHttps, boolean allowSelfSignedCerts) {
69
70         if (servers == null || servers.isEmpty()) {
71             throw new IllegalArgumentException("UEB Server(s) must be provided");
72         }
73
74         if (topic == null || topic.isEmpty()) {
75             throw new IllegalArgumentException(MISSING_TOPIC);
76         }
77
78         synchronized (this) {
79             if (uebTopicSinks.containsKey(topic)) {
80                 return uebTopicSinks.get(topic);
81             }
82
83             UebTopicSink uebTopicWriter = new InlineUebTopicSink(servers, topic, apiKey, apiSecret, partitionKey,
84                     useHttps, allowSelfSignedCerts);
85
86             if (managed) {
87                 uebTopicSinks.put(topic, uebTopicWriter);
88             }
89
90             return uebTopicWriter;
91         }
92     }
93
94
95     @Override
96     public UebTopicSink build(List<String> servers, String topic) {
97         return this.build(servers, topic, null, null, null, true, false, false);
98     }
99
100
101     @Override
102     public List<UebTopicSink> build(Properties properties) {
103
104         String writeTopics = properties.getProperty(PolicyEndPointProperties.PROPERTY_UEB_SINK_TOPICS);
105         if (writeTopics == null || writeTopics.isEmpty()) {
106             logger.info("{}: no topic for UEB Sink", this);
107             return new ArrayList<>();
108         }
109
110         List<String> writeTopicList = new ArrayList<>(Arrays.asList(writeTopics.split("\\s*,\\s*")));
111         List<UebTopicSink> newUebTopicSinks = new ArrayList<>();
112         synchronized (this) {
113             for (String topic : writeTopicList) {
114                 if (this.uebTopicSinks.containsKey(topic)) {
115                     newUebTopicSinks.add(this.uebTopicSinks.get(topic));
116                     continue;
117                 }
118
119                 String servers = properties.getProperty(PolicyEndPointProperties.PROPERTY_UEB_SINK_TOPICS + "." + topic
120                         + PolicyEndPointProperties.PROPERTY_TOPIC_SERVERS_SUFFIX);
121                 if (servers == null || servers.isEmpty()) {
122                     logger.error("{}: no UEB servers configured for sink {}", this, topic);
123                     continue;
124                 }
125
126                 List<String> serverList = new ArrayList<>(Arrays.asList(servers.split("\\s*,\\s*")));
127
128                 String apiKey = properties.getProperty(PolicyEndPointProperties.PROPERTY_UEB_SINK_TOPICS + "." + topic
129                         + PolicyEndPointProperties.PROPERTY_TOPIC_API_KEY_SUFFIX);
130                 String apiSecret = properties.getProperty(PolicyEndPointProperties.PROPERTY_UEB_SINK_TOPICS + "." + topic
131                         + PolicyEndPointProperties.PROPERTY_TOPIC_API_SECRET_SUFFIX);
132                 String partitionKey = properties.getProperty(PolicyEndPointProperties.PROPERTY_UEB_SINK_TOPICS + "." + topic
133                         + PolicyEndPointProperties.PROPERTY_TOPIC_SINK_PARTITION_KEY_SUFFIX);
134
135                 String managedString = properties.getProperty(PolicyEndPointProperties.PROPERTY_UEB_SINK_TOPICS + "." + topic
136                         + PolicyEndPointProperties.PROPERTY_MANAGED_SUFFIX);
137                 boolean managed = true;
138                 if (managedString != null && !managedString.isEmpty()) {
139                     managed = Boolean.parseBoolean(managedString);
140                 }
141
142                 String useHttpsString = properties.getProperty(PolicyEndPointProperties.PROPERTY_UEB_SINK_TOPICS + "." + topic
143                         + PolicyEndPointProperties.PROPERTY_HTTP_HTTPS_SUFFIX);
144
145                 // default is to use HTTP if no https property exists
146                 boolean useHttps = false;
147                 if (useHttpsString != null && !useHttpsString.isEmpty()) {
148                     useHttps = Boolean.parseBoolean(useHttpsString);
149                 }
150
151
152                 String allowSelfSignedCertsString = properties.getProperty(PolicyEndPointProperties.PROPERTY_UEB_SINK_TOPICS
153                         + "." + topic + PolicyEndPointProperties.PROPERTY_ALLOW_SELF_SIGNED_CERTIFICATES_SUFFIX);
154
155                 // default is to disallow self-signed certs
156                 boolean allowSelfSignedCerts = false;
157                 if (allowSelfSignedCertsString != null && !allowSelfSignedCertsString.isEmpty()) {
158                     allowSelfSignedCerts = Boolean.parseBoolean(allowSelfSignedCertsString);
159                 }
160
161                 UebTopicSink uebTopicWriter = this.build(serverList, topic, apiKey, apiSecret, partitionKey, managed,
162                         useHttps, allowSelfSignedCerts);
163                 newUebTopicSinks.add(uebTopicWriter);
164             }
165             return newUebTopicSinks;
166         }
167     }
168
169     @Override
170     public void destroy(String topic) {
171
172         if (topic == null || topic.isEmpty()) {
173             throw new IllegalArgumentException(MISSING_TOPIC);
174         }
175
176         UebTopicSink uebTopicWriter;
177         synchronized (this) {
178             if (!uebTopicSinks.containsKey(topic)) {
179                 return;
180             }
181
182             uebTopicWriter = uebTopicSinks.remove(topic);
183         }
184
185         uebTopicWriter.shutdown();
186     }
187
188     @Override
189     public void destroy() {
190         List<UebTopicSink> writers = this.inventory();
191         for (UebTopicSink writer : writers) {
192             writer.shutdown();
193         }
194
195         synchronized (this) {
196             this.uebTopicSinks.clear();
197         }
198     }
199
200     @Override
201     public UebTopicSink get(String topic) {
202
203         if (topic == null || topic.isEmpty()) {
204             throw new IllegalArgumentException(MISSING_TOPIC);
205         }
206
207         synchronized (this) {
208             if (uebTopicSinks.containsKey(topic)) {
209                 return uebTopicSinks.get(topic);
210             } else {
211                 throw new IllegalStateException("UebTopicSink for " + topic + " not found");
212             }
213         }
214     }
215
216     @Override
217     public synchronized List<UebTopicSink> inventory() {
218         return new ArrayList<>(this.uebTopicSinks.values());
219     }
220
221
222     @Override
223     public String toString() {
224         StringBuilder builder = new StringBuilder();
225         builder.append("IndexedUebTopicSinkFactory []");
226         return builder.toString();
227     }
228
229 }