2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved.
6 * Modified Copyright (C) 2018 Samsung Electronics Co., Ltd.
7 * ================================================================================
8 * Licensed under the Apache License, Version 2.0 (the "License");
9 * you may not use this file except in compliance with the License.
10 * You may obtain a copy of the License at
12 * http://www.apache.org/licenses/LICENSE-2.0
14 * Unless required by applicable law or agreed to in writing, software
15 * distributed under the License is distributed on an "AS IS" BASIS,
16 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17 * See the License for the specific language governing permissions and
18 * limitations under the License.
19 * ============LICENSE_END=========================================================
22 package org.onap.policy.common.endpoints.event.comm.bus;
24 import java.util.ArrayList;
25 import java.util.Arrays;
26 import java.util.HashMap;
27 import java.util.List;
29 import java.util.Properties;
31 import org.onap.policy.common.endpoints.event.comm.bus.internal.BusTopicParams;
32 import org.onap.policy.common.endpoints.event.comm.bus.internal.InlineDmaapTopicSink;
33 import org.onap.policy.common.endpoints.properties.PolicyEndPointProperties;
34 import org.slf4j.Logger;
35 import org.slf4j.LoggerFactory;
38 * DMAAP Topic Sink Factory.
40 public interface DmaapTopicSinkFactory {
41 public final String DME2_READ_TIMEOUT_PROPERTY = "AFT_DME2_EP_READ_TIMEOUT_MS";
42 public final String DME2_EP_CONN_TIMEOUT_PROPERTY = "AFT_DME2_EP_CONN_TIMEOUT";
43 public final String DME2_ROUNDTRIP_TIMEOUT_PROPERTY = "AFT_DME2_ROUNDTRIP_TIMEOUT_MS";
44 public final String DME2_VERSION_PROPERTY = "Version";
45 public final String DME2_ROUTE_OFFER_PROPERTY = "routeOffer";
46 public final String DME2_SERVICE_NAME_PROPERTY = "ServiceName";
47 public final String DME2_SUBCONTEXT_PATH_PROPERTY = "SubContextPath";
48 public final String DME2_SESSION_STICKINESS_REQUIRED_PROPERTY = "sessionstickinessrequired";
51 * Instantiates a new DMAAP Topic Sink.
53 * @param servers list of servers
54 * @param topic topic name
55 * @param apiKey API Key
56 * @param apiSecret API Secret
57 * @param userName AAF user name
58 * @param password AAF password
59 * @param partitionKey Consumer Group
60 * @param environment DME2 environment
61 * @param aftEnvironment DME2 AFT environment
62 * @param partner DME2 Partner
63 * @param latitude DME2 latitude
64 * @param longitude DME2 longitude
65 * @param additionalProps additional properties to pass to DME2
66 * @param managed is this sink endpoint managed?
67 * @return an DMAAP Topic Sink
68 * @throws IllegalArgumentException if invalid parameters are present
70 public DmaapTopicSink build(List<String> servers, String topic, String apiKey, String apiSecret, String userName,
71 String password, String partitionKey, String environment, String aftEnvironment,
72 String partner, String latitude, String longitude, Map<String, String> additionalProps,
73 boolean managed, boolean useHttps, boolean allowSelfSignedCerts);
76 * Instantiates a new DMAAP Topic Sink.
78 * @param servers list of servers
79 * @param topic topic name
80 * @param apiKey API Key
81 * @param apiSecret API Secret
82 * @param userName AAF user name
83 * @param password AAF password
84 * @param partitionKey Consumer Group
85 * @param managed is this sink endpoint managed?
86 * @return an DMAAP Topic Sink
87 * @throws IllegalArgumentException if invalid parameters are present
89 public DmaapTopicSink build(List<String> servers, String topic, String apiKey, String apiSecret, String userName,
90 String password, String partitionKey, boolean managed,
91 boolean useHttps, boolean allowSelfSignedCerts);
94 * Creates an DMAAP Topic Sink based on properties files.
96 * @param properties Properties containing initialization values
97 * @return an DMAAP Topic Sink
98 * @throws IllegalArgumentException if invalid parameters are present
100 public List<DmaapTopicSink> build(Properties properties);
103 * Instantiates a new DMAAP Topic Sink.
105 * @param servers list of servers
106 * @param topic topic name
107 * @return an DMAAP Topic Sink
108 * @throws IllegalArgumentException if invalid parameters are present
110 public DmaapTopicSink build(List<String> servers, String topic);
113 * Destroys an DMAAP Topic Sink based on a topic.
115 * @param topic topic name
116 * @throws IllegalArgumentException if invalid parameters are present
118 public void destroy(String topic);
121 * Destroys all DMAAP Topic Sinks.
123 public void destroy();
126 * Gets an DMAAP Topic Sink based on topic name.
128 * @param topic the topic name
129 * @return an DMAAP Topic Sink with topic name
130 * @throws IllegalArgumentException if an invalid topic is provided
131 * @throws IllegalStateException if the DMAAP Topic Reader is an incorrect state
133 public DmaapTopicSink get(String topic);
136 * Provides a snapshot of the DMAAP Topic Sinks.
138 * @return a list of the DMAAP Topic Sinks
140 public List<DmaapTopicSink> inventory();
144 /* ------------- implementation ----------------- */
147 * Factory of DMAAP Reader Topics indexed by topic name.
149 class IndexedDmaapTopicSinkFactory implements DmaapTopicSinkFactory {
150 private static final String MISSING_TOPIC = "A topic must be provided";
155 private static Logger logger = LoggerFactory.getLogger(IndexedDmaapTopicSinkFactory.class);
158 * DMAAP Topic Name Index.
160 protected HashMap<String, DmaapTopicSink> dmaapTopicWriters = new HashMap<>();
163 public DmaapTopicSink build(List<String> servers, String topic, String apiKey, String apiSecret, String userName,
164 String password, String partitionKey, String environment, String aftEnvironment,
165 String partner, String latitude, String longitude, Map<String, String> additionalProps,
166 boolean managed, boolean useHttps, boolean allowSelfSignedCerts) {
168 if (topic == null || topic.isEmpty()) {
169 throw new IllegalArgumentException(MISSING_TOPIC);
172 synchronized (this) {
173 if (dmaapTopicWriters.containsKey(topic)) {
174 return dmaapTopicWriters.get(topic);
177 DmaapTopicSink dmaapTopicSink = new InlineDmaapTopicSink(BusTopicParams.builder()
181 .apiSecret(apiSecret)
184 .partitionId(partitionKey)
185 .environment(environment)
186 .aftEnvironment(aftEnvironment)
189 .longitude(longitude)
190 .additionalProps(additionalProps)
192 .allowSelfSignedCerts(allowSelfSignedCerts)
196 dmaapTopicWriters.put(topic, dmaapTopicSink);
198 return dmaapTopicSink;
203 public DmaapTopicSink build(List<String> servers, String topic, String apiKey, String apiSecret, String userName,
204 String password, String partitionKey, boolean managed, boolean useHttps,
205 boolean allowSelfSignedCerts) {
207 if (topic == null || topic.isEmpty()) {
208 throw new IllegalArgumentException(MISSING_TOPIC);
211 synchronized (this) {
212 if (dmaapTopicWriters.containsKey(topic)) {
213 return dmaapTopicWriters.get(topic);
216 DmaapTopicSink dmaapTopicSink = new InlineDmaapTopicSink(BusTopicParams.builder()
220 .apiSecret(apiSecret)
223 .partitionId(partitionKey)
225 .allowSelfSignedCerts(allowSelfSignedCerts)
229 dmaapTopicWriters.put(topic, dmaapTopicSink);
231 return dmaapTopicSink;
236 public DmaapTopicSink build(List<String> servers, String topic) {
237 return this.build(servers, topic, null, null, null, null, null, true, false, false);
241 public List<DmaapTopicSink> build(Properties properties) {
243 String writeTopics = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS);
244 if (writeTopics == null || writeTopics.isEmpty()) {
245 logger.info("{}: no topic for DMaaP Sink", this);
246 return new ArrayList<>();
249 List<String> writeTopicList = new ArrayList<>(Arrays.asList(writeTopics.split("\\s*,\\s*")));
250 List<DmaapTopicSink> newDmaapTopicSinks = new ArrayList<>();
251 synchronized (this) {
252 for (String topic : writeTopicList) {
253 if (this.dmaapTopicWriters.containsKey(topic)) {
254 newDmaapTopicSinks.add(this.dmaapTopicWriters.get(topic));
257 String servers = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS + "."
258 + topic + PolicyEndPointProperties.PROPERTY_TOPIC_SERVERS_SUFFIX);
260 List<String> serverList;
261 if (servers != null && !servers.isEmpty()) {
262 serverList = new ArrayList<>(Arrays.asList(servers.split("\\s*,\\s*")));
264 serverList = new ArrayList<>();
267 String apiKey = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS + "." + topic
268 + PolicyEndPointProperties.PROPERTY_TOPIC_API_KEY_SUFFIX);
269 String apiSecret = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS + "."
270 + topic + PolicyEndPointProperties.PROPERTY_TOPIC_API_SECRET_SUFFIX);
272 String aafMechId = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS + "."
273 + topic + PolicyEndPointProperties.PROPERTY_TOPIC_AAF_MECHID_SUFFIX);
274 String aafPassword = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS + "."
275 + topic + PolicyEndPointProperties.PROPERTY_TOPIC_AAF_PASSWORD_SUFFIX);
277 String partitionKey = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS + "."
278 + topic + PolicyEndPointProperties.PROPERTY_TOPIC_SINK_PARTITION_KEY_SUFFIX);
280 String managedString = properties.getProperty(PolicyEndPointProperties.PROPERTY_UEB_SINK_TOPICS + "."
281 + topic + PolicyEndPointProperties.PROPERTY_MANAGED_SUFFIX);
283 /* DME2 Properties */
285 String dme2Environment = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS
286 + "." + topic + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_ENVIRONMENT_SUFFIX);
288 String dme2AftEnvironment = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS
289 + "." + topic + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_AFT_ENVIRONMENT_SUFFIX);
291 String dme2Partner = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS + "."
292 + topic + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_PARTNER_SUFFIX);
294 String dme2RouteOffer = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS + "."
295 + topic + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_ROUTE_OFFER_SUFFIX);
297 String dme2Latitude = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS + "."
298 + topic + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_LATITUDE_SUFFIX);
300 String dme2Longitude = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS + "."
301 + topic + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_LONGITUDE_SUFFIX);
303 String dme2EpReadTimeoutMs = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS
304 + "." + topic + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_EP_READ_TIMEOUT_MS_SUFFIX);
306 String dme2EpConnTimeout = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS
307 + "." + topic + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_EP_CONN_TIMEOUT_SUFFIX);
309 String dme2RoundtripTimeoutMs =
310 properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS + "." + topic
311 + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_ROUNDTRIP_TIMEOUT_MS_SUFFIX);
313 String dme2Version = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS + "."
314 + topic + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_VERSION_SUFFIX);
316 String dme2SubContextPath = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS
317 + "." + topic + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_SUB_CONTEXT_PATH_SUFFIX);
319 String dme2SessionStickinessRequired =
320 properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS + "." + topic
321 + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_SESSION_STICKINESS_REQUIRED_SUFFIX);
323 Map<String, String> dme2AdditionalProps = new HashMap<>();
325 if (dme2EpReadTimeoutMs != null && !dme2EpReadTimeoutMs.isEmpty()) {
326 dme2AdditionalProps.put(DME2_READ_TIMEOUT_PROPERTY, dme2EpReadTimeoutMs);
328 if (dme2EpConnTimeout != null && !dme2EpConnTimeout.isEmpty()) {
329 dme2AdditionalProps.put(DME2_EP_CONN_TIMEOUT_PROPERTY, dme2EpConnTimeout);
331 if (dme2RoundtripTimeoutMs != null && !dme2RoundtripTimeoutMs.isEmpty()) {
332 dme2AdditionalProps.put(DME2_ROUNDTRIP_TIMEOUT_PROPERTY, dme2RoundtripTimeoutMs);
334 if (dme2Version != null && !dme2Version.isEmpty()) {
335 dme2AdditionalProps.put(DME2_VERSION_PROPERTY, dme2Version);
337 if (dme2RouteOffer != null && !dme2RouteOffer.isEmpty()) {
338 dme2AdditionalProps.put(DME2_ROUTE_OFFER_PROPERTY, dme2RouteOffer);
340 if (dme2SubContextPath != null && !dme2SubContextPath.isEmpty()) {
341 dme2AdditionalProps.put(DME2_SUBCONTEXT_PATH_PROPERTY, dme2SubContextPath);
343 if (dme2SessionStickinessRequired != null && !dme2SessionStickinessRequired.isEmpty()) {
344 dme2AdditionalProps.put(DME2_SESSION_STICKINESS_REQUIRED_PROPERTY, dme2SessionStickinessRequired);
347 if (servers == null || servers.isEmpty()) {
348 logger.error("{}: no DMaaP servers or DME2 ServiceName provided", this);
352 boolean managed = true;
353 if (managedString != null && !managedString.isEmpty()) {
354 managed = Boolean.parseBoolean(managedString);
357 String useHttpsString = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS + "."
358 + topic + PolicyEndPointProperties.PROPERTY_HTTP_HTTPS_SUFFIX);
360 // default is to use HTTP if no https property exists
361 boolean useHttps = false;
362 if (useHttpsString != null && !useHttpsString.isEmpty()) {
363 useHttps = Boolean.parseBoolean(useHttpsString);
367 String allowSelfSignedCertsString =
368 properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS + "." + topic
369 + PolicyEndPointProperties.PROPERTY_ALLOW_SELF_SIGNED_CERTIFICATES_SUFFIX);
371 // default is to disallow self-signed certs
372 boolean allowSelfSignedCerts = false;
373 if (allowSelfSignedCertsString != null && !allowSelfSignedCertsString.isEmpty()) {
374 allowSelfSignedCerts = Boolean.parseBoolean(allowSelfSignedCertsString);
377 DmaapTopicSink dmaapTopicSink = this.build(serverList, topic, apiKey, apiSecret, aafMechId, aafPassword,
378 partitionKey, dme2Environment, dme2AftEnvironment, dme2Partner, dme2Latitude, dme2Longitude,
379 dme2AdditionalProps, managed, useHttps, allowSelfSignedCerts);
381 newDmaapTopicSinks.add(dmaapTopicSink);
383 return newDmaapTopicSinks;
388 public void destroy(String topic) {
390 if (topic == null || topic.isEmpty()) {
391 throw new IllegalArgumentException(MISSING_TOPIC);
394 DmaapTopicSink dmaapTopicWriter;
395 synchronized (this) {
396 if (!dmaapTopicWriters.containsKey(topic)) {
400 dmaapTopicWriter = dmaapTopicWriters.remove(topic);
403 dmaapTopicWriter.shutdown();
407 public void destroy() {
408 List<DmaapTopicSink> writers = this.inventory();
409 for (DmaapTopicSink writer : writers) {
413 synchronized (this) {
414 this.dmaapTopicWriters.clear();
419 public DmaapTopicSink get(String topic) {
421 if (topic == null || topic.isEmpty()) {
422 throw new IllegalArgumentException(MISSING_TOPIC);
425 synchronized (this) {
426 if (dmaapTopicWriters.containsKey(topic)) {
427 return dmaapTopicWriters.get(topic);
429 throw new IllegalStateException("DmaapTopicSink for " + topic + " not found");
435 public synchronized List<DmaapTopicSink> inventory() {
436 return new ArrayList<>(this.dmaapTopicWriters.values());
440 public String toString() {
441 StringBuilder builder = new StringBuilder();
442 builder.append("IndexedDmaapTopicSinkFactory []");
443 return builder.toString();