e252988ead0f897c50e06b5aef888828f9b9a811
[policy/common.git] /
1 /*
2  * ============LICENSE_START=======================================================
3  * policy-endpoints
4  * ================================================================================
5  * Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved.
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  * 
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  * 
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=========================================================
19  */
20
21 package org.onap.policy.common.endpoints.event.comm.bus.impl;
22
23 import java.util.ArrayList;
24 import java.util.Arrays;
25 import java.util.HashMap;
26 import java.util.List;
27 import java.util.Map;
28 import java.util.Properties;
29
30 import org.onap.policy.common.endpoints.event.comm.bus.DmaapTopicSink;
31 import org.onap.policy.common.endpoints.event.comm.bus.DmaapTopicSinkFactory;
32 import org.onap.policy.common.endpoints.event.comm.bus.internal.InlineDmaapTopicSink;
33 import org.onap.policy.common.endpoints.properties.PolicyEndPointProperties;
34 import org.slf4j.Logger;
35 import org.slf4j.LoggerFactory;
36
37 /**
38  * Factory of DMAAP Reader Topics indexed by topic name
39  */
40 public class IndexedDmaapTopicSinkFactory implements DmaapTopicSinkFactory {
41     private static final String MISSING_TOPIC = "A topic must be provided";
42
43     private static final IndexedDmaapTopicSinkFactory instance = new IndexedDmaapTopicSinkFactory();
44     /**
45      * Logger
46      */
47     private static Logger logger = LoggerFactory.getLogger(IndexedDmaapTopicSinkFactory.class);
48
49     /**
50      * DMAAP Topic Name Index
51      */
52     protected HashMap<String, DmaapTopicSink> dmaapTopicWriters = new HashMap<>();
53
54     /**
55      * Get the singleton instance.
56      * 
57      * @return the instance
58      */
59     public static IndexedDmaapTopicSinkFactory getInstance() {
60         return instance;
61     }
62
63     private IndexedDmaapTopicSinkFactory() {}
64
65     @Override
66     public DmaapTopicSink build(List<String> servers, String topic, String apiKey, String apiSecret, String userName,
67             String password, String partitionKey, String environment, String aftEnvironment, String partner,
68             String latitude, String longitude, Map<String, String> additionalProps, boolean managed, boolean useHttps,
69             boolean allowSelfSignedCerts) {
70
71         if (topic == null || topic.isEmpty()) {
72             throw new IllegalArgumentException(MISSING_TOPIC);
73         }
74
75         synchronized (this) {
76             if (dmaapTopicWriters.containsKey(topic)) {
77                 return dmaapTopicWriters.get(topic);
78             }
79
80             DmaapTopicSink dmaapTopicSink = new InlineDmaapTopicSink(servers, topic, apiKey, apiSecret, userName,
81                     password, partitionKey, environment, aftEnvironment, partner, latitude, longitude, additionalProps,
82                     useHttps, allowSelfSignedCerts);
83
84             if (managed) {
85                 dmaapTopicWriters.put(topic, dmaapTopicSink);
86             }
87             return dmaapTopicSink;
88         }
89     }
90
91     @Override
92     public DmaapTopicSink build(List<String> servers, String topic, String apiKey, String apiSecret, String userName,
93             String password, String partitionKey, boolean managed, boolean useHttps, boolean allowSelfSignedCerts) {
94
95         if (topic == null || topic.isEmpty()) {
96             throw new IllegalArgumentException(MISSING_TOPIC);
97         }
98
99         synchronized (this) {
100             if (dmaapTopicWriters.containsKey(topic)) {
101                 return dmaapTopicWriters.get(topic);
102             }
103
104             DmaapTopicSink dmaapTopicSink = new InlineDmaapTopicSink(servers, topic, apiKey, apiSecret, userName,
105                     password, partitionKey, useHttps, allowSelfSignedCerts);
106
107             if (managed) {
108                 dmaapTopicWriters.put(topic, dmaapTopicSink);
109             }
110             return dmaapTopicSink;
111         }
112     }
113
114     @Override
115     public DmaapTopicSink build(List<String> servers, String topic) {
116         return this.build(servers, topic, null, null, null, null, null, true, false, false);
117     }
118
119     @Override
120     public List<DmaapTopicSink> build(Properties properties) {
121
122         String writeTopics = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS);
123         if (writeTopics == null || writeTopics.isEmpty()) {
124             logger.info("{}: no topic for DMaaP Sink", this);
125             return new ArrayList<>();
126         }
127
128         List<String> writeTopicList = new ArrayList<>(Arrays.asList(writeTopics.split("\\s*,\\s*")));
129         List<DmaapTopicSink> newDmaapTopicSinks = new ArrayList<>();
130         synchronized (this) {
131             for (String topic : writeTopicList) {
132                 if (this.dmaapTopicWriters.containsKey(topic)) {
133                     newDmaapTopicSinks.add(this.dmaapTopicWriters.get(topic));
134                     continue;
135                 }
136                 String servers = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS + "."
137                         + topic + PolicyEndPointProperties.PROPERTY_TOPIC_SERVERS_SUFFIX);
138
139                 List<String> serverList;
140                 if (servers != null && !servers.isEmpty()) {
141                     serverList = new ArrayList<>(Arrays.asList(servers.split("\\s*,\\s*")));
142                 } else {
143                     serverList = new ArrayList<>();
144                 }
145
146                 String apiKey = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS + "." + topic
147                         + PolicyEndPointProperties.PROPERTY_TOPIC_API_KEY_SUFFIX);
148                 String apiSecret = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS + "."
149                         + topic + PolicyEndPointProperties.PROPERTY_TOPIC_API_SECRET_SUFFIX);
150
151                 String aafMechId = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS + "."
152                         + topic + PolicyEndPointProperties.PROPERTY_TOPIC_AAF_MECHID_SUFFIX);
153                 String aafPassword = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS + "."
154                         + topic + PolicyEndPointProperties.PROPERTY_TOPIC_AAF_PASSWORD_SUFFIX);
155
156                 String partitionKey = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS + "."
157                         + topic + PolicyEndPointProperties.PROPERTY_TOPIC_SINK_PARTITION_KEY_SUFFIX);
158
159                 String managedString = properties.getProperty(PolicyEndPointProperties.PROPERTY_UEB_SINK_TOPICS + "."
160                         + topic + PolicyEndPointProperties.PROPERTY_MANAGED_SUFFIX);
161
162                 /* DME2 Properties */
163
164                 String dme2Environment = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS
165                         + "." + topic + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_ENVIRONMENT_SUFFIX);
166
167                 String dme2AftEnvironment = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS
168                         + "." + topic + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_AFT_ENVIRONMENT_SUFFIX);
169
170                 String dme2Partner = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS + "."
171                         + topic + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_PARTNER_SUFFIX);
172
173                 String dme2RouteOffer = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS + "."
174                         + topic + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_ROUTE_OFFER_SUFFIX);
175
176                 String dme2Latitude = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS + "."
177                         + topic + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_LATITUDE_SUFFIX);
178
179                 String dme2Longitude = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS + "."
180                         + topic + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_LONGITUDE_SUFFIX);
181
182                 String dme2EpReadTimeoutMs = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS
183                         + "." + topic + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_EP_READ_TIMEOUT_MS_SUFFIX);
184
185                 String dme2EpConnTimeout = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS
186                         + "." + topic + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_EP_CONN_TIMEOUT_SUFFIX);
187
188                 String dme2RoundtripTimeoutMs =
189                         properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS + "." + topic
190                                 + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_ROUNDTRIP_TIMEOUT_MS_SUFFIX);
191
192                 String dme2Version = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS + "."
193                         + topic + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_VERSION_SUFFIX);
194
195                 String dme2SubContextPath = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS
196                         + "." + topic + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_SUB_CONTEXT_PATH_SUFFIX);
197
198                 String dme2SessionStickinessRequired =
199                         properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS + "." + topic
200                                 + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_SESSION_STICKINESS_REQUIRED_SUFFIX);
201
202                 Map<String, String> dme2AdditionalProps = new HashMap<>();
203
204                 if (dme2EpReadTimeoutMs != null && !dme2EpReadTimeoutMs.isEmpty()) {
205                     dme2AdditionalProps.put(DME2_READ_TIMEOUT_PROPERTY, dme2EpReadTimeoutMs);
206                 }
207                 if (dme2EpConnTimeout != null && !dme2EpConnTimeout.isEmpty()) {
208                     dme2AdditionalProps.put(DME2_EP_CONN_TIMEOUT_PROPERTY, dme2EpConnTimeout);
209                 }
210                 if (dme2RoundtripTimeoutMs != null && !dme2RoundtripTimeoutMs.isEmpty()) {
211                     dme2AdditionalProps.put(DME2_ROUNDTRIP_TIMEOUT_PROPERTY, dme2RoundtripTimeoutMs);
212                 }
213                 if (dme2Version != null && !dme2Version.isEmpty()) {
214                     dme2AdditionalProps.put(DME2_VERSION_PROPERTY, dme2Version);
215                 }
216                 if (dme2RouteOffer != null && !dme2RouteOffer.isEmpty()) {
217                     dme2AdditionalProps.put(DME2_ROUTE_OFFER_PROPERTY, dme2RouteOffer);
218                 }
219                 if (dme2SubContextPath != null && !dme2SubContextPath.isEmpty()) {
220                     dme2AdditionalProps.put(DME2_SUBCONTEXT_PATH_PROPERTY, dme2SubContextPath);
221                 }
222                 if (dme2SessionStickinessRequired != null && !dme2SessionStickinessRequired.isEmpty()) {
223                     dme2AdditionalProps.put(DME2_SESSION_STICKINESS_REQUIRED_PROPERTY, dme2SessionStickinessRequired);
224                 }
225
226                 if (servers == null || servers.isEmpty()) {
227                     logger.error("{}: no DMaaP servers or DME2 ServiceName provided", this);
228                     continue;
229                 }
230
231                 boolean managed = true;
232                 if (managedString != null && !managedString.isEmpty()) {
233                     managed = Boolean.parseBoolean(managedString);
234                 }
235
236                 String useHttpsString = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS + "."
237                         + topic + PolicyEndPointProperties.PROPERTY_HTTP_HTTPS_SUFFIX);
238
239                 // default is to use HTTP if no https property exists
240                 boolean useHttps = false;
241                 if (useHttpsString != null && !useHttpsString.isEmpty()) {
242                     useHttps = Boolean.parseBoolean(useHttpsString);
243                 }
244
245
246                 String allowSelfSignedCertsString =
247                         properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS + "." + topic
248                                 + PolicyEndPointProperties.PROPERTY_ALLOW_SELF_SIGNED_CERTIFICATES_SUFFIX);
249
250                 // default is to disallow self-signed certs
251                 boolean allowSelfSignedCerts = false;
252                 if (allowSelfSignedCertsString != null && !allowSelfSignedCertsString.isEmpty()) {
253                     allowSelfSignedCerts = Boolean.parseBoolean(allowSelfSignedCertsString);
254                 }
255
256                 DmaapTopicSink dmaapTopicSink = this.build(serverList, topic, apiKey, apiSecret, aafMechId, aafPassword,
257                         partitionKey, dme2Environment, dme2AftEnvironment, dme2Partner, dme2Latitude, dme2Longitude,
258                         dme2AdditionalProps, managed, useHttps, allowSelfSignedCerts);
259
260                 newDmaapTopicSinks.add(dmaapTopicSink);
261             }
262             return newDmaapTopicSinks;
263         }
264     }
265
266     @Override
267     public void destroy(String topic) {
268
269         if (topic == null || topic.isEmpty()) {
270             throw new IllegalArgumentException(MISSING_TOPIC);
271         }
272
273         DmaapTopicSink dmaapTopicWriter;
274         synchronized (this) {
275             if (!dmaapTopicWriters.containsKey(topic)) {
276                 return;
277             }
278
279             dmaapTopicWriter = dmaapTopicWriters.remove(topic);
280         }
281
282         dmaapTopicWriter.shutdown();
283     }
284
285     @Override
286     public void destroy() {
287         List<DmaapTopicSink> writers = this.inventory();
288         for (DmaapTopicSink writer : writers) {
289             writer.shutdown();
290         }
291
292         synchronized (this) {
293             this.dmaapTopicWriters.clear();
294         }
295     }
296
297     @Override
298     public DmaapTopicSink get(String topic) {
299
300         if (topic == null || topic.isEmpty()) {
301             throw new IllegalArgumentException(MISSING_TOPIC);
302         }
303
304         synchronized (this) {
305             if (dmaapTopicWriters.containsKey(topic)) {
306                 return dmaapTopicWriters.get(topic);
307             } else {
308                 throw new IllegalStateException("DmaapTopicSink for " + topic + " not found");
309             }
310         }
311     }
312
313     @Override
314     public synchronized List<DmaapTopicSink> inventory() {
315         return new ArrayList<>(this.dmaapTopicWriters.values());
316     }
317
318     @Override
319     public String toString() {
320         StringBuilder builder = new StringBuilder();
321         builder.append("IndexedDmaapTopicSinkFactory []");
322         return builder.toString();
323     }
324
325 }