659833ce779f96b79112f93dbce4bc235ac44b86
[policy/common.git] /
1 /*
2  * ============LICENSE_START=======================================================
3  * ONAP Policy Engine - Common Modules
4  * ================================================================================
5  * Copyright (C) 2017-2019 AT&T Intellectual Property. All rights reserved.
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  * 
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  * 
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=========================================================
19  */
20
21 package org.onap.policy.common.endpoints.event.comm.bus;
22
23 import java.util.ArrayList;
24 import java.util.Arrays;
25 import java.util.HashMap;
26 import java.util.List;
27 import java.util.Map;
28 import java.util.Properties;
29
30 import org.onap.policy.common.endpoints.event.comm.bus.internal.BusTopicParams;
31 import org.onap.policy.common.endpoints.event.comm.bus.internal.InlineDmaapTopicSink;
32 import org.onap.policy.common.endpoints.properties.PolicyEndPointProperties;
33 import org.slf4j.Logger;
34 import org.slf4j.LoggerFactory;
35
36 /**
37  * Factory of DMAAP Reader Topics indexed by topic name.
38  */
39 class IndexedDmaapTopicSinkFactory implements DmaapTopicSinkFactory {
40
41     private static final String MISSING_TOPIC = "A topic must be provided";
42
43     /**
44      * Logger.
45      */
46     private static Logger logger = LoggerFactory.getLogger(IndexedDmaapTopicSinkFactory.class);
47
48     /**
49      * DMAAP Topic Name Index.
50      */
51     protected HashMap<String, DmaapTopicSink> dmaapTopicWriters = new HashMap<>();
52
53     @Override
54     public DmaapTopicSink build(BusTopicParams busTopicParams) {
55
56         if (busTopicParams.getTopic() == null || busTopicParams.getTopic().isEmpty()) {
57             throw new IllegalArgumentException(MISSING_TOPIC);
58         }
59
60         synchronized (this) {
61             if (dmaapTopicWriters.containsKey(busTopicParams.getTopic())) {
62                 return dmaapTopicWriters.get(busTopicParams.getTopic());
63             }
64
65             DmaapTopicSink dmaapTopicSink = makeSink(busTopicParams);
66
67             if (busTopicParams.isManaged()) {
68                 dmaapTopicWriters.put(busTopicParams.getTopic(), dmaapTopicSink);
69             }
70             return dmaapTopicSink;
71         }
72     }
73
74     @Override
75     public DmaapTopicSink build(List<String> servers, String topic) {
76         return this.build(BusTopicParams.builder()
77                 .servers(servers)
78                 .topic(topic)
79                 .managed(true)
80                 .useHttps(false)
81                 .allowSelfSignedCerts(false)
82                 .build());
83     }
84
85     @Override
86     public List<DmaapTopicSink> build(Properties properties) {
87
88         String writeTopics = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS);
89         if (writeTopics == null || writeTopics.isEmpty()) {
90             logger.info("{}: no topic for DMaaP Sink", this);
91             return new ArrayList<>();
92         }
93
94         List<String> writeTopicList = new ArrayList<>(Arrays.asList(writeTopics.split("\\s*,\\s*")));
95         List<DmaapTopicSink> newDmaapTopicSinks = new ArrayList<>();
96         synchronized (this) {
97             for (String topic : writeTopicList) {
98                 if (this.dmaapTopicWriters.containsKey(topic)) {
99                     newDmaapTopicSinks.add(this.dmaapTopicWriters.get(topic));
100                     continue;
101                 }
102                 String servers = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS + "."
103                         + topic + PolicyEndPointProperties.PROPERTY_TOPIC_SERVERS_SUFFIX);
104
105                 List<String> serverList;
106                 if (servers != null && !servers.isEmpty()) {
107                     serverList = new ArrayList<>(Arrays.asList(servers.split("\\s*,\\s*")));
108                 } else {
109                     serverList = new ArrayList<>();
110                 }
111
112                 final String apiKey = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS
113                         + "." + topic + PolicyEndPointProperties.PROPERTY_TOPIC_API_KEY_SUFFIX);
114                 final String apiSecret = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS
115                         + "." + topic + PolicyEndPointProperties.PROPERTY_TOPIC_API_SECRET_SUFFIX);
116
117                 final String aafMechId = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS
118                         + "." + topic + PolicyEndPointProperties.PROPERTY_TOPIC_AAF_MECHID_SUFFIX);
119                 final String aafPassword = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS
120                         + "." + topic + PolicyEndPointProperties.PROPERTY_TOPIC_AAF_PASSWORD_SUFFIX);
121
122                 final String partitionKey = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS
123                         + "." + topic + PolicyEndPointProperties.PROPERTY_TOPIC_SINK_PARTITION_KEY_SUFFIX);
124
125                 final String managedString = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS
126                         + "." + topic + PolicyEndPointProperties.PROPERTY_MANAGED_SUFFIX);
127
128                 /* DME2 Properties */
129
130                 final String dme2Environment = properties.getProperty(
131                         PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS
132                                 + "." + topic + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_ENVIRONMENT_SUFFIX);
133
134                 final String dme2AftEnvironment = properties.getProperty(
135                         PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS
136                                 + "." + topic + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_AFT_ENVIRONMENT_SUFFIX);
137
138                 final String dme2Partner = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS
139                         + "." + topic + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_PARTNER_SUFFIX);
140
141                 final String dme2RouteOffer = properties.getProperty(
142                         PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS
143                                 + "." + topic + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_ROUTE_OFFER_SUFFIX);
144
145                 final String dme2Latitude = properties.getProperty(
146                         PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS + "."
147                                 + topic + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_LATITUDE_SUFFIX);
148
149                 final String dme2Longitude = properties.getProperty(
150                         PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS
151                                 + "." + topic + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_LONGITUDE_SUFFIX);
152
153                 final String dme2EpReadTimeoutMs = properties.getProperty(
154                         PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS
155                                 + "." + topic + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_EP_READ_TIMEOUT_MS_SUFFIX);
156
157                 final String dme2EpConnTimeout = properties.getProperty(
158                         PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS
159                                 + "." + topic + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_EP_CONN_TIMEOUT_SUFFIX);
160
161                 final String dme2RoundtripTimeoutMs =
162                         properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS
163                                 + "." + topic
164                                 + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_ROUNDTRIP_TIMEOUT_MS_SUFFIX);
165
166                 final String dme2Version = properties.getProperty(
167                         PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS + "."
168                                 + topic + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_VERSION_SUFFIX);
169
170                 final String dme2SubContextPath = properties.getProperty(
171                         PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS
172                                 + "." + topic + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_SUB_CONTEXT_PATH_SUFFIX);
173
174                 final String dme2SessionStickinessRequired =
175                         properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS
176                                 + "." + topic
177                                 + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_SESSION_STICKINESS_REQUIRED_SUFFIX);
178
179                 Map<String, String> dme2AdditionalProps = new HashMap<>();
180
181                 if (dme2EpReadTimeoutMs != null && !dme2EpReadTimeoutMs.isEmpty()) {
182                     dme2AdditionalProps.put(DME2_READ_TIMEOUT_PROPERTY, dme2EpReadTimeoutMs);
183                 }
184                 if (dme2EpConnTimeout != null && !dme2EpConnTimeout.isEmpty()) {
185                     dme2AdditionalProps.put(DME2_EP_CONN_TIMEOUT_PROPERTY, dme2EpConnTimeout);
186                 }
187                 if (dme2RoundtripTimeoutMs != null && !dme2RoundtripTimeoutMs.isEmpty()) {
188                     dme2AdditionalProps.put(DME2_ROUNDTRIP_TIMEOUT_PROPERTY, dme2RoundtripTimeoutMs);
189                 }
190                 if (dme2Version != null && !dme2Version.isEmpty()) {
191                     dme2AdditionalProps.put(DME2_VERSION_PROPERTY, dme2Version);
192                 }
193                 if (dme2RouteOffer != null && !dme2RouteOffer.isEmpty()) {
194                     dme2AdditionalProps.put(DME2_ROUTE_OFFER_PROPERTY, dme2RouteOffer);
195                 }
196                 if (dme2SubContextPath != null && !dme2SubContextPath.isEmpty()) {
197                     dme2AdditionalProps.put(DME2_SUBCONTEXT_PATH_PROPERTY, dme2SubContextPath);
198                 }
199                 if (dme2SessionStickinessRequired != null && !dme2SessionStickinessRequired.isEmpty()) {
200                     dme2AdditionalProps.put(DME2_SESSION_STICKINESS_REQUIRED_PROPERTY, dme2SessionStickinessRequired);
201                 }
202
203                 if (servers == null || servers.isEmpty()) {
204                     logger.error("{}: no DMaaP servers or DME2 ServiceName provided", this);
205                     continue;
206                 }
207
208                 boolean managed = true;
209                 if (managedString != null && !managedString.isEmpty()) {
210                     managed = Boolean.parseBoolean(managedString);
211                 }
212
213                 String useHttpsString = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS + "."
214                         + topic + PolicyEndPointProperties.PROPERTY_HTTP_HTTPS_SUFFIX);
215
216                 // default is to use HTTP if no https property exists
217                 boolean useHttps = false;
218                 if (useHttpsString != null && !useHttpsString.isEmpty()) {
219                     useHttps = Boolean.parseBoolean(useHttpsString);
220                 }
221
222                 String allowSelfSignedCertsString =
223                         properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS + "." + topic
224                                 + PolicyEndPointProperties.PROPERTY_ALLOW_SELF_SIGNED_CERTIFICATES_SUFFIX);
225
226                 // default is to disallow self-signed certs
227                 boolean allowSelfSignedCerts = false;
228                 if (allowSelfSignedCertsString != null && !allowSelfSignedCertsString.isEmpty()) {
229                     allowSelfSignedCerts = Boolean.parseBoolean(allowSelfSignedCertsString);
230                 }
231
232                 DmaapTopicSink dmaapTopicSink = this.build(BusTopicParams.builder()
233                         .servers(serverList)
234                         .topic(topic)
235                         .apiKey(apiKey)
236                         .apiSecret(apiSecret)
237                         .userName(aafMechId)
238                         .password(aafPassword)
239                         .partitionId(partitionKey)
240                         .environment(dme2Environment)
241                         .aftEnvironment(dme2AftEnvironment)
242                         .partner(dme2Partner)
243                         .latitude(dme2Latitude)
244                         .longitude(dme2Longitude)
245                         .additionalProps(dme2AdditionalProps)
246                         .managed(managed)
247                         .useHttps(useHttps)
248                         .allowSelfSignedCerts(allowSelfSignedCerts)
249                         .build());
250
251                 newDmaapTopicSinks.add(dmaapTopicSink);
252             }
253             return newDmaapTopicSinks;
254         }
255     }
256
257     /**
258      * Makes a new sink.
259      *
260      * @param busTopicParams parameters to use to configure the sink
261      * @return a new sink
262      */
263     protected DmaapTopicSink makeSink(BusTopicParams busTopicParams) {
264         return new InlineDmaapTopicSink(busTopicParams);
265     }
266
267     @Override
268     public void destroy(String topic) {
269
270         if (topic == null || topic.isEmpty()) {
271             throw new IllegalArgumentException(MISSING_TOPIC);
272         }
273
274         DmaapTopicSink dmaapTopicWriter;
275         synchronized (this) {
276             if (!dmaapTopicWriters.containsKey(topic)) {
277                 return;
278             }
279
280             dmaapTopicWriter = dmaapTopicWriters.remove(topic);
281         }
282
283         dmaapTopicWriter.shutdown();
284     }
285
286     @Override
287     public void destroy() {
288         List<DmaapTopicSink> writers = this.inventory();
289         for (DmaapTopicSink writer : writers) {
290             writer.shutdown();
291         }
292
293         synchronized (this) {
294             this.dmaapTopicWriters.clear();
295         }
296     }
297
298     @Override
299     public DmaapTopicSink get(String topic) {
300
301         if (topic == null || topic.isEmpty()) {
302             throw new IllegalArgumentException(MISSING_TOPIC);
303         }
304
305         synchronized (this) {
306             if (dmaapTopicWriters.containsKey(topic)) {
307                 return dmaapTopicWriters.get(topic);
308             } else {
309                 throw new IllegalStateException("DmaapTopicSink for " + topic + " not found");
310             }
311         }
312     }
313
314     @Override
315     public synchronized List<DmaapTopicSink> inventory() {
316         return new ArrayList<>(this.dmaapTopicWriters.values());
317     }
318
319     @Override
320     public String toString() {
321         return "IndexedDmaapTopicSinkFactory []";
322     }
323
324 }