779ab74ddc12280b831e2ee1e5c202a6e005c436
[policy/common.git] /
1 /*
2  * ============LICENSE_START=======================================================
3  * policy-endpoints
4  * ================================================================================
5  * Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved.
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  * 
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  * 
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=========================================================
19  */
20
21 package org.onap.policy.common.endpoints.event.comm.impl;
22
23 import com.fasterxml.jackson.annotation.JsonIgnore;
24
25 import java.util.ArrayList;
26 import java.util.List;
27 import java.util.Properties;
28
29 import org.onap.policy.common.capabilities.Startable;
30 import org.onap.policy.common.endpoints.event.comm.Topic;
31 import org.onap.policy.common.endpoints.event.comm.TopicEndpoint;
32 import org.onap.policy.common.endpoints.event.comm.TopicSink;
33 import org.onap.policy.common.endpoints.event.comm.TopicSource;
34 import org.onap.policy.common.endpoints.event.comm.bus.DmaapTopicSink;
35 import org.onap.policy.common.endpoints.event.comm.bus.DmaapTopicSource;
36 import org.onap.policy.common.endpoints.event.comm.bus.NoopTopicSink;
37 import org.onap.policy.common.endpoints.event.comm.bus.UebTopicSink;
38 import org.onap.policy.common.endpoints.event.comm.bus.UebTopicSource;
39 import org.onap.policy.common.endpoints.event.comm.bus.impl.IndexedDmaapTopicSinkFactory;
40 import org.onap.policy.common.endpoints.event.comm.bus.impl.IndexedDmaapTopicSourceFactory;
41 import org.onap.policy.common.endpoints.event.comm.bus.impl.IndexedNoopTopicSinkFactory;
42 import org.onap.policy.common.endpoints.event.comm.bus.impl.IndexedUebTopicSinkFactory;
43 import org.onap.policy.common.endpoints.event.comm.bus.impl.IndexedUebTopicSourceFactory;
44 import org.slf4j.Logger;
45 import org.slf4j.LoggerFactory;
46
47 /**
48  * This implementation of the Topic Endpoint Manager, proxies operations to appropriate
49  * implementations according to the communication infrastructure that are supported
50  */
51 public class ProxyTopicEndpointManager implements TopicEndpoint {
52     /**
53      * Logger
54      */
55     private static Logger logger = LoggerFactory.getLogger(ProxyTopicEndpointManager.class);
56     /**
57      * Is this element locked?
58      */
59     protected volatile boolean locked = false;
60
61     /**
62      * Is this element alive?
63      */
64     protected volatile boolean alive = false;
65
66     /**
67      * singleton for global access
68      */
69     private static final TopicEndpoint manager = new ProxyTopicEndpointManager();
70
71     /**
72      * Get the singelton instance.
73      * 
74      * @return the instance
75      */
76     public static TopicEndpoint getInstance() {
77         return manager;
78     }
79
80     @Override
81     public List<TopicSource> addTopicSources(Properties properties) {
82
83         // 1. Create UEB Sources
84         // 2. Create DMAAP Sources
85
86         final List<TopicSource> sources = new ArrayList<>();
87
88         sources.addAll(IndexedUebTopicSourceFactory.getInstance().build(properties));
89         sources.addAll(IndexedDmaapTopicSourceFactory.getInstance().build(properties));
90
91         if (this.isLocked()) {
92             for (final TopicSource source : sources) {
93                 source.lock();
94             }
95         }
96
97         return sources;
98     }
99
100     @Override
101     public List<TopicSink> addTopicSinks(Properties properties) {
102         // 1. Create UEB Sinks
103         // 2. Create DMAAP Sinks
104
105         final List<TopicSink> sinks = new ArrayList<>();
106
107         sinks.addAll(IndexedUebTopicSinkFactory.getInstance().build(properties));
108         sinks.addAll(IndexedDmaapTopicSinkFactory.getInstance().build(properties));
109         sinks.addAll(IndexedNoopTopicSinkFactory.getInstance().build(properties));
110
111         if (this.isLocked()) {
112             for (final TopicSink sink : sinks) {
113                 sink.lock();
114             }
115         }
116
117         return sinks;
118     }
119
120     @Override
121     public List<TopicSource> getTopicSources() {
122
123         final List<TopicSource> sources = new ArrayList<>();
124
125         sources.addAll(IndexedUebTopicSourceFactory.getInstance().inventory());
126         sources.addAll(IndexedDmaapTopicSourceFactory.getInstance().inventory());
127
128         return sources;
129     }
130
131     @Override
132     public List<TopicSink> getTopicSinks() {
133
134         final List<TopicSink> sinks = new ArrayList<>();
135
136         sinks.addAll(IndexedUebTopicSinkFactory.getInstance().inventory());
137         sinks.addAll(IndexedDmaapTopicSinkFactory.getInstance().inventory());
138         sinks.addAll(IndexedNoopTopicSinkFactory.getInstance().inventory());
139
140         return sinks;
141     }
142
143     @JsonIgnore
144     @Override
145     public List<UebTopicSource> getUebTopicSources() {
146         return IndexedUebTopicSourceFactory.getInstance().inventory();
147     }
148
149     @JsonIgnore
150     @Override
151     public List<DmaapTopicSource> getDmaapTopicSources() {
152         return IndexedDmaapTopicSourceFactory.getInstance().inventory();
153     }
154
155     @JsonIgnore
156     @Override
157     public List<UebTopicSink> getUebTopicSinks() {
158         return IndexedUebTopicSinkFactory.getInstance().inventory();
159     }
160
161     @JsonIgnore
162     @Override
163     public List<DmaapTopicSink> getDmaapTopicSinks() {
164         return IndexedDmaapTopicSinkFactory.getInstance().inventory();
165     }
166
167     @JsonIgnore
168     @Override
169     public List<NoopTopicSink> getNoopTopicSinks() {
170         return IndexedNoopTopicSinkFactory.getInstance().inventory();
171     }
172
173     @Override
174     public boolean start() {
175
176         synchronized (this) {
177             if (this.locked) {
178                 throw new IllegalStateException(this + " is locked");
179             }
180
181             if (this.alive) {
182                 return true;
183             }
184
185             this.alive = true;
186         }
187
188         final List<Startable> endpoints = this.getEndpoints();
189
190         boolean success = true;
191         for (final Startable endpoint : endpoints) {
192             try {
193                 success = endpoint.start() && success;
194             } catch (final Exception e) {
195                 success = false;
196                 logger.error("Problem starting endpoint: {}", endpoint, e);
197             }
198         }
199
200         return success;
201     }
202
203
204     @Override
205     public boolean stop() {
206
207         /*
208          * stop regardless if it is locked, in other words, stop operation has precedence over
209          * locks.
210          */
211         synchronized (this) {
212             this.alive = false;
213         }
214
215         final List<Startable> endpoints = this.getEndpoints();
216
217         boolean success = true;
218         for (final Startable endpoint : endpoints) {
219             try {
220                 success = endpoint.stop() && success;
221             } catch (final Exception e) {
222                 success = false;
223                 logger.error("Problem stopping endpoint: {}", endpoint, e);
224             }
225         }
226
227         return success;
228     }
229
230     /**
231      *
232      * @return list of managed endpoints
233      */
234     @JsonIgnore
235     protected List<Startable> getEndpoints() {
236         final List<Startable> endpoints = new ArrayList<>();
237
238         endpoints.addAll(this.getTopicSources());
239         endpoints.addAll(this.getTopicSinks());
240
241         return endpoints;
242     }
243
244     @Override
245     public void shutdown() {
246         IndexedUebTopicSourceFactory.getInstance().destroy();
247         IndexedUebTopicSinkFactory.getInstance().destroy();
248         IndexedNoopTopicSinkFactory.getInstance().destroy();
249
250         IndexedDmaapTopicSourceFactory.getInstance().destroy();
251         IndexedDmaapTopicSinkFactory.getInstance().destroy();
252     }
253
254     @Override
255     public boolean isAlive() {
256         return this.alive;
257     }
258
259     @Override
260     public boolean lock() {
261
262         synchronized (this) {
263             if (this.locked) {
264                 return true;
265             }
266
267             this.locked = true;
268         }
269
270         for (final TopicSource source : this.getTopicSources()) {
271             source.lock();
272         }
273
274         for (final TopicSink sink : this.getTopicSinks()) {
275             sink.lock();
276         }
277
278         return true;
279     }
280
281     @Override
282     public boolean unlock() {
283         synchronized (this) {
284             if (!this.locked) {
285                 return true;
286             }
287
288             this.locked = false;
289         }
290
291         for (final TopicSource source : this.getTopicSources()) {
292             source.unlock();
293         }
294
295         for (final TopicSink sink : this.getTopicSinks()) {
296             sink.unlock();
297         }
298
299         return true;
300     }
301
302     @Override
303     public boolean isLocked() {
304         return this.locked;
305     }
306
307     @Override
308     public List<TopicSource> getTopicSources(List<String> topicNames) {
309
310         if (topicNames == null) {
311             throw new IllegalArgumentException("must provide a list of topics");
312         }
313
314         final List<TopicSource> sources = new ArrayList<>();
315         for (final String topic : topicNames) {
316             try {
317                 final TopicSource uebSource = this.getUebTopicSource(topic);
318                 if (uebSource != null) {
319                     sources.add(uebSource);
320                 }
321             } catch (final Exception e) {
322                 logger.debug("No UEB source for topic: {}", topic, e);
323             }
324
325             try {
326                 final TopicSource dmaapSource = this.getDmaapTopicSource(topic);
327                 if (dmaapSource != null) {
328                     sources.add(dmaapSource);
329                 }
330             } catch (final Exception e) {
331                 logger.debug("No DMAAP source for topic: {}", topic, e);
332             }
333         }
334         return sources;
335     }
336
337     @Override
338     public List<TopicSink> getTopicSinks(List<String> topicNames) {
339
340         if (topicNames == null) {
341             throw new IllegalArgumentException("must provide a list of topics");
342         }
343
344         final List<TopicSink> sinks = new ArrayList<>();
345         for (final String topic : topicNames) {
346             try {
347                 final TopicSink uebSink = this.getUebTopicSink(topic);
348                 if (uebSink != null) {
349                     sinks.add(uebSink);
350                 }
351             } catch (final Exception e) {
352                 logger.debug("No UEB sink for topic: {}", topic, e);
353             }
354
355             try {
356                 final TopicSink dmaapSink = this.getDmaapTopicSink(topic);
357                 if (dmaapSink != null) {
358                     sinks.add(dmaapSink);
359                 }
360             } catch (final Exception e) {
361                 logger.debug("No DMAAP sink for topic: {}", topic, e);
362             }
363
364             try {
365                 final TopicSink noopSink = this.getNoopTopicSink(topic);
366                 if (noopSink != null) {
367                     sinks.add(noopSink);
368                 }
369             } catch (final Exception e) {
370                 logger.debug("No NOOP sink for topic: {}", topic, e);
371             }
372         }
373         return sinks;
374     }
375
376     @Override
377     public TopicSource getTopicSource(Topic.CommInfrastructure commType, String topicName) {
378
379         if (commType == null) {
380             throw parmException(topicName);
381         }
382
383         if (topicName == null) {
384             throw parmException(topicName);
385         }
386
387         switch (commType) {
388             case UEB:
389                 return this.getUebTopicSource(topicName);
390             case DMAAP:
391                 return this.getDmaapTopicSource(topicName);
392             default:
393                 throw new UnsupportedOperationException("Unsupported " + commType.name());
394         }
395     }
396
397     private IllegalArgumentException parmException(String topicName) {
398         return new IllegalArgumentException(
399                 "Invalid parameter: a communication infrastructure required to fetch " + topicName);
400     }
401
402     @Override
403     public TopicSink getTopicSink(Topic.CommInfrastructure commType, String topicName) {
404         if (commType == null) {
405             throw parmException(topicName);
406         }
407
408         if (topicName == null) {
409             throw parmException(topicName);
410         }
411
412         switch (commType) {
413             case UEB:
414                 return this.getUebTopicSink(topicName);
415             case DMAAP:
416                 return this.getDmaapTopicSink(topicName);
417             case NOOP:
418                 return this.getNoopTopicSink(topicName);
419             default:
420                 throw new UnsupportedOperationException("Unsupported " + commType.name());
421         }
422     }
423
424     @Override
425     public List<TopicSink> getTopicSinks(String topicName) {
426         if (topicName == null) {
427             throw parmException(topicName);
428         }
429
430         final List<TopicSink> sinks = new ArrayList<>();
431
432         try {
433             sinks.add(this.getUebTopicSink(topicName));
434         } catch (final Exception e) {
435             logNoSink(topicName, e);
436         }
437
438         try {
439             sinks.add(this.getDmaapTopicSink(topicName));
440         } catch (final Exception e) {
441             logNoSink(topicName, e);
442         }
443
444         try {
445             sinks.add(this.getNoopTopicSink(topicName));
446         } catch (final Exception e) {
447             logNoSink(topicName, e);
448         }
449
450         return sinks;
451     }
452
453     private void logNoSink(String topicName, Exception ex) {
454         logger.debug("No sink for topic: {}", topicName, ex);
455     }
456
457     @Override
458     public UebTopicSource getUebTopicSource(String topicName) {
459         return IndexedUebTopicSourceFactory.getInstance().get(topicName);
460     }
461
462     @Override
463     public UebTopicSink getUebTopicSink(String topicName) {
464         return IndexedUebTopicSinkFactory.getInstance().get(topicName);
465     }
466
467     @Override
468     public DmaapTopicSource getDmaapTopicSource(String topicName) {
469         return IndexedDmaapTopicSourceFactory.getInstance().get(topicName);
470     }
471
472     @Override
473     public DmaapTopicSink getDmaapTopicSink(String topicName) {
474         return IndexedDmaapTopicSinkFactory.getInstance().get(topicName);
475     }
476
477     @Override
478     public NoopTopicSink getNoopTopicSink(String topicName) {
479         return IndexedNoopTopicSinkFactory.getInstance().get(topicName);
480     }
481
482 }