2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved.
6 * Modified Copyright (C) 2018 Samsung Electronics Co., Ltd.
7 * ================================================================================
8 * Licensed under the Apache License, Version 2.0 (the "License");
9 * you may not use this file except in compliance with the License.
10 * You may obtain a copy of the License at
12 * http://www.apache.org/licenses/LICENSE-2.0
14 * Unless required by applicable law or agreed to in writing, software
15 * distributed under the License is distributed on an "AS IS" BASIS,
16 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17 * See the License for the specific language governing permissions and
18 * limitations under the License.
19 * ============LICENSE_END=========================================================
22 package org.onap.policy.common.endpoints.event.comm.bus;
24 import java.util.ArrayList;
25 import java.util.Arrays;
26 import java.util.HashMap;
27 import java.util.List;
29 import java.util.Properties;
31 import org.onap.policy.common.endpoints.event.comm.bus.internal.BusTopicParams;
32 import org.onap.policy.common.endpoints.event.comm.bus.internal.InlineDmaapTopicSink;
33 import org.onap.policy.common.endpoints.properties.PolicyEndPointProperties;
34 import org.slf4j.Logger;
35 import org.slf4j.LoggerFactory;
38 * DMAAP Topic Sink Factory
40 public interface DmaapTopicSinkFactory {
41 public final String DME2_READ_TIMEOUT_PROPERTY = "AFT_DME2_EP_READ_TIMEOUT_MS";
42 public final String DME2_EP_CONN_TIMEOUT_PROPERTY = "AFT_DME2_EP_CONN_TIMEOUT";
43 public final String DME2_ROUNDTRIP_TIMEOUT_PROPERTY = "AFT_DME2_ROUNDTRIP_TIMEOUT_MS";
44 public final String DME2_VERSION_PROPERTY = "Version";
45 public final String DME2_ROUTE_OFFER_PROPERTY = "routeOffer";
46 public final String DME2_SERVICE_NAME_PROPERTY = "ServiceName";
47 public final String DME2_SUBCONTEXT_PATH_PROPERTY = "SubContextPath";
48 public final String DME2_SESSION_STICKINESS_REQUIRED_PROPERTY = "sessionstickinessrequired";
51 * Instantiates a new DMAAP Topic Sink
53 * @param servers list of servers
54 * @param topic topic name
55 * @param apiKey API Key
56 * @param apiSecret API Secret
57 * @param userName AAF user name
58 * @param password AAF password
59 * @param partitionKey Consumer Group
60 * @param environment DME2 environment
61 * @param aftEnvironment DME2 AFT environment
62 * @param partner DME2 Partner
63 * @param latitude DME2 latitude
64 * @param longitude DME2 longitude
65 * @param additionalProps additional properties to pass to DME2
66 * @param managed is this sink endpoint managed?
67 * @return an DMAAP Topic Sink
68 * @throws IllegalArgumentException if invalid parameters are present
70 public DmaapTopicSink build(List<String> servers, String topic, String apiKey, String apiSecret, String userName,
71 String password, String partitionKey, String environment, String aftEnvironment, String partner,
72 String latitude, String longitude, Map<String, String> additionalProps, boolean managed, boolean useHttps,
73 boolean allowSelfSignedCerts);
76 * Instantiates a new DMAAP Topic Sink
78 * @param servers list of servers
79 * @param topic topic name
80 * @param apiKey API Key
81 * @param apiSecret API Secret
82 * @param userName AAF user name
83 * @param password AAF password
84 * @param partitionKey Consumer Group
85 * @param managed is this sink endpoint managed?
86 * @return an DMAAP Topic Sink
87 * @throws IllegalArgumentException if invalid parameters are present
89 public DmaapTopicSink build(List<String> servers, String topic, String apiKey, String apiSecret, String userName,
90 String password, String partitionKey, boolean managed, boolean useHttps, boolean allowSelfSignedCerts);
93 * Creates an DMAAP Topic Sink based on properties files
95 * @param properties Properties containing initialization values
96 * @return an DMAAP Topic Sink
97 * @throws IllegalArgumentException if invalid parameters are present
99 public List<DmaapTopicSink> build(Properties properties);
102 * Instantiates a new DMAAP Topic Sink
104 * @param servers list of servers
105 * @param topic topic name
106 * @return an DMAAP Topic Sink
107 * @throws IllegalArgumentException if invalid parameters are present
109 public DmaapTopicSink build(List<String> servers, String topic);
112 * Destroys an DMAAP Topic Sink based on a topic
114 * @param topic topic name
115 * @throws IllegalArgumentException if invalid parameters are present
117 public void destroy(String topic);
120 * gets an DMAAP Topic Sink based on topic name
122 * @param topic the topic name
123 * @return an DMAAP Topic Sink with topic name
124 * @throws IllegalArgumentException if an invalid topic is provided
125 * @throws IllegalStateException if the DMAAP Topic Reader is an incorrect state
127 public DmaapTopicSink get(String topic);
130 * Provides a snapshot of the DMAAP Topic Sinks
132 * @return a list of the DMAAP Topic Sinks
134 public List<DmaapTopicSink> inventory();
137 * Destroys all DMAAP Topic Sinks
139 public void destroy();
143 /* ------------- implementation ----------------- */
146 * Factory of DMAAP Reader Topics indexed by topic name
148 class IndexedDmaapTopicSinkFactory implements DmaapTopicSinkFactory {
149 private static final String MISSING_TOPIC = "A topic must be provided";
154 private static Logger logger = LoggerFactory.getLogger(IndexedDmaapTopicSinkFactory.class);
157 * DMAAP Topic Name Index
159 protected HashMap<String, DmaapTopicSink> dmaapTopicWriters = new HashMap<>();
162 public DmaapTopicSink build(List<String> servers, String topic, String apiKey, String apiSecret, String userName,
163 String password, String partitionKey, String environment, String aftEnvironment, String partner,
164 String latitude, String longitude, Map<String, String> additionalProps, boolean managed, boolean useHttps,
165 boolean allowSelfSignedCerts) {
167 if (topic == null || topic.isEmpty()) {
168 throw new IllegalArgumentException(MISSING_TOPIC);
171 synchronized (this) {
172 if (dmaapTopicWriters.containsKey(topic)) {
173 return dmaapTopicWriters.get(topic);
176 DmaapTopicSink dmaapTopicSink = new InlineDmaapTopicSink(BusTopicParams.builder()
180 .apiSecret(apiSecret)
183 .partitionId(partitionKey)
184 .environment(environment)
185 .aftEnvironment(aftEnvironment)
188 .longitude(longitude)
189 .additionalProps(additionalProps)
191 .allowSelfSignedCerts(allowSelfSignedCerts)
195 dmaapTopicWriters.put(topic, dmaapTopicSink);
197 return dmaapTopicSink;
202 public DmaapTopicSink build(List<String> servers, String topic, String apiKey, String apiSecret, String userName,
203 String password, String partitionKey, boolean managed, boolean useHttps,
204 boolean allowSelfSignedCerts) {
206 if (topic == null || topic.isEmpty()) {
207 throw new IllegalArgumentException(MISSING_TOPIC);
210 synchronized (this) {
211 if (dmaapTopicWriters.containsKey(topic)) {
212 return dmaapTopicWriters.get(topic);
215 DmaapTopicSink dmaapTopicSink = new InlineDmaapTopicSink(BusTopicParams.builder()
219 .apiSecret(apiSecret)
222 .partitionId(partitionKey)
224 .allowSelfSignedCerts(allowSelfSignedCerts)
228 dmaapTopicWriters.put(topic, dmaapTopicSink);
230 return dmaapTopicSink;
235 public DmaapTopicSink build(List<String> servers, String topic) {
236 return this.build(servers, topic, null, null, null, null, null, true, false, false);
240 public List<DmaapTopicSink> build(Properties properties) {
242 String writeTopics = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS);
243 if (writeTopics == null || writeTopics.isEmpty()) {
244 logger.info("{}: no topic for DMaaP Sink", this);
245 return new ArrayList<>();
248 List<String> writeTopicList = new ArrayList<>(Arrays.asList(writeTopics.split("\\s*,\\s*")));
249 List<DmaapTopicSink> newDmaapTopicSinks = new ArrayList<>();
250 synchronized (this) {
251 for (String topic : writeTopicList) {
252 if (this.dmaapTopicWriters.containsKey(topic)) {
253 newDmaapTopicSinks.add(this.dmaapTopicWriters.get(topic));
256 String servers = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS + "."
257 + topic + PolicyEndPointProperties.PROPERTY_TOPIC_SERVERS_SUFFIX);
259 List<String> serverList;
260 if (servers != null && !servers.isEmpty()) {
261 serverList = new ArrayList<>(Arrays.asList(servers.split("\\s*,\\s*")));
263 serverList = new ArrayList<>();
266 String apiKey = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS + "." + topic
267 + PolicyEndPointProperties.PROPERTY_TOPIC_API_KEY_SUFFIX);
268 String apiSecret = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS + "."
269 + topic + PolicyEndPointProperties.PROPERTY_TOPIC_API_SECRET_SUFFIX);
271 String aafMechId = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS + "."
272 + topic + PolicyEndPointProperties.PROPERTY_TOPIC_AAF_MECHID_SUFFIX);
273 String aafPassword = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS + "."
274 + topic + PolicyEndPointProperties.PROPERTY_TOPIC_AAF_PASSWORD_SUFFIX);
276 String partitionKey = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS + "."
277 + topic + PolicyEndPointProperties.PROPERTY_TOPIC_SINK_PARTITION_KEY_SUFFIX);
279 String managedString = properties.getProperty(PolicyEndPointProperties.PROPERTY_UEB_SINK_TOPICS + "."
280 + topic + PolicyEndPointProperties.PROPERTY_MANAGED_SUFFIX);
282 /* DME2 Properties */
284 String dme2Environment = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS
285 + "." + topic + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_ENVIRONMENT_SUFFIX);
287 String dme2AftEnvironment = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS
288 + "." + topic + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_AFT_ENVIRONMENT_SUFFIX);
290 String dme2Partner = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS + "."
291 + topic + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_PARTNER_SUFFIX);
293 String dme2RouteOffer = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS + "."
294 + topic + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_ROUTE_OFFER_SUFFIX);
296 String dme2Latitude = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS + "."
297 + topic + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_LATITUDE_SUFFIX);
299 String dme2Longitude = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS + "."
300 + topic + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_LONGITUDE_SUFFIX);
302 String dme2EpReadTimeoutMs = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS
303 + "." + topic + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_EP_READ_TIMEOUT_MS_SUFFIX);
305 String dme2EpConnTimeout = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS
306 + "." + topic + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_EP_CONN_TIMEOUT_SUFFIX);
308 String dme2RoundtripTimeoutMs =
309 properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS + "." + topic
310 + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_ROUNDTRIP_TIMEOUT_MS_SUFFIX);
312 String dme2Version = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS + "."
313 + topic + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_VERSION_SUFFIX);
315 String dme2SubContextPath = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS
316 + "." + topic + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_SUB_CONTEXT_PATH_SUFFIX);
318 String dme2SessionStickinessRequired =
319 properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS + "." + topic
320 + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_SESSION_STICKINESS_REQUIRED_SUFFIX);
322 Map<String, String> dme2AdditionalProps = new HashMap<>();
324 if (dme2EpReadTimeoutMs != null && !dme2EpReadTimeoutMs.isEmpty()) {
325 dme2AdditionalProps.put(DME2_READ_TIMEOUT_PROPERTY, dme2EpReadTimeoutMs);
327 if (dme2EpConnTimeout != null && !dme2EpConnTimeout.isEmpty()) {
328 dme2AdditionalProps.put(DME2_EP_CONN_TIMEOUT_PROPERTY, dme2EpConnTimeout);
330 if (dme2RoundtripTimeoutMs != null && !dme2RoundtripTimeoutMs.isEmpty()) {
331 dme2AdditionalProps.put(DME2_ROUNDTRIP_TIMEOUT_PROPERTY, dme2RoundtripTimeoutMs);
333 if (dme2Version != null && !dme2Version.isEmpty()) {
334 dme2AdditionalProps.put(DME2_VERSION_PROPERTY, dme2Version);
336 if (dme2RouteOffer != null && !dme2RouteOffer.isEmpty()) {
337 dme2AdditionalProps.put(DME2_ROUTE_OFFER_PROPERTY, dme2RouteOffer);
339 if (dme2SubContextPath != null && !dme2SubContextPath.isEmpty()) {
340 dme2AdditionalProps.put(DME2_SUBCONTEXT_PATH_PROPERTY, dme2SubContextPath);
342 if (dme2SessionStickinessRequired != null && !dme2SessionStickinessRequired.isEmpty()) {
343 dme2AdditionalProps.put(DME2_SESSION_STICKINESS_REQUIRED_PROPERTY, dme2SessionStickinessRequired);
346 if (servers == null || servers.isEmpty()) {
347 logger.error("{}: no DMaaP servers or DME2 ServiceName provided", this);
351 boolean managed = true;
352 if (managedString != null && !managedString.isEmpty()) {
353 managed = Boolean.parseBoolean(managedString);
356 String useHttpsString = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS + "."
357 + topic + PolicyEndPointProperties.PROPERTY_HTTP_HTTPS_SUFFIX);
359 // default is to use HTTP if no https property exists
360 boolean useHttps = false;
361 if (useHttpsString != null && !useHttpsString.isEmpty()) {
362 useHttps = Boolean.parseBoolean(useHttpsString);
366 String allowSelfSignedCertsString =
367 properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS + "." + topic
368 + PolicyEndPointProperties.PROPERTY_ALLOW_SELF_SIGNED_CERTIFICATES_SUFFIX);
370 // default is to disallow self-signed certs
371 boolean allowSelfSignedCerts = false;
372 if (allowSelfSignedCertsString != null && !allowSelfSignedCertsString.isEmpty()) {
373 allowSelfSignedCerts = Boolean.parseBoolean(allowSelfSignedCertsString);
376 DmaapTopicSink dmaapTopicSink = this.build(serverList, topic, apiKey, apiSecret, aafMechId, aafPassword,
377 partitionKey, dme2Environment, dme2AftEnvironment, dme2Partner, dme2Latitude, dme2Longitude,
378 dme2AdditionalProps, managed, useHttps, allowSelfSignedCerts);
380 newDmaapTopicSinks.add(dmaapTopicSink);
382 return newDmaapTopicSinks;
387 public void destroy(String topic) {
389 if (topic == null || topic.isEmpty()) {
390 throw new IllegalArgumentException(MISSING_TOPIC);
393 DmaapTopicSink dmaapTopicWriter;
394 synchronized (this) {
395 if (!dmaapTopicWriters.containsKey(topic)) {
399 dmaapTopicWriter = dmaapTopicWriters.remove(topic);
402 dmaapTopicWriter.shutdown();
406 public void destroy() {
407 List<DmaapTopicSink> writers = this.inventory();
408 for (DmaapTopicSink writer : writers) {
412 synchronized (this) {
413 this.dmaapTopicWriters.clear();
418 public DmaapTopicSink get(String topic) {
420 if (topic == null || topic.isEmpty()) {
421 throw new IllegalArgumentException(MISSING_TOPIC);
424 synchronized (this) {
425 if (dmaapTopicWriters.containsKey(topic)) {
426 return dmaapTopicWriters.get(topic);
428 throw new IllegalStateException("DmaapTopicSink for " + topic + " not found");
434 public synchronized List<DmaapTopicSink> inventory() {
435 return new ArrayList<>(this.dmaapTopicWriters.values());
439 public String toString() {
440 StringBuilder builder = new StringBuilder();
441 builder.append("IndexedDmaapTopicSinkFactory []");
442 return builder.toString();