2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright (C) 2017-2019, 2021 AT&T Intellectual Property. All rights reserved.
6 * ================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 * ============LICENSE_END=========================================================
21 package org.onap.policy.common.endpoints.event.comm.bus;
23 import com.google.re2j.Pattern;
24 import java.util.ArrayList;
25 import java.util.HashMap;
26 import java.util.List;
27 import java.util.Properties;
28 import org.apache.commons.lang3.StringUtils;
29 import org.onap.policy.common.endpoints.event.comm.bus.internal.BusTopicParams;
30 import org.onap.policy.common.endpoints.event.comm.bus.internal.InlineDmaapTopicSink;
31 import org.onap.policy.common.endpoints.properties.PolicyEndPointProperties;
32 import org.onap.policy.common.endpoints.utils.DmaapPropertyUtils;
33 import org.onap.policy.common.endpoints.utils.PropertyUtils;
34 import org.slf4j.Logger;
35 import org.slf4j.LoggerFactory;
38 * Factory of DMAAP Reader Topics indexed by topic name.
40 class IndexedDmaapTopicSinkFactory implements DmaapTopicSinkFactory {
42 private static final Pattern COMMA_SPACE_PAT = Pattern.compile("\\s*,\\s*");
43 private static final String MISSING_TOPIC = "A topic must be provided";
48 private static Logger logger = LoggerFactory.getLogger(IndexedDmaapTopicSinkFactory.class);
51 * DMAAP Topic Name Index.
53 protected HashMap<String, DmaapTopicSink> dmaapTopicWriters = new HashMap<>();
56 public DmaapTopicSink build(BusTopicParams busTopicParams) {
58 if (StringUtils.isBlank(busTopicParams.getTopic())) {
59 throw new IllegalArgumentException(MISSING_TOPIC);
63 if (dmaapTopicWriters.containsKey(busTopicParams.getTopic())) {
64 return dmaapTopicWriters.get(busTopicParams.getTopic());
67 var dmaapTopicSink = makeSink(busTopicParams);
69 if (busTopicParams.isManaged()) {
70 dmaapTopicWriters.put(busTopicParams.getTopic(), dmaapTopicSink);
72 return dmaapTopicSink;
77 public DmaapTopicSink build(List<String> servers, String topic) {
78 return this.build(BusTopicParams.builder()
83 .allowSelfSignedCerts(false)
88 public List<DmaapTopicSink> build(Properties properties) {
90 String writeTopics = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS);
91 if (StringUtils.isBlank(writeTopics)) {
92 logger.info("{}: no topic for DMaaP Sink", this);
93 return new ArrayList<>();
96 List<DmaapTopicSink> newDmaapTopicSinks = new ArrayList<>();
98 for (String topic : COMMA_SPACE_PAT.split(writeTopics)) {
99 addTopic(newDmaapTopicSinks, properties, topic);
101 return newDmaapTopicSinks;
105 private void addTopic(List<DmaapTopicSink> newDmaapTopicSinks, Properties properties, String topic) {
106 if (this.dmaapTopicWriters.containsKey(topic)) {
107 newDmaapTopicSinks.add(this.dmaapTopicWriters.get(topic));
111 String topicPrefix = PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS + "." + topic;
113 var props = new PropertyUtils(properties, topicPrefix,
114 (name, value, ex) -> logger.warn("{}: {} {} is in invalid format for topic {} ", this, name, value, topic));
116 String servers = properties.getProperty(topicPrefix + PolicyEndPointProperties.PROPERTY_TOPIC_SERVERS_SUFFIX);
117 if (StringUtils.isBlank(servers)) {
118 logger.error("{}: no DMaaP servers or DME2 ServiceName provided", this);
122 var dmaapTopicSink = this.build(DmaapPropertyUtils.makeBuilder(props, topic, servers)
123 .partitionId(props.getString(PolicyEndPointProperties.PROPERTY_TOPIC_SINK_PARTITION_KEY_SUFFIX, null))
126 newDmaapTopicSinks.add(dmaapTopicSink);
132 * @param busTopicParams parameters to use to configure the sink
135 protected DmaapTopicSink makeSink(BusTopicParams busTopicParams) {
136 return new InlineDmaapTopicSink(busTopicParams);
140 public void destroy(String topic) {
142 if (topic == null || topic.isEmpty()) {
143 throw new IllegalArgumentException(MISSING_TOPIC);
146 DmaapTopicSink dmaapTopicWriter;
147 synchronized (this) {
148 if (!dmaapTopicWriters.containsKey(topic)) {
152 dmaapTopicWriter = dmaapTopicWriters.remove(topic);
155 dmaapTopicWriter.shutdown();
159 public void destroy() {
160 List<DmaapTopicSink> writers = this.inventory();
161 for (DmaapTopicSink writer : writers) {
165 synchronized (this) {
166 this.dmaapTopicWriters.clear();
171 public DmaapTopicSink get(String topic) {
173 if (topic == null || topic.isEmpty()) {
174 throw new IllegalArgumentException(MISSING_TOPIC);
177 synchronized (this) {
178 if (dmaapTopicWriters.containsKey(topic)) {
179 return dmaapTopicWriters.get(topic);
181 throw new IllegalStateException("DmaapTopicSink for " + topic + " not found");
187 public synchronized List<DmaapTopicSink> inventory() {
188 return new ArrayList<>(this.dmaapTopicWriters.values());
192 public String toString() {
193 return "IndexedDmaapTopicSinkFactory []";