c895a409f5ce6b06998e596c4e4558c8d6b0288e
[policy/common.git] /
1 /*
2  * ============LICENSE_START=======================================================
3  * ONAP
4  * ================================================================================
5  * Copyright (C) 2017-2019 AT&T Intellectual Property. All rights reserved.
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=========================================================
19  */
20
21 package org.onap.policy.common.endpoints.event.comm.bus;
22
23 import java.util.ArrayList;
24 import java.util.HashMap;
25 import java.util.List;
26 import java.util.Properties;
27 import org.apache.commons.lang3.StringUtils;
28 import org.onap.policy.common.endpoints.event.comm.bus.internal.BusTopicParams;
29 import org.onap.policy.common.endpoints.event.comm.bus.internal.SingleThreadedDmaapTopicSource;
30 import org.onap.policy.common.endpoints.properties.PolicyEndPointProperties;
31 import org.onap.policy.common.endpoints.utils.DmaapPropertyUtils;
32 import org.onap.policy.common.endpoints.utils.PropertyUtils;
33 import org.slf4j.Logger;
34 import org.slf4j.LoggerFactory;
35
36 /**
37  * Factory of DMAAP Source Topics indexed by topic name.
38  */
39
40 class IndexedDmaapTopicSourceFactory implements DmaapTopicSourceFactory {
41     private static final String MISSING_TOPIC = "A topic must be provided";
42
43     /**
44      * Logger.
45      */
46     private static Logger logger = LoggerFactory.getLogger(IndexedDmaapTopicSourceFactory.class);
47
48     /**
49      * DMaaP Topic Name Index.
50      */
51     protected HashMap<String, DmaapTopicSource> dmaapTopicSources = new HashMap<>();
52
53     @Override
54     public DmaapTopicSource build(BusTopicParams busTopicParams) {
55
56         if (busTopicParams.getTopic() == null || busTopicParams.getTopic().isEmpty()) {
57             throw new IllegalArgumentException(MISSING_TOPIC);
58         }
59
60         synchronized (this) {
61             if (dmaapTopicSources.containsKey(busTopicParams.getTopic())) {
62                 return dmaapTopicSources.get(busTopicParams.getTopic());
63             }
64
65             DmaapTopicSource dmaapTopicSource = makeSource(busTopicParams);
66
67             if (busTopicParams.isManaged()) {
68                 dmaapTopicSources.put(busTopicParams.getTopic(), dmaapTopicSource);
69             }
70             return dmaapTopicSource;
71         }
72     }
73
74     @Override
75     public List<DmaapTopicSource> build(Properties properties) {
76
77         String readTopics = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS);
78         if (StringUtils.isBlank(readTopics)) {
79             logger.info("{}: no topic for DMaaP Source", this);
80             return new ArrayList<>();
81         }
82
83         List<DmaapTopicSource> dmaapTopicSourceLst = new ArrayList<>();
84         synchronized (this) {
85             for (String topic : readTopics.split("\\s*,\\s*")) {
86                 addTopic(dmaapTopicSourceLst, properties, topic);
87             }
88         }
89         return dmaapTopicSourceLst;
90     }
91
92     @Override
93     public DmaapTopicSource build(List<String> servers, String topic, String apiKey, String apiSecret) {
94         return this.build(BusTopicParams.builder()
95                 .servers(servers)
96                 .topic(topic)
97                 .apiKey(apiKey)
98                 .apiSecret(apiSecret)
99                 .fetchTimeout(DmaapTopicSource.DEFAULT_TIMEOUT_MS_FETCH)
100                 .fetchLimit(DmaapTopicSource.DEFAULT_LIMIT_FETCH)
101                 .managed(true)
102                 .useHttps(false)
103                 .allowSelfSignedCerts(false)
104                 .build());
105     }
106
107     @Override
108     public DmaapTopicSource build(List<String> servers, String topic) {
109         return this.build(servers, topic, null, null);
110     }
111
112     private void addTopic(List<DmaapTopicSource> dmaapTopicSourceLst, Properties properties, String topic) {
113         if (this.dmaapTopicSources.containsKey(topic)) {
114             dmaapTopicSourceLst.add(this.dmaapTopicSources.get(topic));
115             return;
116         }
117
118         String topicPrefix = PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "." + topic;
119
120         PropertyUtils props = new PropertyUtils(properties, topicPrefix,
121             (name, value, ex) -> logger.warn("{}: {} {} is in invalid format for topic {} ", this, name, value, topic));
122
123         String servers = properties.getProperty(topicPrefix + PolicyEndPointProperties.PROPERTY_TOPIC_SERVERS_SUFFIX);
124         if (StringUtils.isBlank(servers)) {
125             logger.error("{}: no DMaaP servers or DME2 ServiceName provided", this);
126             return;
127         }
128
129         DmaapTopicSource uebTopicSource = this.build(DmaapPropertyUtils.makeBuilder(props, topic, servers)
130                 .consumerGroup(props.getString(
131                                 PolicyEndPointProperties.PROPERTY_TOPIC_SOURCE_CONSUMER_GROUP_SUFFIX, null))
132                 .consumerInstance(props.getString(
133                                 PolicyEndPointProperties.PROPERTY_TOPIC_SOURCE_CONSUMER_INSTANCE_SUFFIX, null))
134                 .fetchTimeout(props.getInteger(
135                                 PolicyEndPointProperties.PROPERTY_TOPIC_SOURCE_FETCH_TIMEOUT_SUFFIX,
136                                 DmaapTopicSource.DEFAULT_TIMEOUT_MS_FETCH))
137                 .fetchLimit(props.getInteger(PolicyEndPointProperties.PROPERTY_TOPIC_SOURCE_FETCH_LIMIT_SUFFIX,
138                                 DmaapTopicSource.DEFAULT_LIMIT_FETCH))
139                 .build());
140
141         dmaapTopicSourceLst.add(uebTopicSource);
142     }
143
144     /**
145      * Makes a new source.
146      *
147      * @param busTopicParams parameters to use to configure the source
148      * @return a new source
149      */
150     protected DmaapTopicSource makeSource(BusTopicParams busTopicParams) {
151         return new SingleThreadedDmaapTopicSource(busTopicParams);
152     }
153
154     @Override
155     public void destroy(String topic) {
156
157         if (topic == null || topic.isEmpty()) {
158             throw new IllegalArgumentException(MISSING_TOPIC);
159         }
160
161         DmaapTopicSource uebTopicSource;
162
163         synchronized (this) {
164             if (!dmaapTopicSources.containsKey(topic)) {
165                 return;
166             }
167
168             uebTopicSource = dmaapTopicSources.remove(topic);
169         }
170
171         uebTopicSource.shutdown();
172     }
173
174     @Override
175     public void destroy() {
176         List<DmaapTopicSource> readers = this.inventory();
177         for (DmaapTopicSource reader : readers) {
178             reader.shutdown();
179         }
180
181         synchronized (this) {
182             this.dmaapTopicSources.clear();
183         }
184     }
185
186     @Override
187     public DmaapTopicSource get(String topic) {
188
189         if (topic == null || topic.isEmpty()) {
190             throw new IllegalArgumentException(MISSING_TOPIC);
191         }
192
193         synchronized (this) {
194             if (dmaapTopicSources.containsKey(topic)) {
195                 return dmaapTopicSources.get(topic);
196             } else {
197                 throw new IllegalStateException("DmaapTopiceSource for " + topic + " not found");
198             }
199         }
200     }
201
202     @Override
203     public synchronized List<DmaapTopicSource> inventory() {
204         return new ArrayList<>(this.dmaapTopicSources.values());
205     }
206
207     @Override
208     public String toString() {
209         return "IndexedDmaapTopicSourceFactory []";
210     }
211
212 }