ee1672d797d3d04633bf8d0559d0d05d8836ecfb
[policy/common.git] /
1 /*
2  * ============LICENSE_START=======================================================
3  * policy-endpoints
4  * ================================================================================
5  * Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved.
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=========================================================
19  */
20
21 package org.onap.policy.common.endpoints.event.comm.bus;
22
23 import java.util.ArrayList;
24 import java.util.Arrays;
25 import java.util.HashMap;
26 import java.util.List;
27 import java.util.Properties;
28
29 import org.onap.policy.common.endpoints.properties.PolicyEndPointProperties;
30 import org.slf4j.Logger;
31 import org.slf4j.LoggerFactory;
32
33 /**
34  * Noop Topic Sink Factory
35  */
36 public interface NoopTopicSinkFactory {
37
38     /**
39      * Creates noop topic sinks based on properties files
40      * 
41      * @param properties Properties containing initialization values
42      * 
43      * @return a noop topic sink
44      * @throws IllegalArgumentException if invalid parameters are present
45      */
46     public List<NoopTopicSink> build(Properties properties);
47
48     /**
49      * builds a noop sink
50      * 
51      * @param servers list of servers
52      * @param topic topic name
53      * @param managed is this sink endpoint managed?
54      * @return a noop topic sink
55      * @throws IllegalArgumentException if invalid parameters are present
56      */
57     public NoopTopicSink build(List<String> servers, String topic, boolean managed);
58
59     /**
60      * Destroys a sink based on the topic
61      * 
62      * @param topic topic name
63      * @throws IllegalArgumentException if invalid parameters are present
64      */
65     public void destroy(String topic);
66
67     /**
68      * gets a sink based on topic name
69      * 
70      * @param topic the topic name
71      * 
72      * @return a sink with topic name
73      * @throws IllegalArgumentException if an invalid topic is provided
74      * @throws IllegalStateException if the sink is in an incorrect state
75      */
76     public NoopTopicSink get(String topic);
77
78     /**
79      * Provides a snapshot of the UEB Topic Writers
80      * 
81      * @return a list of the UEB Topic Writers
82      */
83     public List<NoopTopicSink> inventory();
84
85     /**
86      * Destroys all sinks
87      */
88     public void destroy();
89 }
90
91
92 /* ------------- implementation ----------------- */
93
94 /**
95  * Factory of noop sinks
96  */
97 class IndexedNoopTopicSinkFactory implements NoopTopicSinkFactory {
98     private static final String MISSING_TOPIC = "A topic must be provided";
99
100     /**
101      * Logger
102      */
103     private static Logger logger = LoggerFactory.getLogger(IndexedUebTopicSinkFactory.class);
104
105     /**
106      * noop topic sinks map
107      */
108     protected HashMap<String, NoopTopicSink> noopTopicSinks = new HashMap<>();
109
110     @Override
111     public List<NoopTopicSink> build(Properties properties) {
112
113         final String sinkTopics = properties.getProperty(PolicyEndPointProperties.PROPERTY_NOOP_SINK_TOPICS);
114         if (sinkTopics == null || sinkTopics.isEmpty()) {
115             logger.info("{}: no topic for noop sink", this);
116             return new ArrayList<>();
117         }
118
119         final List<String> sinkTopicList = new ArrayList<>(Arrays.asList(sinkTopics.split("\\s*,\\s*")));
120         final List<NoopTopicSink> newSinks = new ArrayList<>();
121         synchronized (this) {
122             for (final String topic : sinkTopicList) {
123                 if (this.noopTopicSinks.containsKey(topic)) {
124                     newSinks.add(this.noopTopicSinks.get(topic));
125                     continue;
126                 }
127
128                 String servers = properties.getProperty(PolicyEndPointProperties.PROPERTY_NOOP_SINK_TOPICS + "." + topic
129                         + PolicyEndPointProperties.PROPERTY_TOPIC_SERVERS_SUFFIX);
130
131                 if (servers == null || servers.isEmpty()) {
132                     servers = "noop";
133                 }
134
135                 final List<String> serverList = new ArrayList<>(Arrays.asList(servers.split("\\s*,\\s*")));
136
137                 final String managedString = properties.getProperty(PolicyEndPointProperties.PROPERTY_NOOP_SINK_TOPICS
138                         + "." + topic + PolicyEndPointProperties.PROPERTY_MANAGED_SUFFIX);
139                 boolean managed = true;
140                 if (managedString != null && !managedString.isEmpty()) {
141                     managed = Boolean.parseBoolean(managedString);
142                 }
143
144                 final NoopTopicSink noopSink = this.build(serverList, topic, managed);
145                 newSinks.add(noopSink);
146             }
147             return newSinks;
148         }
149     }
150
151     @Override
152     public NoopTopicSink build(List<String> servers, String topic, boolean managed) {
153
154         List<String> noopSinkServers = servers;
155         if (noopSinkServers == null) {
156             noopSinkServers = new ArrayList<>();
157         }
158
159         if (noopSinkServers.isEmpty()) {
160             noopSinkServers.add("noop");
161         }
162
163         if (topic == null || topic.isEmpty()) {
164             throw new IllegalArgumentException(MISSING_TOPIC);
165         }
166
167         synchronized (this) {
168             if (this.noopTopicSinks.containsKey(topic)) {
169                 return this.noopTopicSinks.get(topic);
170             }
171
172             final NoopTopicSink sink = new NoopTopicSink(noopSinkServers, topic);
173
174             if (managed) {
175                 this.noopTopicSinks.put(topic, sink);
176             }
177
178             return sink;
179         }
180     }
181
182     @Override
183     public void destroy(String topic) {
184         if (topic == null || topic.isEmpty()) {
185             throw new IllegalArgumentException(MISSING_TOPIC);
186         }
187
188         NoopTopicSink noopSink;
189         synchronized (this) {
190             if (!this.noopTopicSinks.containsKey(topic)) {
191                 return;
192             }
193
194             noopSink = this.noopTopicSinks.remove(topic);
195         }
196
197         noopSink.shutdown();
198     }
199
200     @Override
201     public void destroy() {
202         final List<NoopTopicSink> sinks = this.inventory();
203         for (final NoopTopicSink sink : sinks) {
204             sink.shutdown();
205         }
206
207         synchronized (this) {
208             this.noopTopicSinks.clear();
209         }
210     }
211
212     @Override
213     public NoopTopicSink get(String topic) {
214         if (topic == null || topic.isEmpty()) {
215             throw new IllegalArgumentException(MISSING_TOPIC);
216         }
217
218         synchronized (this) {
219             if (this.noopTopicSinks.containsKey(topic)) {
220                 return this.noopTopicSinks.get(topic);
221             } else {
222                 throw new IllegalStateException("DmaapTopicSink for " + topic + " not found");
223             }
224         }
225     }
226
227     @Override
228     public List<NoopTopicSink> inventory() {
229         return new ArrayList<>(this.noopTopicSinks.values());
230     }
231 }