e79d488841626551ae943811018b0efd655239a6
[policy/common.git] /
1 /*
2  * ============LICENSE_START=======================================================
3  * policy-endpoints
4  * ================================================================================
5  * Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved.
6  * Modified Copyright (C) 2018 Samsung Electronics Co., Ltd.
7  * ================================================================================
8  * Licensed under the Apache License, Version 2.0 (the "License");
9  * you may not use this file except in compliance with the License.
10  * You may obtain a copy of the License at
11  *
12  *      http://www.apache.org/licenses/LICENSE-2.0
13  *
14  * Unless required by applicable law or agreed to in writing, software
15  * distributed under the License is distributed on an "AS IS" BASIS,
16  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17  * See the License for the specific language governing permissions and
18  * limitations under the License.
19  * ============LICENSE_END=========================================================
20  */
21
22 package org.onap.policy.common.endpoints.event.comm.bus;
23
24 import java.util.ArrayList;
25 import java.util.Arrays;
26 import java.util.HashMap;
27 import java.util.List;
28 import java.util.Map;
29 import java.util.Properties;
30
31 import org.onap.policy.common.endpoints.event.comm.bus.internal.BusTopicParams;
32 import org.onap.policy.common.endpoints.event.comm.bus.internal.InlineDmaapTopicSink;
33 import org.onap.policy.common.endpoints.properties.PolicyEndPointProperties;
34 import org.slf4j.Logger;
35 import org.slf4j.LoggerFactory;
36
37 /**
38  * DMAAP Topic Sink Factory.
39  */
40 public interface DmaapTopicSinkFactory {
41     String DME2_READ_TIMEOUT_PROPERTY = "AFT_DME2_EP_READ_TIMEOUT_MS";
42     String DME2_EP_CONN_TIMEOUT_PROPERTY = "AFT_DME2_EP_CONN_TIMEOUT";
43     String DME2_ROUNDTRIP_TIMEOUT_PROPERTY = "AFT_DME2_ROUNDTRIP_TIMEOUT_MS";
44     String DME2_VERSION_PROPERTY = "Version";
45     String DME2_ROUTE_OFFER_PROPERTY = "routeOffer";
46     String DME2_SERVICE_NAME_PROPERTY = "ServiceName";
47     String DME2_SUBCONTEXT_PATH_PROPERTY = "SubContextPath";
48     String DME2_SESSION_STICKINESS_REQUIRED_PROPERTY = "sessionstickinessrequired";
49
50     /**
51      * Instantiate a new DMAAP Topic Sink, with following params.
52      * servers         list of servers
53      * topic           topic name
54      * apiKey          API Key
55      * apiSecret       API Secret
56      * userName        AAF user name
57      * password        AAF password
58      * partitionKey    Consumer Group
59      * environment     DME2 environment
60      * aftEnvironment  DME2 AFT environment
61      * partner         DME2 Partner
62      * latitude        DME2 latitude
63      * longitude       DME2 longitude
64      * additionalProps additional properties to pass to DME2
65      * managed         is this sink endpoint managed?
66      * @param busTopicParams parameter object
67      * @return DmaapTopicSink object
68      * @throws IllegalArgumentException if invalid parameters are present
69      */
70     DmaapTopicSink build(BusTopicParams busTopicParams);
71
72     /**
73      * Creates an DMAAP Topic Sink based on properties files.
74      *
75      * @param properties Properties containing initialization values
76      * @return an DMAAP Topic Sink
77      * @throws IllegalArgumentException if invalid parameters are present
78      */
79     List<DmaapTopicSink> build(Properties properties);
80
81     /**
82      * Instantiates a new DMAAP Topic Sink.
83      *
84      * @param servers list of servers
85      * @param topic   topic name
86      * @return an DMAAP Topic Sink
87      * @throws IllegalArgumentException if invalid parameters are present
88      */
89     DmaapTopicSink build(List<String> servers, String topic);
90
91     /**
92      * Destroys an DMAAP Topic Sink based on a topic.
93      *
94      * @param topic topic name
95      * @throws IllegalArgumentException if invalid parameters are present
96      */
97     void destroy(String topic);
98
99     /**
100      * Destroys all DMAAP Topic Sinks.
101      */
102     void destroy();
103
104     /**
105      * Gets an DMAAP Topic Sink based on topic name.
106      *
107      * @param topic the topic name
108      * @return an DMAAP Topic Sink with topic name
109      * @throws IllegalArgumentException if an invalid topic is provided
110      * @throws IllegalStateException    if the DMAAP Topic Reader is an incorrect state
111      */
112     DmaapTopicSink get(String topic);
113
114     /**
115      * Provides a snapshot of the DMAAP Topic Sinks.
116      *
117      * @return a list of the DMAAP Topic Sinks
118      */
119     List<DmaapTopicSink> inventory();
120 }
121
122
123 /* ------------- implementation ----------------- */
124
125 /**
126  * Factory of DMAAP Reader Topics indexed by topic name.
127  */
128 class IndexedDmaapTopicSinkFactory implements DmaapTopicSinkFactory {
129     private static final String MISSING_TOPIC = "A topic must be provided";
130
131     /**
132      * Logger.
133      */
134     private static Logger logger = LoggerFactory.getLogger(IndexedDmaapTopicSinkFactory.class);
135
136     /**
137      * DMAAP Topic Name Index.
138      */
139     protected HashMap<String, DmaapTopicSink> dmaapTopicWriters = new HashMap<>();
140
141     @Override
142     public DmaapTopicSink build(BusTopicParams busTopicParams){
143
144         if (busTopicParams.getTopic() == null || busTopicParams.getTopic().isEmpty()) {
145             throw new IllegalArgumentException(MISSING_TOPIC);
146         }
147
148         synchronized (this) {
149             if (dmaapTopicWriters.containsKey(busTopicParams.getTopic())) {
150                 return dmaapTopicWriters.get(busTopicParams.getTopic());
151             }
152
153             DmaapTopicSink dmaapTopicSink = new InlineDmaapTopicSink(busTopicParams);
154
155             if (busTopicParams.isManaged()) {
156                 dmaapTopicWriters.put(busTopicParams.getTopic(), dmaapTopicSink);
157             }
158             return dmaapTopicSink;
159         }
160     }
161
162     @Override
163     public DmaapTopicSink build(List<String> servers, String topic) {
164         return this.build(BusTopicParams.builder()
165                 .servers(servers)
166                 .topic(topic)
167                 .managed(true)
168                 .useHttps(false)
169                 .allowSelfSignedCerts(false)
170                 .build());
171     }
172
173     @Override
174     public List<DmaapTopicSink> build(Properties properties) {
175
176         String writeTopics = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS);
177         if (writeTopics == null || writeTopics.isEmpty()) {
178             logger.info("{}: no topic for DMaaP Sink", this);
179             return new ArrayList<>();
180         }
181
182         List<String> writeTopicList = new ArrayList<>(Arrays.asList(writeTopics.split("\\s*,\\s*")));
183         List<DmaapTopicSink> newDmaapTopicSinks = new ArrayList<>();
184         synchronized (this) {
185             for (String topic : writeTopicList) {
186                 if (this.dmaapTopicWriters.containsKey(topic)) {
187                     newDmaapTopicSinks.add(this.dmaapTopicWriters.get(topic));
188                     continue;
189                 }
190                 String servers = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS + "."
191                         + topic + PolicyEndPointProperties.PROPERTY_TOPIC_SERVERS_SUFFIX);
192
193                 List<String> serverList;
194                 if (servers != null && !servers.isEmpty()) {
195                     serverList = new ArrayList<>(Arrays.asList(servers.split("\\s*,\\s*")));
196                 } else {
197                     serverList = new ArrayList<>();
198                 }
199
200                 String apiKey = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS + "." + topic
201                         + PolicyEndPointProperties.PROPERTY_TOPIC_API_KEY_SUFFIX);
202                 String apiSecret = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS + "."
203                         + topic + PolicyEndPointProperties.PROPERTY_TOPIC_API_SECRET_SUFFIX);
204
205                 String aafMechId = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS + "."
206                         + topic + PolicyEndPointProperties.PROPERTY_TOPIC_AAF_MECHID_SUFFIX);
207                 String aafPassword = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS + "."
208                         + topic + PolicyEndPointProperties.PROPERTY_TOPIC_AAF_PASSWORD_SUFFIX);
209
210                 String partitionKey = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS + "."
211                         + topic + PolicyEndPointProperties.PROPERTY_TOPIC_SINK_PARTITION_KEY_SUFFIX);
212
213                 String managedString = properties.getProperty(PolicyEndPointProperties.PROPERTY_UEB_SINK_TOPICS + "."
214                         + topic + PolicyEndPointProperties.PROPERTY_MANAGED_SUFFIX);
215
216                 /* DME2 Properties */
217
218                 String dme2Environment = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS
219                         + "." + topic + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_ENVIRONMENT_SUFFIX);
220
221                 String dme2AftEnvironment = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS
222                         + "." + topic + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_AFT_ENVIRONMENT_SUFFIX);
223
224                 String dme2Partner = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS + "."
225                         + topic + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_PARTNER_SUFFIX);
226
227                 String dme2RouteOffer = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS + "."
228                         + topic + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_ROUTE_OFFER_SUFFIX);
229
230                 String dme2Latitude = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS + "."
231                         + topic + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_LATITUDE_SUFFIX);
232
233                 String dme2Longitude = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS + "."
234                         + topic + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_LONGITUDE_SUFFIX);
235
236                 String dme2EpReadTimeoutMs = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS
237                         + "." + topic + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_EP_READ_TIMEOUT_MS_SUFFIX);
238
239                 String dme2EpConnTimeout = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS
240                         + "." + topic + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_EP_CONN_TIMEOUT_SUFFIX);
241
242                 String dme2RoundtripTimeoutMs =
243                         properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS + "." + topic
244                                 + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_ROUNDTRIP_TIMEOUT_MS_SUFFIX);
245
246                 String dme2Version = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS + "."
247                         + topic + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_VERSION_SUFFIX);
248
249                 String dme2SubContextPath = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS
250                         + "." + topic + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_SUB_CONTEXT_PATH_SUFFIX);
251
252                 String dme2SessionStickinessRequired =
253                         properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS + "." + topic
254                                 + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_SESSION_STICKINESS_REQUIRED_SUFFIX);
255
256                 Map<String, String> dme2AdditionalProps = new HashMap<>();
257
258                 if (dme2EpReadTimeoutMs != null && !dme2EpReadTimeoutMs.isEmpty()) {
259                     dme2AdditionalProps.put(DME2_READ_TIMEOUT_PROPERTY, dme2EpReadTimeoutMs);
260                 }
261                 if (dme2EpConnTimeout != null && !dme2EpConnTimeout.isEmpty()) {
262                     dme2AdditionalProps.put(DME2_EP_CONN_TIMEOUT_PROPERTY, dme2EpConnTimeout);
263                 }
264                 if (dme2RoundtripTimeoutMs != null && !dme2RoundtripTimeoutMs.isEmpty()) {
265                     dme2AdditionalProps.put(DME2_ROUNDTRIP_TIMEOUT_PROPERTY, dme2RoundtripTimeoutMs);
266                 }
267                 if (dme2Version != null && !dme2Version.isEmpty()) {
268                     dme2AdditionalProps.put(DME2_VERSION_PROPERTY, dme2Version);
269                 }
270                 if (dme2RouteOffer != null && !dme2RouteOffer.isEmpty()) {
271                     dme2AdditionalProps.put(DME2_ROUTE_OFFER_PROPERTY, dme2RouteOffer);
272                 }
273                 if (dme2SubContextPath != null && !dme2SubContextPath.isEmpty()) {
274                     dme2AdditionalProps.put(DME2_SUBCONTEXT_PATH_PROPERTY, dme2SubContextPath);
275                 }
276                 if (dme2SessionStickinessRequired != null && !dme2SessionStickinessRequired.isEmpty()) {
277                     dme2AdditionalProps.put(DME2_SESSION_STICKINESS_REQUIRED_PROPERTY, dme2SessionStickinessRequired);
278                 }
279
280                 if (servers == null || servers.isEmpty()) {
281                     logger.error("{}: no DMaaP servers or DME2 ServiceName provided", this);
282                     continue;
283                 }
284
285                 boolean managed = true;
286                 if (managedString != null && !managedString.isEmpty()) {
287                     managed = Boolean.parseBoolean(managedString);
288                 }
289
290                 String useHttpsString = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS + "."
291                         + topic + PolicyEndPointProperties.PROPERTY_HTTP_HTTPS_SUFFIX);
292
293                 // default is to use HTTP if no https property exists
294                 boolean useHttps = false;
295                 if (useHttpsString != null && !useHttpsString.isEmpty()) {
296                     useHttps = Boolean.parseBoolean(useHttpsString);
297                 }
298
299
300                 String allowSelfSignedCertsString =
301                         properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS + "." + topic
302                                 + PolicyEndPointProperties.PROPERTY_ALLOW_SELF_SIGNED_CERTIFICATES_SUFFIX);
303
304                 // default is to disallow self-signed certs
305                 boolean allowSelfSignedCerts = false;
306                 if (allowSelfSignedCertsString != null && !allowSelfSignedCertsString.isEmpty()) {
307                     allowSelfSignedCerts = Boolean.parseBoolean(allowSelfSignedCertsString);
308                 }
309
310                 DmaapTopicSink dmaapTopicSink = this.build(BusTopicParams.builder()
311                         .servers(serverList)
312                         .topic(topic)
313                         .apiKey(apiKey)
314                         .apiSecret(apiSecret)
315                         .userName(aafMechId)
316                         .password(aafPassword)
317                         .partitionId(partitionKey)
318                         .environment(dme2Environment)
319                         .aftEnvironment(dme2AftEnvironment)
320                         .partner(dme2Partner)
321                         .latitude(dme2Latitude)
322                         .longitude(dme2Longitude)
323                         .additionalProps(dme2AdditionalProps)
324                         .managed(managed)
325                         .useHttps(useHttps)
326                         .allowSelfSignedCerts(allowSelfSignedCerts)
327                         .build());
328
329                 newDmaapTopicSinks.add(dmaapTopicSink);
330             }
331             return newDmaapTopicSinks;
332         }
333     }
334
335     @Override
336     public void destroy(String topic) {
337
338         if (topic == null || topic.isEmpty()) {
339             throw new IllegalArgumentException(MISSING_TOPIC);
340         }
341
342         DmaapTopicSink dmaapTopicWriter;
343         synchronized (this) {
344             if (!dmaapTopicWriters.containsKey(topic)) {
345                 return;
346             }
347
348             dmaapTopicWriter = dmaapTopicWriters.remove(topic);
349         }
350
351         dmaapTopicWriter.shutdown();
352     }
353
354     @Override
355     public void destroy() {
356         List<DmaapTopicSink> writers = this.inventory();
357         for (DmaapTopicSink writer : writers) {
358             writer.shutdown();
359         }
360
361         synchronized (this) {
362             this.dmaapTopicWriters.clear();
363         }
364     }
365
366     @Override
367     public DmaapTopicSink get(String topic) {
368
369         if (topic == null || topic.isEmpty()) {
370             throw new IllegalArgumentException(MISSING_TOPIC);
371         }
372
373         synchronized (this) {
374             if (dmaapTopicWriters.containsKey(topic)) {
375                 return dmaapTopicWriters.get(topic);
376             } else {
377                 throw new IllegalStateException("DmaapTopicSink for " + topic + " not found");
378             }
379         }
380     }
381
382     @Override
383     public synchronized List<DmaapTopicSink> inventory() {
384         return new ArrayList<>(this.dmaapTopicWriters.values());
385     }
386
387     @Override
388     public String toString() {
389         StringBuilder builder = new StringBuilder();
390         builder.append("IndexedDmaapTopicSinkFactory []");
391         return builder.toString();
392     }
393
394 }