b63fe037fe30e1caa4fa8adb0881a966504c87be
[policy/common.git] /
1 /*
2  * ============LICENSE_START=======================================================
3  * ONAP
4  * ================================================================================
5  * Copyright (C) 2017-2019, 2021 AT&T Intellectual Property. All rights reserved.
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=========================================================
19  */
20
21 package org.onap.policy.common.endpoints.event.comm.bus;
22
23 import com.google.re2j.Pattern;
24 import java.util.ArrayList;
25 import java.util.HashMap;
26 import java.util.List;
27 import java.util.Properties;
28 import org.apache.commons.lang3.StringUtils;
29 import org.onap.policy.common.endpoints.event.comm.bus.internal.BusTopicParams;
30 import org.onap.policy.common.endpoints.event.comm.bus.internal.SingleThreadedDmaapTopicSource;
31 import org.onap.policy.common.endpoints.properties.PolicyEndPointProperties;
32 import org.onap.policy.common.endpoints.utils.DmaapPropertyUtils;
33 import org.onap.policy.common.endpoints.utils.PropertyUtils;
34 import org.slf4j.Logger;
35 import org.slf4j.LoggerFactory;
36
37 /**
38  * Factory of DMAAP Source Topics indexed by topic name.
39  */
40
41 class IndexedDmaapTopicSourceFactory implements DmaapTopicSourceFactory {
42     private static final Pattern COMMA_SPACE_PAT = Pattern.compile("\\s*,\\s*");
43     private static final String MISSING_TOPIC = "A topic must be provided";
44
45     /**
46      * Logger.
47      */
48     private static Logger logger = LoggerFactory.getLogger(IndexedDmaapTopicSourceFactory.class);
49
50     /**
51      * DMaaP Topic Name Index.
52      */
53     protected HashMap<String, DmaapTopicSource> dmaapTopicSources = new HashMap<>();
54
55     @Override
56     public DmaapTopicSource build(BusTopicParams busTopicParams) {
57
58         if (busTopicParams.getTopic() == null || busTopicParams.getTopic().isEmpty()) {
59             throw new IllegalArgumentException(MISSING_TOPIC);
60         }
61
62         synchronized (this) {
63             if (dmaapTopicSources.containsKey(busTopicParams.getTopic())) {
64                 return dmaapTopicSources.get(busTopicParams.getTopic());
65             }
66
67             var dmaapTopicSource = makeSource(busTopicParams);
68
69             if (busTopicParams.isManaged()) {
70                 dmaapTopicSources.put(busTopicParams.getTopic(), dmaapTopicSource);
71             }
72             return dmaapTopicSource;
73         }
74     }
75
76     @Override
77     public List<DmaapTopicSource> build(Properties properties) {
78
79         String readTopics = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS);
80         if (StringUtils.isBlank(readTopics)) {
81             logger.info("{}: no topic for DMaaP Source", this);
82             return new ArrayList<>();
83         }
84
85         List<DmaapTopicSource> dmaapTopicSourceLst = new ArrayList<>();
86         synchronized (this) {
87             for (String topic : COMMA_SPACE_PAT.split(readTopics)) {
88                 addTopic(dmaapTopicSourceLst, properties, topic);
89             }
90         }
91         return dmaapTopicSourceLst;
92     }
93
94     @Override
95     public DmaapTopicSource build(List<String> servers, String topic, String apiKey, String apiSecret) {
96         return this.build(BusTopicParams.builder()
97                 .servers(servers)
98                 .topic(topic)
99                 .apiKey(apiKey)
100                 .apiSecret(apiSecret)
101                 .fetchTimeout(PolicyEndPointProperties.DEFAULT_TIMEOUT_MS_FETCH)
102                 .fetchLimit(PolicyEndPointProperties.DEFAULT_LIMIT_FETCH)
103                 .managed(true)
104                 .useHttps(false)
105                 .allowSelfSignedCerts(false)
106                 .build());
107     }
108
109     @Override
110     public DmaapTopicSource build(List<String> servers, String topic) {
111         return this.build(servers, topic, null, null);
112     }
113
114     private void addTopic(List<DmaapTopicSource> dmaapTopicSourceLst, Properties properties, String topic) {
115         if (this.dmaapTopicSources.containsKey(topic)) {
116             dmaapTopicSourceLst.add(this.dmaapTopicSources.get(topic));
117             return;
118         }
119
120         String topicPrefix = PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "." + topic;
121
122         var props = new PropertyUtils(properties, topicPrefix,
123             (name, value, ex) -> logger.warn("{}: {} {} is in invalid format for topic {} ", this, name, value, topic));
124
125         String servers = properties.getProperty(topicPrefix + PolicyEndPointProperties.PROPERTY_TOPIC_SERVERS_SUFFIX);
126         if (StringUtils.isBlank(servers)) {
127             logger.error("{}: no DMaaP servers or DME2 ServiceName provided", this);
128             return;
129         }
130
131         DmaapTopicSource uebTopicSource = this.build(DmaapPropertyUtils.makeBuilder(props, topic, servers)
132                 .consumerGroup(props.getString(
133                                 PolicyEndPointProperties.PROPERTY_TOPIC_SOURCE_CONSUMER_GROUP_SUFFIX, null))
134                 .consumerInstance(props.getString(
135                                 PolicyEndPointProperties.PROPERTY_TOPIC_SOURCE_CONSUMER_INSTANCE_SUFFIX, null))
136                 .fetchTimeout(props.getInteger(
137                                 PolicyEndPointProperties.PROPERTY_TOPIC_SOURCE_FETCH_TIMEOUT_SUFFIX,
138                                 PolicyEndPointProperties.DEFAULT_TIMEOUT_MS_FETCH))
139                 .fetchLimit(props.getInteger(PolicyEndPointProperties.PROPERTY_TOPIC_SOURCE_FETCH_LIMIT_SUFFIX,
140                                 PolicyEndPointProperties.DEFAULT_LIMIT_FETCH))
141                 .build());
142
143         dmaapTopicSourceLst.add(uebTopicSource);
144     }
145
146     /**
147      * Makes a new source.
148      *
149      * @param busTopicParams parameters to use to configure the source
150      * @return a new source
151      */
152     protected DmaapTopicSource makeSource(BusTopicParams busTopicParams) {
153         return new SingleThreadedDmaapTopicSource(busTopicParams);
154     }
155
156     @Override
157     public void destroy(String topic) {
158
159         if (topic == null || topic.isEmpty()) {
160             throw new IllegalArgumentException(MISSING_TOPIC);
161         }
162
163         DmaapTopicSource uebTopicSource;
164
165         synchronized (this) {
166             if (!dmaapTopicSources.containsKey(topic)) {
167                 return;
168             }
169
170             uebTopicSource = dmaapTopicSources.remove(topic);
171         }
172
173         uebTopicSource.shutdown();
174     }
175
176     @Override
177     public void destroy() {
178         List<DmaapTopicSource> readers = this.inventory();
179         for (DmaapTopicSource reader : readers) {
180             reader.shutdown();
181         }
182
183         synchronized (this) {
184             this.dmaapTopicSources.clear();
185         }
186     }
187
188     @Override
189     public DmaapTopicSource get(String topic) {
190
191         if (topic == null || topic.isEmpty()) {
192             throw new IllegalArgumentException(MISSING_TOPIC);
193         }
194
195         synchronized (this) {
196             if (dmaapTopicSources.containsKey(topic)) {
197                 return dmaapTopicSources.get(topic);
198             } else {
199                 throw new IllegalStateException("DmaapTopiceSource for " + topic + " not found");
200             }
201         }
202     }
203
204     @Override
205     public synchronized List<DmaapTopicSource> inventory() {
206         return new ArrayList<>(this.dmaapTopicSources.values());
207     }
208
209     @Override
210     public String toString() {
211         return "IndexedDmaapTopicSourceFactory []";
212     }
213
214 }