5bdc8ab6f0bbdd906c092ad9751b46104df782e2
[policy/common.git] /
1 /*
2  * ============LICENSE_START=======================================================
3  * ONAP
4  * ================================================================================
5  * Copyright (C) 2017-2019 AT&T Intellectual Property. All rights reserved.
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=========================================================
19  */
20
21 package org.onap.policy.common.endpoints.event.comm.bus;
22
23 import java.util.ArrayList;
24 import java.util.HashMap;
25 import java.util.List;
26 import java.util.Properties;
27 import org.apache.commons.lang3.StringUtils;
28 import org.onap.policy.common.endpoints.event.comm.bus.internal.BusTopicParams;
29 import org.onap.policy.common.endpoints.event.comm.bus.internal.SingleThreadedUebTopicSource;
30 import org.onap.policy.common.endpoints.properties.PolicyEndPointProperties;
31 import org.onap.policy.common.endpoints.utils.PropertyUtils;
32 import org.onap.policy.common.endpoints.utils.UebPropertyUtils;
33 import org.slf4j.Logger;
34 import org.slf4j.LoggerFactory;
35
36 /**
37  * Factory of UEB Source Topics indexed by topic name.
38  */
39 class IndexedUebTopicSourceFactory implements UebTopicSourceFactory {
40     private static final String MISSING_TOPIC = "A topic must be provided";
41
42     /**
43      * Logger.
44      */
45     private static Logger logger = LoggerFactory.getLogger(IndexedUebTopicSourceFactory.class);
46
47     /**
48      * UEB Topic Name Index.
49      */
50     protected HashMap<String, UebTopicSource> uebTopicSources = new HashMap<>();
51
52     @Override
53     public UebTopicSource build(BusTopicParams busTopicParams) {
54         if (busTopicParams.getServers() == null || busTopicParams.getServers().isEmpty()) {
55             throw new IllegalArgumentException("UEB Server(s) must be provided");
56         }
57
58         if (busTopicParams.getTopic() == null || busTopicParams.getTopic().isEmpty()) {
59             throw new IllegalArgumentException(MISSING_TOPIC);
60         }
61
62         synchronized (this) {
63             if (uebTopicSources.containsKey(busTopicParams.getTopic())) {
64                 return uebTopicSources.get(busTopicParams.getTopic());
65             }
66
67             UebTopicSource uebTopicSource = makeSource(busTopicParams);
68
69             if (busTopicParams.isManaged()) {
70                 uebTopicSources.put(busTopicParams.getTopic(), uebTopicSource);
71             }
72
73             return uebTopicSource;
74         }
75     }
76
77     @Override
78     public List<UebTopicSource> build(Properties properties) {
79
80         String readTopics = properties.getProperty(PolicyEndPointProperties.PROPERTY_UEB_SOURCE_TOPICS);
81         if (StringUtils.isBlank(readTopics)) {
82             logger.info("{}: no topic for UEB Source", this);
83             return new ArrayList<>();
84         }
85
86         List<UebTopicSource> newUebTopicSources = new ArrayList<>();
87         synchronized (this) {
88             for (String topic : readTopics.split("\\s*,\\s*")) {
89                 addTopic(newUebTopicSources, topic, properties);
90             }
91         }
92         return newUebTopicSources;
93     }
94
95     @Override
96     public UebTopicSource build(List<String> servers, String topic, String apiKey, String apiSecret) {
97
98         return this.build(BusTopicParams.builder()
99                 .servers(servers)
100                 .topic(topic)
101                 .apiKey(apiKey)
102                 .apiSecret(apiSecret)
103                 .fetchTimeout(PolicyEndPointProperties.DEFAULT_TIMEOUT_MS_FETCH)
104                 .fetchLimit(PolicyEndPointProperties.DEFAULT_LIMIT_FETCH)
105                 .managed(true)
106                 .useHttps(false)
107                 .allowSelfSignedCerts(true).build());
108     }
109
110     @Override
111     public UebTopicSource build(List<String> servers, String topic) {
112         return this.build(servers, topic, null, null);
113     }
114
115     private void addTopic(List<UebTopicSource> newUebTopicSources, String topic, Properties properties) {
116         if (this.uebTopicSources.containsKey(topic)) {
117             newUebTopicSources.add(this.uebTopicSources.get(topic));
118             return;
119         }
120
121         String topicPrefix = PolicyEndPointProperties.PROPERTY_UEB_SOURCE_TOPICS + "." + topic;
122
123         PropertyUtils props = new PropertyUtils(properties, topicPrefix,
124             (name, value, ex) -> logger.warn("{}: {} {} is in invalid format for topic {} ", this, name, value, topic));
125
126         String servers = properties.getProperty(topicPrefix + PolicyEndPointProperties.PROPERTY_TOPIC_SERVERS_SUFFIX);
127         if (StringUtils.isBlank(servers)) {
128             logger.error("{}: no UEB servers configured for sink {}", this, topic);
129             return;
130         }
131
132         UebTopicSource uebTopicSource = this.build(UebPropertyUtils.makeBuilder(props, topic, servers)
133                 .consumerGroup(props.getString(
134                                 PolicyEndPointProperties.PROPERTY_TOPIC_SOURCE_CONSUMER_GROUP_SUFFIX, null))
135                 .consumerInstance(props.getString(
136                                 PolicyEndPointProperties.PROPERTY_TOPIC_SOURCE_CONSUMER_INSTANCE_SUFFIX, null))
137                 .fetchTimeout(props.getInteger(
138                                 PolicyEndPointProperties.PROPERTY_TOPIC_SOURCE_FETCH_TIMEOUT_SUFFIX,
139                                 PolicyEndPointProperties.DEFAULT_TIMEOUT_MS_FETCH))
140                 .fetchLimit(props.getInteger(PolicyEndPointProperties.PROPERTY_TOPIC_SOURCE_FETCH_LIMIT_SUFFIX,
141                                 PolicyEndPointProperties.DEFAULT_LIMIT_FETCH))
142                 .build());
143
144         newUebTopicSources.add(uebTopicSource);
145     }
146
147     /**
148      * Makes a new source.
149      *
150      * @param busTopicParams parameters to use to configure the source
151      * @return a new source
152      */
153     protected UebTopicSource makeSource(BusTopicParams busTopicParams) {
154         return new SingleThreadedUebTopicSource(busTopicParams);
155     }
156
157     @Override
158     public void destroy(String topic) {
159
160         if (topic == null || topic.isEmpty()) {
161             throw new IllegalArgumentException(MISSING_TOPIC);
162         }
163
164         UebTopicSource uebTopicSource;
165
166         synchronized (this) {
167             if (!uebTopicSources.containsKey(topic)) {
168                 return;
169             }
170
171             uebTopicSource = uebTopicSources.remove(topic);
172         }
173
174         uebTopicSource.shutdown();
175     }
176
177     @Override
178     public void destroy() {
179         List<UebTopicSource> readers = this.inventory();
180         for (UebTopicSource reader : readers) {
181             reader.shutdown();
182         }
183
184         synchronized (this) {
185             this.uebTopicSources.clear();
186         }
187     }
188
189     @Override
190     public UebTopicSource get(String topic) {
191
192         if (topic == null || topic.isEmpty()) {
193             throw new IllegalArgumentException(MISSING_TOPIC);
194         }
195
196         synchronized (this) {
197             if (uebTopicSources.containsKey(topic)) {
198                 return uebTopicSources.get(topic);
199             } else {
200                 throw new IllegalStateException("UebTopiceSource for " + topic + " not found");
201             }
202         }
203     }
204
205     @Override
206     public synchronized List<UebTopicSource> inventory() {
207         return new ArrayList<>(this.uebTopicSources.values());
208     }
209
210     @Override
211     public String toString() {
212         StringBuilder builder = new StringBuilder();
213         builder.append("IndexedUebTopicSourceFactory []");
214         return builder.toString();
215     }
216 }