2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved.
6 * ================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 * ============LICENSE_END=========================================================
21 package org.onap.policy.common.endpoints.event.comm.impl;
23 import com.fasterxml.jackson.annotation.JsonIgnore;
25 import java.util.ArrayList;
26 import java.util.List;
27 import java.util.Properties;
29 import org.onap.policy.common.capabilities.Startable;
30 import org.onap.policy.common.endpoints.event.comm.Topic;
31 import org.onap.policy.common.endpoints.event.comm.TopicEndpoint;
32 import org.onap.policy.common.endpoints.event.comm.TopicSink;
33 import org.onap.policy.common.endpoints.event.comm.TopicSource;
34 import org.onap.policy.common.endpoints.event.comm.bus.DmaapTopicSink;
35 import org.onap.policy.common.endpoints.event.comm.bus.DmaapTopicSource;
36 import org.onap.policy.common.endpoints.event.comm.bus.NoopTopicSink;
37 import org.onap.policy.common.endpoints.event.comm.bus.UebTopicSink;
38 import org.onap.policy.common.endpoints.event.comm.bus.UebTopicSource;
39 import org.onap.policy.common.endpoints.event.comm.bus.impl.IndexedDmaapTopicSinkFactory;
40 import org.onap.policy.common.endpoints.event.comm.bus.impl.IndexedDmaapTopicSourceFactory;
41 import org.onap.policy.common.endpoints.event.comm.bus.impl.IndexedNoopTopicSinkFactory;
42 import org.onap.policy.common.endpoints.event.comm.bus.impl.IndexedUebTopicSinkFactory;
43 import org.onap.policy.common.endpoints.event.comm.bus.impl.IndexedUebTopicSourceFactory;
44 import org.slf4j.Logger;
45 import org.slf4j.LoggerFactory;
48 * This implementation of the Topic Endpoint Manager proxies operations to the appropriate
49 * implementations according to the communication infrastructures that are supported.
51 public class ProxyTopicEndpointManager implements TopicEndpoint {
// SLF4J logger for this class.
// NOTE(review): nothing in the visible code reassigns this field; consider declaring it final.
55 private static Logger logger = LoggerFactory.getLogger(ProxyTopicEndpointManager.class);
57 * Is this element locked?
// volatile: the flag may be read without holding this object's monitor and still be current.
59 protected volatile boolean locked = false;
62 * Is this element alive?
// volatile for the same reason as the locked flag above.
64 protected volatile boolean alive = false;
67 * singleton for global access
// Created eagerly, once, when the class is initialized; typed as the TopicEndpoint interface.
69 private static final TopicEndpoint manager = new ProxyTopicEndpointManager();
72 * Get the singleton instance.
74 * @return the instance
76 public static TopicEndpoint getInstance() {
81 public List<TopicSource> addTopicSources(Properties properties) {
// Builds topic sources from the given properties by delegating to the UEB and DMaaP
// indexed source factories, and collects everything they build into a single list.
83 // 1. Create UEB Sources
84 // 2. Create DMAAP Sources
86 final List<TopicSource> sources = new ArrayList<>();
88 sources.addAll(IndexedUebTopicSourceFactory.getInstance().build(properties));
89 sources.addAll(IndexedDmaapTopicSourceFactory.getInstance().build(properties));
// When this manager is currently locked, every freshly built source is visited.
// NOTE(review): the action applied to each source is not visible in this view -
// presumably it locks the source so new sources match the manager's state; confirm.
91 if (this.isLocked()) {
92 for (final TopicSource source : sources) {
101 public List<TopicSink> addTopicSinks(Properties properties) {
// Builds topic sinks from the given properties by delegating to the UEB, DMaaP and NOOP
// indexed sink factories, and collects everything they build into a single list.
102 // 1. Create UEB Sinks
103 // 2. Create DMAAP Sinks
// 3. Create NOOP Sinks
105 final List<TopicSink> sinks = new ArrayList<>();
107 sinks.addAll(IndexedUebTopicSinkFactory.getInstance().build(properties));
108 sinks.addAll(IndexedDmaapTopicSinkFactory.getInstance().build(properties));
109 sinks.addAll(IndexedNoopTopicSinkFactory.getInstance().build(properties));
// When this manager is currently locked, every freshly built sink is visited.
// NOTE(review): the action applied to each sink is not visible in this view -
// presumably it locks the sink so new sinks match the manager's state; confirm.
111 if (this.isLocked()) {
112 for (final TopicSink sink : sinks) {
121 public List<TopicSource> getTopicSources() {
// Gathers the inventories of the UEB and DMaaP source factories into a new list,
// UEB entries first. The list is created here, so it is not a factory-owned collection.
123 final List<TopicSource> sources = new ArrayList<>();
125 sources.addAll(IndexedUebTopicSourceFactory.getInstance().inventory());
126 sources.addAll(IndexedDmaapTopicSourceFactory.getInstance().inventory());
132 public List<TopicSink> getTopicSinks() {
// Gathers the inventories of the UEB, DMaaP and NOOP sink factories into a new list,
// in that order. The list is created here, so it is not a factory-owned collection.
134 final List<TopicSink> sinks = new ArrayList<>();
136 sinks.addAll(IndexedUebTopicSinkFactory.getInstance().inventory());
137 sinks.addAll(IndexedDmaapTopicSinkFactory.getInstance().inventory());
138 sinks.addAll(IndexedNoopTopicSinkFactory.getInstance().inventory());
145 public List<UebTopicSource> getUebTopicSources() {
// Returns the UEB source factory's inventory exactly as the factory hands it back.
146 return IndexedUebTopicSourceFactory.getInstance().inventory();
151 public List<DmaapTopicSource> getDmaapTopicSources() {
// Returns the DMaaP source factory's inventory exactly as the factory hands it back.
152 return IndexedDmaapTopicSourceFactory.getInstance().inventory();
157 public List<UebTopicSink> getUebTopicSinks() {
// Returns the UEB sink factory's inventory exactly as the factory hands it back.
158 return IndexedUebTopicSinkFactory.getInstance().inventory();
163 public List<DmaapTopicSink> getDmaapTopicSinks() {
// Returns the DMaaP sink factory's inventory exactly as the factory hands it back.
164 return IndexedDmaapTopicSinkFactory.getInstance().inventory();
169 public List<NoopTopicSink> getNoopTopicSinks() {
// Returns the NOOP sink factory's inventory exactly as the factory hands it back.
170 return IndexedNoopTopicSinkFactory.getInstance().inventory();
174 public boolean start() {
// Starts every managed endpoint (all sources and sinks) and reports whether all succeeded.
// State checks happen while holding this object's monitor.
176 synchronized (this) {
// Raises IllegalStateException with a "... is locked" message.
// NOTE(review): the condition guarding this throw is not visible in this view -
// presumably it is the locked flag; confirm.
178 throw new IllegalStateException(this + " is locked");
188 final List<Startable> endpoints = this.getEndpoints();
190 boolean success = true;
191 for (final Startable endpoint : endpoints) {
// endpoint.start() is the left operand, so it is always invoked; a single false
// result keeps success false for the remainder of the loop.
193 success = endpoint.start() && success;
194 } catch (final Exception e) {
// The failing endpoint and the exception (as the trailing argument) are logged at error level.
196 logger.error("Problem starting endpoint: {}", endpoint, e);
205 public boolean stop() {
// Stops every managed endpoint (all sources and sinks) and reports whether all succeeded.
// Unlike start(), no "is locked" exception appears in the visible code; the existing
// note below says stop is attempted regardless of the lock state.
208 * stop regardless if it is locked, in other words, stop operation has precedence over
211 synchronized (this) {
215 final List<Startable> endpoints = this.getEndpoints();
217 boolean success = true;
218 for (final Startable endpoint : endpoints) {
// endpoint.stop() is the left operand, so it is always invoked; a single false
// result keeps success false for the remainder of the loop.
220 success = endpoint.stop() && success;
221 } catch (final Exception e) {
// The failing endpoint and the exception (as the trailing argument) are logged at error level.
223 logger.error("Problem stopping endpoint: {}", endpoint, e);
232 * @return list of managed endpoints
235 protected List<Startable> getEndpoints() {
// Combines all topic sources followed by all topic sinks into one new list, typed as
// Startable so that start() and stop() can treat them uniformly.
236 final List<Startable> endpoints = new ArrayList<>();
238 endpoints.addAll(this.getTopicSources());
239 endpoints.addAll(this.getTopicSinks());
245 public void shutdown() {
// Calls destroy() on each of the five indexed factories: UEB source, UEB sink and
// NOOP sink first, then DMaaP source and DMaaP sink.
246 IndexedUebTopicSourceFactory.getInstance().destroy();
247 IndexedUebTopicSinkFactory.getInstance().destroy();
248 IndexedNoopTopicSinkFactory.getInstance().destroy();
250 IndexedDmaapTopicSourceFactory.getInstance().destroy();
251 IndexedDmaapTopicSinkFactory.getInstance().destroy();
255 public boolean isAlive() {
260 public boolean lock() {
// State is examined/updated while holding this object's monitor.
// NOTE(review): the statements inside the synchronized block are not visible in this view.
262 synchronized (this) {
// Every registered source, then every registered sink, is visited.
// NOTE(review): the loop bodies are not visible - presumably each element is locked; confirm.
270 for (final TopicSource source : this.getTopicSources()) {
274 for (final TopicSink sink : this.getTopicSinks()) {
282 public boolean unlock() {
// State is examined/updated while holding this object's monitor.
// NOTE(review): the statements inside the synchronized block are not visible in this view.
283 synchronized (this) {
// Every registered source, then every registered sink, is visited.
// NOTE(review): the loop bodies are not visible - presumably each element is unlocked; confirm.
291 for (final TopicSource source : this.getTopicSources()) {
295 for (final TopicSink sink : this.getTopicSinks()) {
303 public boolean isLocked() {
308 public List<TopicSource> getTopicSources(List<String> topicNames) {
// Resolves each requested topic name against the UEB and DMaaP source factories and
// collects the sources that are found. A null list is rejected with
// IllegalArgumentException; an empty list simply yields no lookups.
310 if (topicNames == null) {
311 throw new IllegalArgumentException("must provide a list of topics");
314 final List<TopicSource> sources = new ArrayList<>();
315 for (final String topic : topicNames) {
// UEB lookup: only non-null results are collected.
317 final TopicSource uebSource = this.getUebTopicSource(topic);
318 if (uebSource != null) {
319 sources.add(uebSource);
321 } catch (final Exception e) {
// A failed lookup is reported at debug level only, with the exception attached.
322 logger.debug("No UEB source for topic: {}", topic, e);
// DMaaP lookup for the same topic: a topic may contribute a source from each infrastructure.
326 final TopicSource dmaapSource = this.getDmaapTopicSource(topic);
327 if (dmaapSource != null) {
328 sources.add(dmaapSource);
330 } catch (final Exception e) {
331 logger.debug("No DMAAP source for topic: {}", topic, e);
338 public List<TopicSink> getTopicSinks(List<String> topicNames) {
// Resolves each requested topic name against the UEB, DMaaP and NOOP sink factories and
// collects the sinks that are found. A null list is rejected with
// IllegalArgumentException; an empty list simply yields no lookups.
340 if (topicNames == null) {
341 throw new IllegalArgumentException("must provide a list of topics");
344 final List<TopicSink> sinks = new ArrayList<>();
345 for (final String topic : topicNames) {
// UEB lookup, guarded by a null check on the result.
347 final TopicSink uebSink = this.getUebTopicSink(topic);
348 if (uebSink != null) {
351 } catch (final Exception e) {
// A failed lookup is reported at debug level only, with the exception attached.
352 logger.debug("No UEB sink for topic: {}", topic, e);
// DMaaP lookup for the same topic: only non-null results are collected.
356 final TopicSink dmaapSink = this.getDmaapTopicSink(topic);
357 if (dmaapSink != null) {
358 sinks.add(dmaapSink);
360 } catch (final Exception e) {
361 logger.debug("No DMAAP sink for topic: {}", topic, e);
// NOOP lookup for the same topic, guarded by a null check on the result.
365 final TopicSink noopSink = this.getNoopTopicSink(topic);
366 if (noopSink != null) {
369 } catch (final Exception e) {
370 logger.debug("No NOOP sink for topic: {}", topic, e);
377 public TopicSource getTopicSource(Topic.CommInfrastructure commType, String topicName) {
// Looks up a single source for topicName on the given communication infrastructure.
// Both arguments are required; either being null raises the IllegalArgumentException
// produced by parmException.
379 if (commType == null) {
380 throw parmException(topicName);
// NOTE(review): for a null topicName the parmException text still complains about a
// missing communication infrastructure and ends in "null"; a dedicated message would be clearer.
383 if (topicName == null) {
384 throw parmException(topicName);
// Dispatch on commType (the selecting statement is not visible in this view):
// UEB and DMaaP lookups are supported for sources.
389 return this.getUebTopicSource(topicName);
391 return this.getDmaapTopicSource(topicName);
// Any other infrastructure is rejected, naming the offending enum constant.
393 throw new UnsupportedOperationException("Unsupported " + commType.name());
397 private IllegalArgumentException parmException(String topicName) {
// Builds (does not throw) the exception used by the lookup methods for invalid arguments;
// callers write "throw parmException(...)". topicName is concatenated into the message,
// so a null topic name is rendered as the text "null".
398 return new IllegalArgumentException(
399 "Invalid parameter: a communication infrastructure required to fetch " + topicName);
403 public TopicSink getTopicSink(Topic.CommInfrastructure commType, String topicName) {
// Looks up a single sink for topicName on the given communication infrastructure.
// Both arguments are required; either being null raises the IllegalArgumentException
// produced by parmException.
404 if (commType == null) {
405 throw parmException(topicName);
// NOTE(review): for a null topicName the parmException text still complains about a
// missing communication infrastructure and ends in "null"; a dedicated message would be clearer.
408 if (topicName == null) {
409 throw parmException(topicName);
// Dispatch on commType (the selecting statement is not visible in this view):
// UEB, DMaaP and NOOP lookups are supported for sinks.
414 return this.getUebTopicSink(topicName);
416 return this.getDmaapTopicSink(topicName);
418 return this.getNoopTopicSink(topicName);
// Any other infrastructure is rejected, naming the offending enum constant.
420 throw new UnsupportedOperationException("Unsupported " + commType.name());
425 public List<TopicSink> getTopicSinks(String topicName) {
// Collects the sinks registered for one topic name across the UEB, DMaaP and NOOP
// factories. A null topic name raises the IllegalArgumentException from parmException.
426 if (topicName == null) {
427 throw parmException(topicName);
430 final List<TopicSink> sinks = new ArrayList<>();
// NOTE(review): unlike getTopicSinks(List), each lookup result is added without a null
// check; if a factory get() can return null instead of throwing, a null element would
// end up in the list - confirm the factory contract.
433 sinks.add(this.getUebTopicSink(topicName));
434 } catch (final Exception e) {
// A failed lookup is only logged (debug level) through logNoSink.
435 logNoSink(topicName, e);
439 sinks.add(this.getDmaapTopicSink(topicName));
440 } catch (final Exception e) {
441 logNoSink(topicName, e);
445 sinks.add(this.getNoopTopicSink(topicName));
446 } catch (final Exception e) {
447 logNoSink(topicName, e);
453 private void logNoSink(String topicName, Exception ex) {
// Shared debug-level message for a sink lookup that failed; the exception is passed as
// the trailing argument so it is recorded together with the message.
454 logger.debug("No sink for topic: {}", topicName, ex);
458 public UebTopicSource getUebTopicSource(String topicName) {
// Delegates the lookup to the UEB source factory; no argument checking is done here.
459 return IndexedUebTopicSourceFactory.getInstance().get(topicName);
463 public UebTopicSink getUebTopicSink(String topicName) {
// Delegates the lookup to the UEB sink factory; no argument checking is done here.
464 return IndexedUebTopicSinkFactory.getInstance().get(topicName);
468 public DmaapTopicSource getDmaapTopicSource(String topicName) {
// Delegates the lookup to the DMaaP source factory; no argument checking is done here.
469 return IndexedDmaapTopicSourceFactory.getInstance().get(topicName);
473 public DmaapTopicSink getDmaapTopicSink(String topicName) {
// Delegates the lookup to the DMaaP sink factory; no argument checking is done here.
474 return IndexedDmaapTopicSinkFactory.getInstance().get(topicName);
478 public NoopTopicSink getNoopTopicSink(String topicName) {
// Delegates the lookup to the NOOP sink factory; no argument checking is done here.
479 return IndexedNoopTopicSinkFactory.getInstance().get(topicName);