4ccf08de6d9e7c55806bc1458cfc64c9fb044ab5
[policy/common.git] /
1 /*
2  * ============LICENSE_START=======================================================
3  * ONAP
4  * ================================================================================
5  * Copyright (C) 2017-2019 AT&T Intellectual Property. All rights reserved.
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  * 
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  * 
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=========================================================
19  */
20
21 package org.onap.policy.common.endpoints.event.comm.bus;
22
23 import java.util.ArrayList;
24 import java.util.Arrays;
25 import java.util.HashMap;
26 import java.util.List;
27 import java.util.Map;
28 import java.util.Properties;
29
30 import org.onap.policy.common.endpoints.event.comm.bus.internal.BusTopicParams;
31 import org.onap.policy.common.endpoints.event.comm.bus.internal.InlineDmaapTopicSink;
32 import org.onap.policy.common.endpoints.properties.PolicyEndPointProperties;
33 import org.slf4j.Logger;
34 import org.slf4j.LoggerFactory;
35
36 /**
37  * Factory of DMAAP Reader Topics indexed by topic name.
38  */
39 class IndexedDmaapTopicSinkFactory implements DmaapTopicSinkFactory {
40
41     private static final String MISSING_TOPIC = "A topic must be provided";
42
43     /**
44      * Logger.
45      */
46     private static Logger logger = LoggerFactory.getLogger(IndexedDmaapTopicSinkFactory.class);
47
48     /**
49      * DMAAP Topic Name Index.
50      */
51     protected HashMap<String, DmaapTopicSink> dmaapTopicWriters = new HashMap<>();
52
53     @Override
54     public DmaapTopicSink build(BusTopicParams busTopicParams) {
55
56         if (busTopicParams.getTopic() == null || busTopicParams.getTopic().isEmpty()) {
57             throw new IllegalArgumentException(MISSING_TOPIC);
58         }
59
60         synchronized (this) {
61             if (dmaapTopicWriters.containsKey(busTopicParams.getTopic())) {
62                 return dmaapTopicWriters.get(busTopicParams.getTopic());
63             }
64
65             DmaapTopicSink dmaapTopicSink = makeSink(busTopicParams);
66
67             if (busTopicParams.isManaged()) {
68                 dmaapTopicWriters.put(busTopicParams.getTopic(), dmaapTopicSink);
69             }
70             return dmaapTopicSink;
71         }
72     }
73
74     @Override
75     public DmaapTopicSink build(List<String> servers, String topic) {
76         return this.build(BusTopicParams.builder()
77                 .servers(servers)
78                 .topic(topic)
79                 .managed(true)
80                 .useHttps(false)
81                 .allowSelfSignedCerts(false)
82                 .build());
83     }
84
85     @Override
86     public List<DmaapTopicSink> build(Properties properties) {
87
88         String writeTopics = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS);
89         if (writeTopics == null || writeTopics.isEmpty()) {
90             logger.info("{}: no topic for DMaaP Sink", this);
91             return new ArrayList<>();
92         }
93
94         List<String> writeTopicList = new ArrayList<>(Arrays.asList(writeTopics.split("\\s*,\\s*")));
95         List<DmaapTopicSink> newDmaapTopicSinks = new ArrayList<>();
96         synchronized (this) {
97             for (String topic : writeTopicList) {
98                 if (this.dmaapTopicWriters.containsKey(topic)) {
99                     newDmaapTopicSinks.add(this.dmaapTopicWriters.get(topic));
100                     continue;
101                 }
102                 String servers = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS + "."
103                         + topic + PolicyEndPointProperties.PROPERTY_TOPIC_SERVERS_SUFFIX);
104
105                 List<String> serverList;
106                 if (servers != null && !servers.isEmpty()) {
107                     serverList = new ArrayList<>(Arrays.asList(servers.split("\\s*,\\s*")));
108                 } else {
109                     serverList = new ArrayList<>();
110                 }
111
112                 final String effectiveTopic = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS
113                     + "." + topic + PolicyEndPointProperties.PROPERTY_TOPIC_EFFECTIVE_TOPIC_SUFFIX, topic);
114
115                 final String apiKey = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS
116                         + "." + topic + PolicyEndPointProperties.PROPERTY_TOPIC_API_KEY_SUFFIX);
117                 final String apiSecret = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS
118                         + "." + topic + PolicyEndPointProperties.PROPERTY_TOPIC_API_SECRET_SUFFIX);
119
120                 final String aafMechId = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS
121                         + "." + topic + PolicyEndPointProperties.PROPERTY_TOPIC_AAF_MECHID_SUFFIX);
122                 final String aafPassword = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS
123                         + "." + topic + PolicyEndPointProperties.PROPERTY_TOPIC_AAF_PASSWORD_SUFFIX);
124
125                 final String partitionKey = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS
126                         + "." + topic + PolicyEndPointProperties.PROPERTY_TOPIC_SINK_PARTITION_KEY_SUFFIX);
127
128                 final String managedString = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS
129                         + "." + topic + PolicyEndPointProperties.PROPERTY_MANAGED_SUFFIX);
130
131                 /* DME2 Properties */
132
133                 final String dme2Environment = properties.getProperty(
134                         PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS
135                                 + "." + topic + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_ENVIRONMENT_SUFFIX);
136
137                 final String dme2AftEnvironment = properties.getProperty(
138                         PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS
139                                 + "." + topic + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_AFT_ENVIRONMENT_SUFFIX);
140
141                 final String dme2Partner = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS
142                         + "." + topic + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_PARTNER_SUFFIX);
143
144                 final String dme2RouteOffer = properties.getProperty(
145                         PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS
146                                 + "." + topic + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_ROUTE_OFFER_SUFFIX);
147
148                 final String dme2Latitude = properties.getProperty(
149                         PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS + "."
150                                 + topic + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_LATITUDE_SUFFIX);
151
152                 final String dme2Longitude = properties.getProperty(
153                         PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS
154                                 + "." + topic + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_LONGITUDE_SUFFIX);
155
156                 final String dme2EpReadTimeoutMs = properties.getProperty(
157                         PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS
158                                 + "." + topic + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_EP_READ_TIMEOUT_MS_SUFFIX);
159
160                 final String dme2EpConnTimeout = properties.getProperty(
161                         PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS
162                                 + "." + topic + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_EP_CONN_TIMEOUT_SUFFIX);
163
164                 final String dme2RoundtripTimeoutMs =
165                         properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS
166                                 + "." + topic
167                                 + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_ROUNDTRIP_TIMEOUT_MS_SUFFIX);
168
169                 final String dme2Version = properties.getProperty(
170                         PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS + "."
171                                 + topic + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_VERSION_SUFFIX);
172
173                 final String dme2SubContextPath = properties.getProperty(
174                         PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS
175                                 + "." + topic + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_SUB_CONTEXT_PATH_SUFFIX);
176
177                 final String dme2SessionStickinessRequired =
178                         properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS
179                                 + "." + topic
180                                 + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_SESSION_STICKINESS_REQUIRED_SUFFIX);
181
182                 Map<String, String> dme2AdditionalProps = new HashMap<>();
183
184                 if (dme2EpReadTimeoutMs != null && !dme2EpReadTimeoutMs.isEmpty()) {
185                     dme2AdditionalProps.put(DME2_READ_TIMEOUT_PROPERTY, dme2EpReadTimeoutMs);
186                 }
187                 if (dme2EpConnTimeout != null && !dme2EpConnTimeout.isEmpty()) {
188                     dme2AdditionalProps.put(DME2_EP_CONN_TIMEOUT_PROPERTY, dme2EpConnTimeout);
189                 }
190                 if (dme2RoundtripTimeoutMs != null && !dme2RoundtripTimeoutMs.isEmpty()) {
191                     dme2AdditionalProps.put(DME2_ROUNDTRIP_TIMEOUT_PROPERTY, dme2RoundtripTimeoutMs);
192                 }
193                 if (dme2Version != null && !dme2Version.isEmpty()) {
194                     dme2AdditionalProps.put(DME2_VERSION_PROPERTY, dme2Version);
195                 }
196                 if (dme2RouteOffer != null && !dme2RouteOffer.isEmpty()) {
197                     dme2AdditionalProps.put(DME2_ROUTE_OFFER_PROPERTY, dme2RouteOffer);
198                 }
199                 if (dme2SubContextPath != null && !dme2SubContextPath.isEmpty()) {
200                     dme2AdditionalProps.put(DME2_SUBCONTEXT_PATH_PROPERTY, dme2SubContextPath);
201                 }
202                 if (dme2SessionStickinessRequired != null && !dme2SessionStickinessRequired.isEmpty()) {
203                     dme2AdditionalProps.put(DME2_SESSION_STICKINESS_REQUIRED_PROPERTY, dme2SessionStickinessRequired);
204                 }
205
206                 if (servers == null || servers.isEmpty()) {
207                     logger.error("{}: no DMaaP servers or DME2 ServiceName provided", this);
208                     continue;
209                 }
210
211                 boolean managed = true;
212                 if (managedString != null && !managedString.isEmpty()) {
213                     managed = Boolean.parseBoolean(managedString);
214                 }
215
216                 String useHttpsString = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS + "."
217                         + topic + PolicyEndPointProperties.PROPERTY_HTTP_HTTPS_SUFFIX);
218
219                 // default is to use HTTP if no https property exists
220                 boolean useHttps = false;
221                 if (useHttpsString != null && !useHttpsString.isEmpty()) {
222                     useHttps = Boolean.parseBoolean(useHttpsString);
223                 }
224
225                 String allowSelfSignedCertsString =
226                         properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS + "." + topic
227                                 + PolicyEndPointProperties.PROPERTY_ALLOW_SELF_SIGNED_CERTIFICATES_SUFFIX);
228
229                 // default is to disallow self-signed certs
230                 boolean allowSelfSignedCerts = false;
231                 if (allowSelfSignedCertsString != null && !allowSelfSignedCertsString.isEmpty()) {
232                     allowSelfSignedCerts = Boolean.parseBoolean(allowSelfSignedCertsString);
233                 }
234
235                 DmaapTopicSink dmaapTopicSink = this.build(BusTopicParams.builder()
236                         .servers(serverList)
237                         .topic(topic)
238                         .effectiveTopic(effectiveTopic)
239                         .apiKey(apiKey)
240                         .apiSecret(apiSecret)
241                         .userName(aafMechId)
242                         .password(aafPassword)
243                         .partitionId(partitionKey)
244                         .environment(dme2Environment)
245                         .aftEnvironment(dme2AftEnvironment)
246                         .partner(dme2Partner)
247                         .latitude(dme2Latitude)
248                         .longitude(dme2Longitude)
249                         .additionalProps(dme2AdditionalProps)
250                         .managed(managed)
251                         .useHttps(useHttps)
252                         .allowSelfSignedCerts(allowSelfSignedCerts)
253                         .build());
254
255                 newDmaapTopicSinks.add(dmaapTopicSink);
256             }
257             return newDmaapTopicSinks;
258         }
259     }
260
261     /**
262      * Makes a new sink.
263      *
264      * @param busTopicParams parameters to use to configure the sink
265      * @return a new sink
266      */
267     protected DmaapTopicSink makeSink(BusTopicParams busTopicParams) {
268         return new InlineDmaapTopicSink(busTopicParams);
269     }
270
271     @Override
272     public void destroy(String topic) {
273
274         if (topic == null || topic.isEmpty()) {
275             throw new IllegalArgumentException(MISSING_TOPIC);
276         }
277
278         DmaapTopicSink dmaapTopicWriter;
279         synchronized (this) {
280             if (!dmaapTopicWriters.containsKey(topic)) {
281                 return;
282             }
283
284             dmaapTopicWriter = dmaapTopicWriters.remove(topic);
285         }
286
287         dmaapTopicWriter.shutdown();
288     }
289
290     @Override
291     public void destroy() {
292         List<DmaapTopicSink> writers = this.inventory();
293         for (DmaapTopicSink writer : writers) {
294             writer.shutdown();
295         }
296
297         synchronized (this) {
298             this.dmaapTopicWriters.clear();
299         }
300     }
301
302     @Override
303     public DmaapTopicSink get(String topic) {
304
305         if (topic == null || topic.isEmpty()) {
306             throw new IllegalArgumentException(MISSING_TOPIC);
307         }
308
309         synchronized (this) {
310             if (dmaapTopicWriters.containsKey(topic)) {
311                 return dmaapTopicWriters.get(topic);
312             } else {
313                 throw new IllegalStateException("DmaapTopicSink for " + topic + " not found");
314             }
315         }
316     }
317
318     @Override
319     public synchronized List<DmaapTopicSink> inventory() {
320         return new ArrayList<>(this.dmaapTopicWriters.values());
321     }
322
323     @Override
324     public String toString() {
325         return "IndexedDmaapTopicSinkFactory []";
326     }
327
328 }