2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
6 * ================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 * ============LICENSE_END=========================================================
21 package org.openecomp.policy.drools.event.comm.bus;
23 import java.util.ArrayList;
24 import java.util.Arrays;
25 import java.util.HashMap;
26 import java.util.List;
27 import java.util.Properties;
29 import org.openecomp.policy.drools.event.comm.bus.internal.SingleThreadedDmaapTopicSource;
30 import org.openecomp.policy.common.logging.flexlogger.FlexLogger;
31 import org.openecomp.policy.common.logging.flexlogger.Logger;
32 import org.openecomp.policy.drools.properties.PolicyProperties;
34 * DMAAP Topic Source Factory
36 public interface DmaapTopicSourceFactory {
39 * Creates an DMAAP Topic Source based on properties files
41 * @param properties Properties containing initialization values
43 * @return an DMAAP Topic Source
44 * @throws IllegalArgumentException if invalid parameters are present
46 public List<DmaapTopicSource> build(Properties properties)
47 throws IllegalArgumentException;
50 * Instantiates a new DMAAP Topic Source
52 * @param servers list of servers
53 * @param topic topic name
54 * @param apiKey API Key
55 * @param apiSecret API Secret
56 * @param userName user name
57 * @param password password
58 * @param consumerGroup Consumer Group
59 * @param consumerInstance Consumer Instance
60 * @param fetchTimeout Read Fetch Timeout
61 * @param fetchLimit Fetch Limit
62 * @param managed is this endpoind managed?
64 * @return an DMAAP Topic Source
65 * @throws IllegalArgumentException if invalid parameters are present
67 public DmaapTopicSource build(List<String> servers,
74 String consumerInstance,
78 throws IllegalArgumentException;
81 * Instantiates a new DMAAP Topic Source
83 * @param servers list of servers
84 * @param topic topic name
85 * @param apiKey API Key
86 * @param apiSecret API Secret
88 * @return an DMAAP Topic Source
89 * @throws IllegalArgumentException if invalid parameters are present
91 public DmaapTopicSource build(List<String> servers,
95 throws IllegalArgumentException;
98 * Instantiates a new DMAAP Topic Source
100 * @param uebTopicReaderType Implementation type
101 * @param servers list of servers
102 * @param topic topic name
104 * @return an DMAAP Topic Source
105 * @throws IllegalArgumentException if invalid parameters are present
107 public DmaapTopicSource build(List<String> servers,
109 throws IllegalArgumentException;
112 * Destroys an DMAAP Topic Source based on a topic
114 * @param topic topic name
115 * @throws IllegalArgumentException if invalid parameters are present
117 public void destroy(String topic);
120 * Destroys all DMAAP Topic Sources
122 public void destroy();
125 * gets an DMAAP Topic Source based on topic name
126 * @param topic the topic name
127 * @return an DMAAP Topic Source with topic name
128 * @throws IllegalArgumentException if an invalid topic is provided
129 * @throws IllegalStateException if the DMAAP Topic Source is
132 public DmaapTopicSource get(String topic)
133 throws IllegalArgumentException, IllegalStateException;
136 * Provides a snapshot of the DMAAP Topic Sources
137 * @return a list of the DMAAP Topic Sources
139 public List<DmaapTopicSource> inventory();
143 /* ------------- implementation ----------------- */
146 * Factory of DMAAP Source Topics indexed by topic name
149 class IndexedDmaapTopicSourceFactory implements DmaapTopicSourceFactory {
150 // get an instance of logger
151 private static Logger logger = FlexLogger.getLogger(IndexedDmaapTopicSourceFactory.class);
153 * UEB Topic Name Index
155 protected HashMap<String, DmaapTopicSource> dmaapTopicSources =
156 new HashMap<String, DmaapTopicSource>();
162 public DmaapTopicSource build(List<String> servers,
168 String consumerGroup,
169 String consumerInstance,
173 throws IllegalArgumentException {
175 if (topic == null || topic.isEmpty()) {
176 throw new IllegalArgumentException("A topic must be provided");
180 if (dmaapTopicSources.containsKey(topic)) {
181 return dmaapTopicSources.get(topic);
184 DmaapTopicSource dmaapTopicSource =
185 new SingleThreadedDmaapTopicSource(servers, topic,
186 apiKey, apiSecret, userName, password,
187 consumerGroup, consumerInstance,
188 fetchTimeout, fetchLimit);
191 dmaapTopicSources.put(topic, dmaapTopicSource);
193 return dmaapTopicSource;
201 public List<DmaapTopicSource> build(Properties properties)
202 throws IllegalArgumentException {
204 String readTopics = properties.getProperty(PolicyProperties.PROPERTY_DMAAP_SOURCE_TOPICS);
205 if (readTopics == null || readTopics.isEmpty()) {
206 logger.warn("No topic for UEB Source " + properties);
207 return new ArrayList<DmaapTopicSource>();
209 List<String> readTopicList = new ArrayList<String>(Arrays.asList(readTopics.split("\\s*,\\s*")));
211 List<DmaapTopicSource> dmaapTopicSource_s = new ArrayList<DmaapTopicSource>();
213 for (String topic: readTopicList) {
215 String servers = properties.getProperty(PolicyProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "." +
217 PolicyProperties.PROPERTY_TOPIC_SERVERS_SUFFIX);
219 if (servers == null || servers.isEmpty()) {
220 logger.error("No UEB servers provided in " + properties);
224 List<String> serverList = new ArrayList<String>(Arrays.asList(servers.split("\\s*,\\s*")));
226 String apiKey = properties.getProperty(PolicyProperties.PROPERTY_DMAAP_SOURCE_TOPICS +
228 PolicyProperties.PROPERTY_TOPIC_API_KEY_SUFFIX);
230 String apiSecret = properties.getProperty(PolicyProperties.PROPERTY_DMAAP_SOURCE_TOPICS +
232 PolicyProperties.PROPERTY_TOPIC_API_SECRET_SUFFIX);
234 String aafMechId = properties.getProperty(PolicyProperties.PROPERTY_DMAAP_SOURCE_TOPICS +
236 PolicyProperties.PROPERTY_TOPIC_AAF_MECHID_SUFFIX);
238 String aafPassword = properties.getProperty(PolicyProperties.PROPERTY_DMAAP_SOURCE_TOPICS +
240 PolicyProperties.PROPERTY_TOPIC_AAF_PASSWORD_SUFFIX);
242 String consumerGroup = properties.getProperty(PolicyProperties.PROPERTY_DMAAP_SOURCE_TOPICS +
244 PolicyProperties.PROPERTY_TOPIC_SOURCE_CONSUMER_GROUP_SUFFIX);
246 String consumerInstance = properties.getProperty(PolicyProperties.PROPERTY_DMAAP_SOURCE_TOPICS +
248 PolicyProperties.PROPERTY_TOPIC_SOURCE_CONSUMER_INSTANCE_SUFFIX);
250 String fetchTimeoutString = properties.getProperty(PolicyProperties.PROPERTY_DMAAP_SOURCE_TOPICS +
252 PolicyProperties.PROPERTY_TOPIC_SOURCE_FETCH_TIMEOUT_SUFFIX);
253 int fetchTimeout = DmaapTopicSource.DEFAULT_TIMEOUT_MS_FETCH;
254 if (fetchTimeoutString != null && !fetchTimeoutString.isEmpty()) {
256 fetchTimeout = Integer.parseInt(fetchTimeoutString);
257 } catch (NumberFormatException nfe) {
258 logger.warn("Fetch Timeout in invalid format for topic " + topic + ": " + fetchTimeoutString);
262 String fetchLimitString = properties.getProperty(PolicyProperties.PROPERTY_DMAAP_SOURCE_TOPICS +
264 PolicyProperties.PROPERTY_TOPIC_SOURCE_FETCH_TIMEOUT_SUFFIX);
265 int fetchLimit = DmaapTopicSource.DEFAULT_LIMIT_FETCH;
266 if (fetchLimitString != null && !fetchLimitString.isEmpty()) {
268 fetchLimit = Integer.parseInt(fetchLimitString);
269 } catch (NumberFormatException nfe) {
270 logger.warn("Fetch Limit in invalid format for topic " + topic + ": " + fetchLimitString);
274 String managedString = properties.getProperty(PolicyProperties.PROPERTY_DMAAP_SOURCE_TOPICS +
276 PolicyProperties.PROPERTY_MANAGED_SUFFIX);
277 boolean managed = true;
278 if (managedString != null && !managedString.isEmpty()) {
279 managed = Boolean.parseBoolean(managedString);
282 DmaapTopicSource uebTopicSource = this.build(serverList, topic,
283 apiKey, apiSecret, aafMechId, aafPassword,
284 consumerGroup, consumerInstance,
285 fetchTimeout, fetchLimit, managed);
286 dmaapTopicSource_s.add(uebTopicSource);
289 return dmaapTopicSource_s;
296 public DmaapTopicSource build(List<String> servers,
300 return this.build(servers, topic,
301 apiKey, apiSecret, null, null,
303 DmaapTopicSource.DEFAULT_TIMEOUT_MS_FETCH,
304 DmaapTopicSource.DEFAULT_LIMIT_FETCH,
312 public DmaapTopicSource build(List<String> servers, String topic) {
313 return this.build(servers, topic, null, null);
320 public void destroy(String topic)
321 throws IllegalArgumentException {
323 if (topic == null || topic.isEmpty()) {
324 throw new IllegalArgumentException("A topic must be provided");
327 DmaapTopicSource uebTopicSource;
330 if (!dmaapTopicSources.containsKey(topic)) {
334 uebTopicSource = dmaapTopicSources.remove(topic);
337 uebTopicSource.shutdown();
344 public DmaapTopicSource get(String topic)
345 throws IllegalArgumentException, IllegalStateException {
347 if (topic == null || topic.isEmpty()) {
348 throw new IllegalArgumentException("A topic must be provided");
352 if (dmaapTopicSources.containsKey(topic)) {
353 return dmaapTopicSources.get(topic);
355 throw new IllegalArgumentException("DmaapTopicSource for " + topic + " not found");
361 public synchronized List<DmaapTopicSource> inventory() {
362 List<DmaapTopicSource> readers =
363 new ArrayList<DmaapTopicSource>(this.dmaapTopicSources.values());
368 public void destroy() {
369 List<DmaapTopicSource> readers = this.inventory();
370 for (DmaapTopicSource reader: readers) {
375 this.dmaapTopicSources.clear();