2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
6 * ================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 * ============LICENSE_END=========================================================
21 package org.openecomp.policy.drools.event.comm.bus;
23 import java.util.ArrayList;
24 import java.util.Arrays;
25 import java.util.HashMap;
26 import java.util.List;
27 import java.util.Properties;
29 import org.openecomp.policy.drools.event.comm.bus.internal.InlineDmaapTopicSink;
30 import org.openecomp.policy.common.logging.flexlogger.FlexLogger;
31 import org.openecomp.policy.common.logging.flexlogger.Logger;
32 import org.openecomp.policy.drools.properties.PolicyProperties;
35 * DMAAP Topic Sink Factory
37 public interface DmaapTopicSinkFactory {
40 * Instantiates a new DMAAP Topic Sink
42 * @param servers list of servers
43 * @param topic topic name
44 * @param apiKey API Key
45 * @param apiSecret API Secret
46 * @param userName AAF user name
47 * @param password AAF password
48 * @param partitionKey Consumer Group
49 * @param managed is this sink endpoint managed?
51 * @return an DMAAP Topic Sink
52 * @throws IllegalArgumentException if invalid parameters are present
54 public DmaapTopicSink build(List<String> servers,
62 throws IllegalArgumentException;
65 * Creates an DMAAP Topic Sink based on properties files
67 * @param properties Properties containing initialization values
69 * @return an DMAAP Topic Sink
70 * @throws IllegalArgumentException if invalid parameters are present
72 public List<DmaapTopicSink> build(Properties properties)
73 throws IllegalArgumentException;
76 * Instantiates a new DMAAP Topic Sink
78 * @param servers list of servers
79 * @param topic topic name
81 * @return an DMAAP Topic Sink
82 * @throws IllegalArgumentException if invalid parameters are present
84 public DmaapTopicSink build(List<String> servers, String topic)
85 throws IllegalArgumentException;
88 * Destroys an DMAAP Topic Sink based on a topic
90 * @param topic topic name
91 * @throws IllegalArgumentException if invalid parameters are present
93 public void destroy(String topic);
96 * gets an DMAAP Topic Sink based on topic name
97 * @param topic the topic name
99 * @return an DMAAP Topic Sink with topic name
100 * @throws IllegalArgumentException if an invalid topic is provided
101 * @throws IllegalStateException if the DMAAP Topic Reader is
104 public DmaapTopicSink get(String topic)
105 throws IllegalArgumentException, IllegalStateException;
108 * Provides a snapshot of the DMAAP Topic Sinks
109 * @return a list of the DMAAP Topic Sinks
111 public List<DmaapTopicSink> inventory();
114 * Destroys all DMAAP Topic Sinks
116 public void destroy();
119 /* ------------- implementation ----------------- */
122 * Factory of DMAAP Reader Topics indexed by topic name
124 class IndexedDmaapTopicSinkFactory implements DmaapTopicSinkFactory {
125 // get an instance of logger
126 private static Logger logger = FlexLogger.getLogger(IndexedDmaapTopicSinkFactory.class);
129 * DMAAP Topic Name Index
131 protected HashMap<String, DmaapTopicSink> dmaapTopicWriters =
132 new HashMap<String, DmaapTopicSink>();
138 public DmaapTopicSink build(List<String> servers,
146 throws IllegalArgumentException {
148 if (topic == null || topic.isEmpty()) {
149 throw new IllegalArgumentException("A topic must be provided");
152 synchronized (this) {
153 if (dmaapTopicWriters.containsKey(topic)) {
154 return dmaapTopicWriters.get(topic);
157 DmaapTopicSink dmaapTopicSink =
158 new InlineDmaapTopicSink(servers, topic,
164 dmaapTopicWriters.put(topic, dmaapTopicSink);
165 return dmaapTopicSink;
174 public DmaapTopicSink build(List<String> servers, String topic) throws IllegalArgumentException {
175 return this.build(servers, topic, null, null, null, null, null, true);
183 public List<DmaapTopicSink> build(Properties properties) throws IllegalArgumentException {
185 String writeTopics = properties.getProperty(PolicyProperties.PROPERTY_DMAAP_SINK_TOPICS);
186 if (writeTopics == null || writeTopics.isEmpty()) {
187 logger.warn("No topic for DMAAP Sink " + properties);
188 return new ArrayList<DmaapTopicSink>();
190 List<String> writeTopicList = new ArrayList<String>(Arrays.asList(writeTopics.split("\\s*,\\s*")));
193 List<DmaapTopicSink> dmaapTopicWriters = new ArrayList<DmaapTopicSink>();
194 for (String topic: writeTopicList) {
196 String servers = properties.getProperty(PolicyProperties.PROPERTY_DMAAP_SINK_TOPICS + "." +
198 PolicyProperties.PROPERTY_TOPIC_SERVERS_SUFFIX);
199 if (servers == null || servers.isEmpty()) {
200 logger.error("No DMAAP servers provided in " + properties);
204 List<String> serverList = new ArrayList<String>(Arrays.asList(servers.split("\\s*,\\s*")));
206 String apiKey = properties.getProperty(PolicyProperties.PROPERTY_DMAAP_SINK_TOPICS +
208 PolicyProperties.PROPERTY_TOPIC_API_KEY_SUFFIX);
209 String apiSecret = properties.getProperty(PolicyProperties.PROPERTY_DMAAP_SINK_TOPICS +
211 PolicyProperties.PROPERTY_TOPIC_API_SECRET_SUFFIX);
213 String aafMechId = properties.getProperty(PolicyProperties.PROPERTY_DMAAP_SINK_TOPICS +
215 PolicyProperties.PROPERTY_TOPIC_AAF_MECHID_SUFFIX);
216 String aafPassword = properties.getProperty(PolicyProperties.PROPERTY_DMAAP_SINK_TOPICS +
218 PolicyProperties.PROPERTY_TOPIC_AAF_PASSWORD_SUFFIX);
220 String partitionKey = properties.getProperty(PolicyProperties.PROPERTY_DMAAP_SINK_TOPICS +
222 PolicyProperties.PROPERTY_TOPIC_SINK_PARTITION_KEY_SUFFIX);
224 String managedString = properties.getProperty(PolicyProperties.PROPERTY_UEB_SINK_TOPICS + "." + topic +
225 PolicyProperties.PROPERTY_MANAGED_SUFFIX);
226 boolean managed = true;
227 if (managedString != null && !managedString.isEmpty()) {
228 managed = Boolean.parseBoolean(managedString);
231 DmaapTopicSink dmaapTopicSink = this.build(serverList, topic,
232 apiKey, apiSecret, aafMechId, aafPassword,
233 partitionKey, managed);
234 dmaapTopicWriters.add(dmaapTopicSink);
236 return dmaapTopicWriters;
244 public void destroy(String topic)
245 throws IllegalArgumentException {
247 if (topic == null || topic.isEmpty()) {
248 throw new IllegalArgumentException("A topic must be provided");
251 DmaapTopicSink dmaapTopicWriter;
253 if (!dmaapTopicWriters.containsKey(topic)) {
257 dmaapTopicWriter = dmaapTopicWriters.remove(topic);
260 dmaapTopicWriter.shutdown();
267 public void destroy() {
268 List<DmaapTopicSink> writers = this.inventory();
269 for (DmaapTopicSink writer: writers) {
274 this.dmaapTopicWriters.clear();
282 public DmaapTopicSink get(String topic)
283 throws IllegalArgumentException, IllegalStateException {
285 if (topic == null || topic.isEmpty()) {
286 throw new IllegalArgumentException("A topic must be provided");
290 if (dmaapTopicWriters.containsKey(topic)) {
291 return dmaapTopicWriters.get(topic);
293 throw new IllegalStateException("DmaapTopicSink for " + topic + " not found");
302 public synchronized List<DmaapTopicSink> inventory() {
303 List<DmaapTopicSink> writers =
304 new ArrayList<DmaapTopicSink>(this.dmaapTopicWriters.values());