/*
 * ============LICENSE_START=======================================================
 * ================================================================================
 * Copyright (C) 2017-2019, 2021 AT&T Intellectual Property. All rights reserved.
 * ================================================================================
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 * ============LICENSE_END=========================================================
 */
21 package org.onap.policy.common.endpoints.event.comm.bus;
23 import java.util.ArrayList;
24 import java.util.HashMap;
25 import java.util.List;
26 import java.util.Properties;
27 import java.util.regex.Pattern;
28 import org.apache.commons.lang3.StringUtils;
29 import org.onap.policy.common.endpoints.event.comm.bus.internal.BusTopicParams;
30 import org.onap.policy.common.endpoints.event.comm.bus.internal.SingleThreadedDmaapTopicSource;
31 import org.onap.policy.common.endpoints.properties.PolicyEndPointProperties;
32 import org.onap.policy.common.endpoints.utils.DmaapPropertyUtils;
33 import org.onap.policy.common.endpoints.utils.PropertyUtils;
34 import org.slf4j.Logger;
35 import org.slf4j.LoggerFactory;
38 * Factory of DMAAP Source Topics indexed by topic name.
41 class IndexedDmaapTopicSourceFactory implements DmaapTopicSourceFactory {
42 private static final Pattern COMMA_SPACE_PAT = Pattern.compile("\\s*,\\s*");
43 private static final String MISSING_TOPIC = "A topic must be provided";
48 private static Logger logger = LoggerFactory.getLogger(IndexedDmaapTopicSourceFactory.class);
51 * DMaaP Topic Name Index.
53 protected HashMap<String, DmaapTopicSource> dmaapTopicSources = new HashMap<>();
56 public DmaapTopicSource build(BusTopicParams busTopicParams) {
58 if (busTopicParams.getTopic() == null || busTopicParams.getTopic().isEmpty()) {
59 throw new IllegalArgumentException(MISSING_TOPIC);
63 if (dmaapTopicSources.containsKey(busTopicParams.getTopic())) {
64 return dmaapTopicSources.get(busTopicParams.getTopic());
67 var dmaapTopicSource = makeSource(busTopicParams);
69 if (busTopicParams.isManaged()) {
70 dmaapTopicSources.put(busTopicParams.getTopic(), dmaapTopicSource);
72 return dmaapTopicSource;
77 public List<DmaapTopicSource> build(Properties properties) {
79 String readTopics = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS);
80 if (StringUtils.isBlank(readTopics)) {
81 logger.info("{}: no topic for DMaaP Source", this);
82 return new ArrayList<>();
85 List<DmaapTopicSource> dmaapTopicSourceLst = new ArrayList<>();
87 for (String topic : COMMA_SPACE_PAT.split(readTopics)) {
88 addTopic(dmaapTopicSourceLst, properties, topic);
91 return dmaapTopicSourceLst;
95 public DmaapTopicSource build(List<String> servers, String topic, String apiKey, String apiSecret) {
96 return this.build(BusTopicParams.builder()
100 .apiSecret(apiSecret)
101 .fetchTimeout(PolicyEndPointProperties.DEFAULT_TIMEOUT_MS_FETCH)
102 .fetchLimit(PolicyEndPointProperties.DEFAULT_LIMIT_FETCH)
105 .allowSelfSignedCerts(false)
110 public DmaapTopicSource build(List<String> servers, String topic) {
111 return this.build(servers, topic, null, null);
114 private void addTopic(List<DmaapTopicSource> dmaapTopicSourceLst, Properties properties, String topic) {
115 if (this.dmaapTopicSources.containsKey(topic)) {
116 dmaapTopicSourceLst.add(this.dmaapTopicSources.get(topic));
120 String topicPrefix = PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "." + topic;
122 var props = new PropertyUtils(properties, topicPrefix,
123 (name, value, ex) -> logger.warn("{}: {} {} is in invalid format for topic {} ", this, name, value, topic));
125 String servers = properties.getProperty(topicPrefix + PolicyEndPointProperties.PROPERTY_TOPIC_SERVERS_SUFFIX);
126 if (StringUtils.isBlank(servers)) {
127 logger.error("{}: no DMaaP servers or DME2 ServiceName provided", this);
131 DmaapTopicSource uebTopicSource = this.build(DmaapPropertyUtils.makeBuilder(props, topic, servers)
132 .consumerGroup(props.getString(
133 PolicyEndPointProperties.PROPERTY_TOPIC_SOURCE_CONSUMER_GROUP_SUFFIX, null))
134 .consumerInstance(props.getString(
135 PolicyEndPointProperties.PROPERTY_TOPIC_SOURCE_CONSUMER_INSTANCE_SUFFIX, null))
136 .fetchTimeout(props.getInteger(
137 PolicyEndPointProperties.PROPERTY_TOPIC_SOURCE_FETCH_TIMEOUT_SUFFIX,
138 PolicyEndPointProperties.DEFAULT_TIMEOUT_MS_FETCH))
139 .fetchLimit(props.getInteger(PolicyEndPointProperties.PROPERTY_TOPIC_SOURCE_FETCH_LIMIT_SUFFIX,
140 PolicyEndPointProperties.DEFAULT_LIMIT_FETCH))
143 dmaapTopicSourceLst.add(uebTopicSource);
147 * Makes a new source.
149 * @param busTopicParams parameters to use to configure the source
150 * @return a new source
152 protected DmaapTopicSource makeSource(BusTopicParams busTopicParams) {
153 return new SingleThreadedDmaapTopicSource(busTopicParams);
157 public void destroy(String topic) {
159 if (topic == null || topic.isEmpty()) {
160 throw new IllegalArgumentException(MISSING_TOPIC);
163 DmaapTopicSource uebTopicSource;
165 synchronized (this) {
166 if (!dmaapTopicSources.containsKey(topic)) {
170 uebTopicSource = dmaapTopicSources.remove(topic);
173 uebTopicSource.shutdown();
177 public void destroy() {
178 List<DmaapTopicSource> readers = this.inventory();
179 for (DmaapTopicSource reader : readers) {
183 synchronized (this) {
184 this.dmaapTopicSources.clear();
189 public DmaapTopicSource get(String topic) {
191 if (topic == null || topic.isEmpty()) {
192 throw new IllegalArgumentException(MISSING_TOPIC);
195 synchronized (this) {
196 if (dmaapTopicSources.containsKey(topic)) {
197 return dmaapTopicSources.get(topic);
199 throw new IllegalStateException("DmaapTopiceSource for " + topic + " not found");
205 public synchronized List<DmaapTopicSource> inventory() {
206 return new ArrayList<>(this.dmaapTopicSources.values());
210 public String toString() {
211 return "IndexedDmaapTopicSourceFactory []";