missing check for noop sinks 75/11975/1
authorJorge Hernandez <jh1730@att.com>
Tue, 12 Sep 2017 15:27:08 +0000 (10:27 -0500)
committerJorge Hernandez <jh1730@att.com>
Tue, 12 Sep 2017 15:27:08 +0000 (10:27 -0500)
Change-Id: If7167415c361fad2478809ac6c41981beaadacd6
Issue-ID: POLICY-119
Signed-off-by: Jorge Hernandez <jh1730@att.com>
policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/TopicEndpoint.java
policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/NoopTopicSinkFactory.java

index cc3705e..09ee9a4 100644 (file)
@@ -7,9 +7,9 @@
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
  * You may obtain a copy of the License at
- * 
+ *
  *      http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -29,214 +29,217 @@ import org.onap.policy.drools.event.comm.bus.DmaapTopicSource;
 import org.onap.policy.drools.event.comm.bus.NoopTopicSink;
 import org.onap.policy.drools.event.comm.bus.UebTopicSink;
 import org.onap.policy.drools.event.comm.bus.UebTopicSource;
-import org.slf4j.LoggerFactory;
-import org.slf4j.Logger;
 import org.onap.policy.drools.properties.Lockable;
 import org.onap.policy.drools.properties.Startable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import com.fasterxml.jackson.annotation.JsonIgnore;
 
 /**
- * Abstraction to managed the system's Networked Topic Endpoints,
- * sources of all events input into the System. 
+ * Abstraction to managed the system's Networked Topic Endpoints, sources of all events input into
+ * the System.
  */
 public interface TopicEndpoint extends Startable, Lockable {
-       
-       /**
-        * Add Topic Sources to the communication infrastructure initialized per
-        * properties
-        * 
-        * @param properties properties for Topic Source construction
-        * @return a generic Topic Source
-        * @throws IllegalArgumentException when invalid arguments are provided
-        */
-       public List<TopicSource> addTopicSources(Properties properties);
-
-       /**
-        * Add Topic Sinks to the communication infrastructure initialized per
-        * properties
-        * 
-        * @param properties properties for Topic Sink construction
-        * @return a generic Topic Sink
-        * @throws IllegalArgumentException when invalid arguments are provided
-        */
-       public List<TopicSink> addTopicSinks(Properties properties);
-       
-       /**
-        * gets all Topic Sources
-        * @return the Topic Source List
-        */
-       List<TopicSource> getTopicSources();
-       
-       /**
-        * get the Topic Sources for the given topic name
-        * 
-        * @param topicName the topic name
-        * 
-        * @return the Topic Source List
-        * @throws IllegalStateException if the entity is in an invalid state
-        * @throws IllegalArgumentException if invalid parameters are present
-        */
-       public List<TopicSource> getTopicSources(List<String> topicNames);
-       
-       /**
-        * gets the Topic Source for the given topic name and 
-        * underlying communication infrastructure type
-        * 
-        * @param commType communication infrastructure type
-        * @param topicName the topic name
-        * 
-        * @return the Topic Source
-        * @throws IllegalStateException if the entity is in an invalid state, for
-        * example multiple TopicReaders for a topic name and communication infrastructure
-        * @throws IllegalArgumentException if invalid parameters are present
-        * @throws UnsupportedOperationException if the operation is not supported.
-        */
-       public TopicSource getTopicSource(Topic.CommInfrastructure commType, 
-                                                 String topicName) 
-                       throws UnsupportedOperationException;
-       
-       /**
-        * get the UEB Topic Source for the given topic name
-        * 
-        * @param topicName the topic name
-        * 
-        * @return the UEB Topic Source
-        * @throws IllegalStateException if the entity is in an invalid state, for
-        * example multiple TopicReaders for a topic name and communication infrastructure
-        * @throws IllegalArgumentException if invalid parameters are present
-        */
-       public UebTopicSource getUebTopicSource(String topicName);
-       
-       /**
-        * get the DMAAP Topic Source for the given topic name
-        * 
-        * @param topicName the topic name
-        * 
-        * @return the DMAAP Topic Source
-        * @throws IllegalStateException if the entity is in an invalid state, for
-        * example multiple TopicReaders for a topic name and communication infrastructure
-        * @throws IllegalArgumentException if invalid parameters are present
-        */
-       public DmaapTopicSource getDmaapTopicSource(String topicName);
-       
-       /**
-        * get the Topic Sinks for the given topic name
-        * 
-        * @param topicNames the topic names
-        * @return the Topic Sink List
-        * @throws IllegalStateException
-        * @throws IllegalArgumentException
-        */
-       public List<TopicSink> getTopicSinks(List<String> topicNames);
-       
-       /**
-        * get the Topic Sinks for the given topic name and 
-        * underlying communication infrastructure type
-        * 
-        * @param topicName the topic name
-        * @param commType communication infrastructure type
-        * 
-        * @return the Topic Sink List
-        * @throws IllegalStateException if the entity is in an invalid state, for
-        * example multiple TopicWriters for a topic name and communication infrastructure
-        * @throws IllegalArgumentException if invalid parameters are present
-        */
-       public TopicSink getTopicSink(Topic.CommInfrastructure commType, 
-                                             String topicName) 
-                       throws UnsupportedOperationException;
-       
-       /**
-        * get the Topic Sinks for the given topic name and 
-        * all the underlying communication infrastructure type
-        * 
-        * @param topicName the topic name
-        * @param commType communication infrastructure type
-        * 
-        * @return the Topic Sink List
-        * @throws IllegalStateException if the entity is in an invalid state, for
-        * example multiple TopicWriters for a topic name and communication infrastructure
-        * @throws IllegalArgumentException if invalid parameters are present
-        */
-       public List<TopicSink> getTopicSinks(String topicName);
-       
-       /**
-        * get the UEB Topic Source for the given topic name
-        * 
-        * @param topicName the topic name
-        * 
-        * @return the Topic Source
-        * @throws IllegalStateException if the entity is in an invalid state, for
-        * example multiple TopicReaders for a topic name and communication infrastructure
-        * @throws IllegalArgumentException if invalid parameters are present
-        */
-       public UebTopicSink getUebTopicSink(String topicName);
-       
-       /**
-        * get the no-op Topic Sink for the given topic name
-        * 
-        * @param topicName the topic name
-        * 
-        * @return the Topic Source
-        * @throws IllegalStateException if the entity is in an invalid state, for
-        * example multiple TopicReaders for a topic name and communication infrastructure
-        * @throws IllegalArgumentException if invalid parameters are present
-        */
-       public NoopTopicSink getNoopTopicSink(String topicName);
-       
-       /**
-        * get the DMAAP Topic Source for the given topic name
-        * 
-        * @param topicName the topic name
-        * 
-        * @return the Topic Source
-        * @throws IllegalStateException if the entity is in an invalid state, for
-        * example multiple TopicReaders for a topic name and communication infrastructure
-        * @throws IllegalArgumentException if invalid parameters are present
-        */
-       public DmaapTopicSink getDmaapTopicSink(String topicName);
-       
-       /**
-        * gets only the UEB Topic Sources
-        * @return the UEB Topic Source List
-        */
-       public List<UebTopicSource> getUebTopicSources();
-       
-       /**
-        * gets only the DMAAP Topic Sources
-        * @return the DMAAP Topic Source List
-        */
-       public List<DmaapTopicSource> getDmaapTopicSources();
-       
-       /**
-        * gets all Topic Sinks
-        * @return the Topic Sink List
-        */
-       public List<TopicSink> getTopicSinks();
-       
-       /**
-        * gets only the UEB Topic Sinks
-        * @return the UEB Topic Sink List
-        */
-       public List<UebTopicSink> getUebTopicSinks();
-       
-       /**
-        * gets only the DMAAP Topic Sinks
-        * @return the DMAAP Topic Sink List
-        */
-       public List<DmaapTopicSink> getDmaapTopicSinks();
-       
-       /**
-        * gets only the NOOP Topic Sinks
-        * @return the NOOP Topic Sinks List
-        */
-       public List<NoopTopicSink> getNoopTopicSinks();
-       
-       /**
-        * singleton for global access
-        */
-       public static final TopicEndpoint manager = new ProxyTopicEndpointManager();
+
+  /**
+   * Add Topic Sources to the communication infrastructure initialized per properties
+   *
+   * @param properties properties for Topic Source construction
+   * @return a generic Topic Source
+   * @throws IllegalArgumentException when invalid arguments are provided
+   */
+  public List<TopicSource> addTopicSources(Properties properties);
+
+  /**
+   * Add Topic Sinks to the communication infrastructure initialized per properties
+   *
+   * @param properties properties for Topic Sink construction
+   * @return a generic Topic Sink
+   * @throws IllegalArgumentException when invalid arguments are provided
+   */
+  public List<TopicSink> addTopicSinks(Properties properties);
+
+  /**
+   * gets all Topic Sources
+   *
+   * @return the Topic Source List
+   */
+  List<TopicSource> getTopicSources();
+
+  /**
+   * get the Topic Sources for the given topic name
+   *
+   * @param topicName the topic name
+   *
+   * @return the Topic Source List
+   * @throws IllegalStateException if the entity is in an invalid state
+   * @throws IllegalArgumentException if invalid parameters are present
+   */
+  public List<TopicSource> getTopicSources(List<String> topicNames);
+
+  /**
+   * gets the Topic Source for the given topic name and underlying communication infrastructure type
+   *
+   * @param commType communication infrastructure type
+   * @param topicName the topic name
+   *
+   * @return the Topic Source
+   * @throws IllegalStateException if the entity is in an invalid state, for example multiple
+   *         TopicReaders for a topic name and communication infrastructure
+   * @throws IllegalArgumentException if invalid parameters are present
+   * @throws UnsupportedOperationException if the operation is not supported.
+   */
+  public TopicSource getTopicSource(Topic.CommInfrastructure commType, String topicName)
+      throws UnsupportedOperationException;
+
+  /**
+   * get the UEB Topic Source for the given topic name
+   *
+   * @param topicName the topic name
+   *
+   * @return the UEB Topic Source
+   * @throws IllegalStateException if the entity is in an invalid state, for example multiple
+   *         TopicReaders for a topic name and communication infrastructure
+   * @throws IllegalArgumentException if invalid parameters are present
+   */
+  public UebTopicSource getUebTopicSource(String topicName);
+
+  /**
+   * get the DMAAP Topic Source for the given topic name
+   *
+   * @param topicName the topic name
+   *
+   * @return the DMAAP Topic Source
+   * @throws IllegalStateException if the entity is in an invalid state, for example multiple
+   *         TopicReaders for a topic name and communication infrastructure
+   * @throws IllegalArgumentException if invalid parameters are present
+   */
+  public DmaapTopicSource getDmaapTopicSource(String topicName);
+
+  /**
+   * get the Topic Sinks for the given topic name
+   *
+   * @param topicNames the topic names
+   * @return the Topic Sink List
+   * @throws IllegalStateException
+   * @throws IllegalArgumentException
+   */
+  public List<TopicSink> getTopicSinks(List<String> topicNames);
+
+  /**
+   * get the Topic Sinks for the given topic name and underlying communication infrastructure type
+   *
+   * @param topicName the topic name
+   * @param commType communication infrastructure type
+   *
+   * @return the Topic Sink List
+   * @throws IllegalStateException if the entity is in an invalid state, for example multiple
+   *         TopicWriters for a topic name and communication infrastructure
+   * @throws IllegalArgumentException if invalid parameters are present
+   */
+  public TopicSink getTopicSink(Topic.CommInfrastructure commType, String topicName)
+      throws UnsupportedOperationException;
+
+  /**
+   * get the Topic Sinks for the given topic name and all the underlying communication
+   * infrastructure type
+   *
+   * @param topicName the topic name
+   * @param commType communication infrastructure type
+   *
+   * @return the Topic Sink List
+   * @throws IllegalStateException if the entity is in an invalid state, for example multiple
+   *         TopicWriters for a topic name and communication infrastructure
+   * @throws IllegalArgumentException if invalid parameters are present
+   */
+  public List<TopicSink> getTopicSinks(String topicName);
+
+  /**
+   * get the UEB Topic Source for the given topic name
+   *
+   * @param topicName the topic name
+   *
+   * @return the Topic Source
+   * @throws IllegalStateException if the entity is in an invalid state, for example multiple
+   *         TopicReaders for a topic name and communication infrastructure
+   * @throws IllegalArgumentException if invalid parameters are present
+   */
+  public UebTopicSink getUebTopicSink(String topicName);
+
+  /**
+   * get the no-op Topic Sink for the given topic name
+   *
+   * @param topicName the topic name
+   *
+   * @return the Topic Source
+   * @throws IllegalStateException if the entity is in an invalid state, for example multiple
+   *         TopicReaders for a topic name and communication infrastructure
+   * @throws IllegalArgumentException if invalid parameters are present
+   */
+  public NoopTopicSink getNoopTopicSink(String topicName);
+
+  /**
+   * get the DMAAP Topic Source for the given topic name
+   *
+   * @param topicName the topic name
+   *
+   * @return the Topic Source
+   * @throws IllegalStateException if the entity is in an invalid state, for example multiple
+   *         TopicReaders for a topic name and communication infrastructure
+   * @throws IllegalArgumentException if invalid parameters are present
+   */
+  public DmaapTopicSink getDmaapTopicSink(String topicName);
+
+  /**
+   * gets only the UEB Topic Sources
+   *
+   * @return the UEB Topic Source List
+   */
+  public List<UebTopicSource> getUebTopicSources();
+
+  /**
+   * gets only the DMAAP Topic Sources
+   *
+   * @return the DMAAP Topic Source List
+   */
+  public List<DmaapTopicSource> getDmaapTopicSources();
+
+  /**
+   * gets all Topic Sinks
+   *
+   * @return the Topic Sink List
+   */
+  public List<TopicSink> getTopicSinks();
+
+  /**
+   * gets only the UEB Topic Sinks
+   *
+   * @return the UEB Topic Sink List
+   */
+  public List<UebTopicSink> getUebTopicSinks();
+
+  /**
+   * gets only the DMAAP Topic Sinks
+   *
+   * @return the DMAAP Topic Sink List
+   */
+  public List<DmaapTopicSink> getDmaapTopicSinks();
+
+  /**
+   * gets only the NOOP Topic Sinks
+   *
+   * @return the NOOP Topic Sinks List
+   */
+  public List<NoopTopicSink> getNoopTopicSinks();
+
+  /**
+   * singleton for global access
+   */
+  public static final TopicEndpoint manager = new ProxyTopicEndpointManager();
 }
 
+
 /*
  * ----------------- implementation -------------------
  */
@@ -246,398 +249,412 @@ public interface TopicEndpoint extends Startable, Lockable {
  * implementations according to the communication infrastructure that are supported
  */
 class ProxyTopicEndpointManager implements TopicEndpoint {
-       /**
-        * Logger
-        */
-       private static Logger logger = LoggerFactory.getLogger(ProxyTopicEndpointManager.class);
-       /**
-        * Is this element locked?
-        */
-       protected volatile boolean locked = false;
-       
-       /**
-        * Is this element alive?
-        */
-       protected volatile boolean alive = false;
-       
-       @Override
-       public List<TopicSource> addTopicSources(Properties properties) {
-               
-               // 1. Create UEB Sources
-               // 2. Create DMAAP Sources
-               
-               List<TopicSource> sources = new ArrayList<>();  
-               
-               sources.addAll(UebTopicSource.factory.build(properties));
-               sources.addAll(DmaapTopicSource.factory.build(properties));
-               
-               if (this.isLocked()) {
-                       for (TopicSource source : sources) {
-                               source.lock();
-                       }
-               }
-               
-               return sources;
-       }
-       
-       @Override
-       public List<TopicSink> addTopicSinks(Properties properties) {
-               // 1. Create UEB Sinks
-               // 2. Create DMAAP Sinks
-               
-               List<TopicSink> sinks = new ArrayList<>();      
-               
-               sinks.addAll(UebTopicSink.factory.build(properties));
-               sinks.addAll(DmaapTopicSink.factory.build(properties));
-               sinks.addAll(NoopTopicSink.factory.build(properties));
-               
-               if (this.isLocked()) {
-                       for (TopicSink sink : sinks) {
-                               sink.lock();
-                       }
-               }
-               
-               return sinks;
-       }
-
-       @Override
-       public List<TopicSource> getTopicSources() {
-       
-               List<TopicSource> sources = new ArrayList<>();
-               
-               sources.addAll(UebTopicSource.factory.inventory());
-               sources.addAll(DmaapTopicSource.factory.inventory());
-               
-               return sources;
-       }
-
-       @Override
-       public List<TopicSink> getTopicSinks() {
-               
-               List<TopicSink> sinks = new ArrayList<>();      
-               
-               sinks.addAll(UebTopicSink.factory.inventory());
-               sinks.addAll(DmaapTopicSink.factory.inventory());
-               sinks.addAll(NoopTopicSink.factory.inventory());
-               
-               return sinks;
-       }
-
-       @JsonIgnore
-       @Override
-       public List<UebTopicSource> getUebTopicSources() {
-               return UebTopicSource.factory.inventory();
-       }
-       
-       @JsonIgnore
-       @Override
-       public List<DmaapTopicSource> getDmaapTopicSources() {
-               return DmaapTopicSource.factory.inventory();
-       }
-
-       @JsonIgnore
-       @Override
-       public List<UebTopicSink> getUebTopicSinks() {
-               return UebTopicSink.factory.inventory();
-       }
-       
-       @JsonIgnore
-       @Override
-       public List<DmaapTopicSink> getDmaapTopicSinks() {
-               return DmaapTopicSink.factory.inventory();
-       }
-       
-       @JsonIgnore
-       @Override
-       public List<NoopTopicSink> getNoopTopicSinks() {
-               return NoopTopicSink.factory.inventory();
-       }
-
-       @Override
-       public boolean start() {
-               
-               synchronized (this) {
-                       if (this.locked) {
-                               throw new IllegalStateException(this + " is locked");
-                       }
-                       
-                       if (this.alive) {
-                               return true;
-                       }
-                       
-                       this.alive = true;
-               }
-               
-               List<Startable> endpoints = getEndpoints();
-               
-               boolean success = true;
-               for (Startable endpoint: endpoints) {
-                       try {
-                               success = endpoint.start() && success;
-                       } catch (Exception e) {
-                               success = false;
-                               logger.error("Problem starting endpoint: {}", endpoint, e);
-                       }
-               }
-               
-               return success;
-       }
-
-
-       @Override
-       public boolean stop() {
-               
-               /* 
-                * stop regardless if it is locked, in other
-                * words, stop operation has precedence over
-                * locks.
-                */
-               synchronized (this) {                   
-                       this.alive = false;
-               }
-               
-               List<Startable> endpoints = getEndpoints();
-               
-               boolean success = true;
-               for (Startable endpoint: endpoints) {
-                       try {
-                               success = endpoint.stop() && success;
-                       } catch (Exception e) {
-                               success = false;
-                               logger.error("Problem stopping endpoint: {}", endpoint, e);
-                       }
-               }
-               
-               return success;
-       }
-       
-       /**
-        * 
-        * @return list of managed endpoints
-        */
-       @JsonIgnore
-       protected List<Startable> getEndpoints() {
-               List<Startable> endpoints = new ArrayList<>();
-
-               endpoints.addAll(this.getTopicSources());
-               endpoints.addAll(this.getTopicSinks());
-               
-               return endpoints;
-       }
-       
-       @Override
-       public void shutdown() {
-               UebTopicSource.factory.destroy();
-               UebTopicSink.factory.destroy();
-               
-               DmaapTopicSource.factory.destroy();
-               DmaapTopicSink.factory.destroy();
-       }
-
-       @Override
-       public boolean isAlive() {
-               return this.alive;
-       }
-
-       @Override
-       public boolean lock() {
-               
-               synchronized (this) {
-                       if (locked)
-                               return true;
-                       
-                       this.locked = true;
-               }
-               
-               for (TopicSource source: this.getTopicSources()) {
-                       source.lock();
-               }
-               
-               for (TopicSink sink: this.getTopicSinks()) {
-                       sink.lock();
-               }
-               
-               return true;
-       }
-
-       @Override
-       public boolean unlock() {
-               synchronized (this) {
-                       if (!locked)
-                               return true;
-                       
-                       this.locked = false;
-               }
-               
-               for (TopicSource source: this.getTopicSources()) {
-                       source.unlock();
-               }
-               
-               for (TopicSink sink: this.getTopicSinks()) {
-                       sink.unlock();
-               }
-               
-               return true;
-       }
-
-       @Override
-       public boolean isLocked() {
-               return this.locked;
-       }
-
-       @Override
-       public List<TopicSource> getTopicSources(List<String> topicNames) {
-               
-               if (topicNames == null) {
-                       throw new IllegalArgumentException("must provide a list of topics");
-               }
-               
-               List<TopicSource> sources = new ArrayList<>();
-               for (String topic: topicNames) {
-                       try {
-                               TopicSource uebSource = this.getUebTopicSource(topic);
-                               if (uebSource != null)
-                                       sources.add(uebSource);
-                       } catch (Exception e) {
-                               logger.info("No UEB source for topic: {}", topic, e);
-                       }
-                       
-                       try {
-                               TopicSource dmaapSource = this.getDmaapTopicSource(topic);
-                               if (dmaapSource != null)
-                                       sources.add(dmaapSource);
-                       } catch (Exception e) {
-                               logger.info("No DMAAP source for topic: {}", topic, e);
-                       }
-               }
-               return sources;
-       }
-
-       @Override
-       public List<TopicSink> getTopicSinks(List<String> topicNames) {
-               
-               if (topicNames == null) {
-                       throw new IllegalArgumentException("must provide a list of topics");
-               }
-               
-               List<TopicSink> sinks = new ArrayList<>();
-               for (String topic: topicNames) {
-                       try {
-                               TopicSink uebSink = this.getUebTopicSink(topic);
-                               if (uebSink != null)
-                                       sinks.add(uebSink);
-                       } catch (Exception e) {
-                               logger.info("No UEB sink for topic: {}", topic, e);
-                       }
-                       
-                       try {
-                               TopicSink dmaapSink = this.getDmaapTopicSink(topic);
-                               if (dmaapSink != null)
-                                       sinks.add(dmaapSink);
-                       } catch (Exception e) {
-                               logger.info("No DMAAP sink for topic: {}", topic, e);
-                       }
-               }
-               return sinks;
-       }
-
-       @Override
-       public TopicSource getTopicSource(Topic.CommInfrastructure commType, String topicName)
-                       throws UnsupportedOperationException {
-               
-               if (commType == null) {
-                       throw new IllegalArgumentException
-                               ("Invalid parameter: a communication infrastructure required to fetch " + topicName);
-               }
-               
-               if (topicName == null) {
-                       throw new IllegalArgumentException
-                               ("Invalid parameter: a communication infrastructure required to fetch " + topicName);
-               }
-               
-               switch (commType) {
-               case UEB:
-                       return this.getUebTopicSource(topicName);
-               case DMAAP:
-                       return this.getDmaapTopicSource(topicName);
-               case REST:
-               default:
-                       throw new UnsupportedOperationException("Unsupported " + commType.name());
-               }
-       }
-
-       @Override
-       public TopicSink getTopicSink(Topic.CommInfrastructure commType, String topicName)
-                       throws UnsupportedOperationException {
-               if (commType == null) {
-                       throw new IllegalArgumentException
-                               ("Invalid parameter: a communication infrastructure required to fetch " + topicName);
-               }
-               
-               if (topicName == null) {
-                       throw new IllegalArgumentException
-                               ("Invalid parameter: a communication infrastructure required to fetch " + topicName);
-               }
-               
-               switch (commType) {
-               case UEB:
-                       return this.getUebTopicSink(topicName);
-               case DMAAP:
-                       return this.getDmaapTopicSink(topicName);
-               case REST:
-               default:
-                       throw new UnsupportedOperationException("Unsupported " + commType.name());
-               }
-       }
-       
-       @Override
-       public List<TopicSink> getTopicSinks(String topicName) {
-
-               if (topicName == null) {
-                       throw new IllegalArgumentException
-                               ("Invalid parameter: a communication infrastructure required to fetch " + topicName);
-               }
-               
-               List<TopicSink> sinks = new ArrayList<>();
-               
-               try {
-                       sinks.add(this.getUebTopicSink(topicName));
-               } catch (Exception e) {
-                       logger.debug("No sink for topic: {}", topicName, e);
-               }
-               
-               try {
-                       sinks.add(this.getDmaapTopicSink(topicName));
-               } catch (Exception e) {
-                       logger.debug("No sink for topic: {}", topicName, e);
-               }
-               
-               return sinks;
-       }
-
-       @Override
-       public UebTopicSource getUebTopicSource(String topicName) {
-               return UebTopicSource.factory.get(topicName);
-       }
-
-       @Override
-       public UebTopicSink getUebTopicSink(String topicName) {
-               return UebTopicSink.factory.get(topicName);
-       }
-
-       @Override
-       public DmaapTopicSource getDmaapTopicSource(String topicName) {
-               return DmaapTopicSource.factory.get(topicName);
-       }
-
-       @Override
-       public DmaapTopicSink getDmaapTopicSink(String topicName) {
-               return DmaapTopicSink.factory.get(topicName);
-       }
-
-       @Override
-       public NoopTopicSink getNoopTopicSink(String topicName) {
-               return NoopTopicSink.factory.get(topicName);
-       }
-       
+  /**
+   * Logger
+   */
+  private static Logger logger = LoggerFactory.getLogger(ProxyTopicEndpointManager.class);
+  /**
+   * Is this element locked?
+   */
+  protected volatile boolean locked = false;
+
+  /**
+   * Is this element alive?
+   */
+  protected volatile boolean alive = false;
+
+  @Override
+  public List<TopicSource> addTopicSources(Properties properties) {
+
+    // 1. Create UEB Sources
+    // 2. Create DMAAP Sources
+
+    final List<TopicSource> sources = new ArrayList<>();
+
+    sources.addAll(UebTopicSource.factory.build(properties));
+    sources.addAll(DmaapTopicSource.factory.build(properties));
+
+    if (this.isLocked()) {
+      for (final TopicSource source : sources) {
+        source.lock();
+      }
+    }
+
+    return sources;
+  }
+
+  @Override
+  public List<TopicSink> addTopicSinks(Properties properties) {
+    // 1. Create UEB Sinks
+    // 2. Create DMAAP Sinks
+
+    final List<TopicSink> sinks = new ArrayList<>();
+
+    sinks.addAll(UebTopicSink.factory.build(properties));
+    sinks.addAll(DmaapTopicSink.factory.build(properties));
+    sinks.addAll(NoopTopicSink.factory.build(properties));
+
+    if (this.isLocked()) {
+      for (final TopicSink sink : sinks) {
+        sink.lock();
+      }
+    }
+
+    return sinks;
+  }
+
+  @Override
+  public List<TopicSource> getTopicSources() {
+
+    final List<TopicSource> sources = new ArrayList<>();
+
+    sources.addAll(UebTopicSource.factory.inventory());
+    sources.addAll(DmaapTopicSource.factory.inventory());
+
+    return sources;
+  }
+
+  @Override
+  public List<TopicSink> getTopicSinks() {
+
+    final List<TopicSink> sinks = new ArrayList<>();
+
+    sinks.addAll(UebTopicSink.factory.inventory());
+    sinks.addAll(DmaapTopicSink.factory.inventory());
+    sinks.addAll(NoopTopicSink.factory.inventory());
+
+    return sinks;
+  }
+
+  @JsonIgnore
+  @Override
+  public List<UebTopicSource> getUebTopicSources() {
+    return UebTopicSource.factory.inventory();
+  }
+
+  @JsonIgnore
+  @Override
+  public List<DmaapTopicSource> getDmaapTopicSources() {
+    return DmaapTopicSource.factory.inventory();
+  }
+
+  @JsonIgnore
+  @Override
+  public List<UebTopicSink> getUebTopicSinks() {
+    return UebTopicSink.factory.inventory();
+  }
+
+  @JsonIgnore
+  @Override
+  public List<DmaapTopicSink> getDmaapTopicSinks() {
+    return DmaapTopicSink.factory.inventory();
+  }
+
+  @JsonIgnore
+  @Override
+  public List<NoopTopicSink> getNoopTopicSinks() {
+    return NoopTopicSink.factory.inventory();
+  }
+
+  @Override
+  public boolean start() {
+
+    synchronized (this) {
+      if (this.locked) {
+        throw new IllegalStateException(this + " is locked");
+      }
+
+      if (this.alive) {
+        return true;
+      }
+
+      this.alive = true;
+    }
+
+    final List<Startable> endpoints = this.getEndpoints();
+
+    boolean success = true;
+    for (final Startable endpoint : endpoints) {
+      try {
+        success = endpoint.start() && success;
+      } catch (final Exception e) {
+        success = false;
+        logger.error("Problem starting endpoint: {}", endpoint, e);
+      }
+    }
+
+    return success;
+  }
+
+
+  @Override
+  public boolean stop() {
+
+    /*
+     * stop regardless if it is locked, in other words, stop operation has precedence over locks.
+     */
+    synchronized (this) {
+      this.alive = false;
+    }
+
+    final List<Startable> endpoints = this.getEndpoints();
+
+    boolean success = true;
+    for (final Startable endpoint : endpoints) {
+      try {
+        success = endpoint.stop() && success;
+      } catch (final Exception e) {
+        success = false;
+        logger.error("Problem stopping endpoint: {}", endpoint, e);
+      }
+    }
+
+    return success;
+  }
+
+  /**
+   *
+   * @return list of managed endpoints
+   */
+  @JsonIgnore
+  protected List<Startable> getEndpoints() {
+    final List<Startable> endpoints = new ArrayList<>();
+
+    endpoints.addAll(this.getTopicSources());
+    endpoints.addAll(this.getTopicSinks());
+
+    return endpoints;
+  }
+
+  @Override
+  public void shutdown() {
+    UebTopicSource.factory.destroy();
+    UebTopicSink.factory.destroy();
+    NoopTopicSink.factory.destroy();
+
+    DmaapTopicSource.factory.destroy();
+    DmaapTopicSink.factory.destroy();
+  }
+
+  @Override
+  public boolean isAlive() {
+    return this.alive;
+  }
+
+  @Override
+  public boolean lock() {
+
+    synchronized (this) {
+      if (this.locked)
+        return true;
+
+      this.locked = true;
+    }
+
+    for (final TopicSource source : this.getTopicSources()) {
+      source.lock();
+    }
+
+    for (final TopicSink sink : this.getTopicSinks()) {
+      sink.lock();
+    }
+
+    return true;
+  }
+
+  @Override
+  public boolean unlock() {
+    synchronized (this) {
+      if (!this.locked)
+        return true;
+
+      this.locked = false;
+    }
+
+    for (final TopicSource source : this.getTopicSources()) {
+      source.unlock();
+    }
+
+    for (final TopicSink sink : this.getTopicSinks()) {
+      sink.unlock();
+    }
+
+    return true;
+  }
+
+  @Override
+  public boolean isLocked() {
+    return this.locked;
+  }
+
+  @Override
+  public List<TopicSource> getTopicSources(List<String> topicNames) {
+
+    if (topicNames == null) {
+      throw new IllegalArgumentException("must provide a list of topics");
+    }
+
+    final List<TopicSource> sources = new ArrayList<>();
+    for (final String topic : topicNames) {
+      try {
+        final TopicSource uebSource = this.getUebTopicSource(topic);
+        if (uebSource != null)
+          sources.add(uebSource);
+      } catch (final Exception e) {
+        logger.debug("No UEB source for topic: {}", topic, e);
+      }
+
+      try {
+        final TopicSource dmaapSource = this.getDmaapTopicSource(topic);
+        if (dmaapSource != null)
+          sources.add(dmaapSource);
+      } catch (final Exception e) {
+        logger.debug("No DMAAP source for topic: {}", topic, e);
+      }
+    }
+    return sources;
+  }
+
+  @Override
+  public List<TopicSink> getTopicSinks(List<String> topicNames) {
+
+    if (topicNames == null) {
+      throw new IllegalArgumentException("must provide a list of topics");
+    }
+
+    final List<TopicSink> sinks = new ArrayList<>();
+    for (final String topic : topicNames) {
+      try {
+        final TopicSink uebSink = this.getUebTopicSink(topic);
+        if (uebSink != null)
+          sinks.add(uebSink);
+      } catch (final Exception e) {
+        logger.debug("No UEB sink for topic: {}", topic, e);
+      }
+
+      try {
+        final TopicSink dmaapSink = this.getDmaapTopicSink(topic);
+        if (dmaapSink != null)
+          sinks.add(dmaapSink);
+      } catch (final Exception e) {
+        logger.debug("No DMAAP sink for topic: {}", topic, e);
+      }
+
+      try {
+        final TopicSink noopSink = this.getNoopTopicSink(topic);
+        if (noopSink != null)
+          sinks.add(noopSink);
+      } catch (final Exception e) {
+        logger.debug("No NOOP sink for topic: {}", topic, e);
+      }
+    }
+    return sinks;
+  }
+
+  @Override
+  public TopicSource getTopicSource(Topic.CommInfrastructure commType, String topicName)
+      throws UnsupportedOperationException {
+
+    if (commType == null) {
+      throw new IllegalArgumentException(
+          "Invalid parameter: a communication infrastructure required to fetch " + topicName);
+    }
+
+    if (topicName == null) {
+      throw new IllegalArgumentException(
+          "Invalid parameter: a communication infrastructure required to fetch " + topicName);
+    }
+
+    switch (commType) {
+      case UEB:
+        return this.getUebTopicSource(topicName);
+      case DMAAP:
+        return this.getDmaapTopicSource(topicName);
+      case REST:
+      default:
+        throw new UnsupportedOperationException("Unsupported " + commType.name());
+    }
+  }
+
+  @Override
+  public TopicSink getTopicSink(Topic.CommInfrastructure commType, String topicName)
+      throws UnsupportedOperationException {
+    if (commType == null) {
+      throw new IllegalArgumentException(
+          "Invalid parameter: a communication infrastructure required to fetch " + topicName);
+    }
+
+    if (topicName == null) {
+      throw new IllegalArgumentException(
+          "Invalid parameter: a communication infrastructure required to fetch " + topicName);
+    }
+
+    switch (commType) {
+      case UEB:
+        return this.getUebTopicSink(topicName);
+      case DMAAP:
+        return this.getDmaapTopicSink(topicName);
+      case NOOP:
+        return this.getNoopTopicSink(topicName);
+      case REST:
+      default:
+        throw new UnsupportedOperationException("Unsupported " + commType.name());
+    }
+  }
+
+  @Override
+  public List<TopicSink> getTopicSinks(String topicName) {
+    if (topicName == null) {
+      throw new IllegalArgumentException(
+          "Invalid parameter: a communication infrastructure required to fetch " + topicName);
+    }
+
+    final List<TopicSink> sinks = new ArrayList<>();
+
+    try {
+      sinks.add(this.getUebTopicSink(topicName));
+    } catch (final Exception e) {
+      logger.debug("No sink for topic: {}", topicName, e);
+    }
+
+    try {
+      sinks.add(this.getDmaapTopicSink(topicName));
+    } catch (final Exception e) {
+      logger.debug("No sink for topic: {}", topicName, e);
+    }
+
+    try {
+      sinks.add(this.getNoopTopicSink(topicName));
+    } catch (final Exception e) {
+      logger.debug("No sink for topic: {}", topicName, e);
+    }
+
+    return sinks;
+  }
+
+  @Override
+  public UebTopicSource getUebTopicSource(String topicName) {
+    return UebTopicSource.factory.get(topicName);
+  }
+
+  @Override
+  public UebTopicSink getUebTopicSink(String topicName) {
+    return UebTopicSink.factory.get(topicName);
+  }
+
+  @Override
+  public DmaapTopicSource getDmaapTopicSource(String topicName) {
+    return DmaapTopicSource.factory.get(topicName);
+  }
+
+  @Override
+  public DmaapTopicSink getDmaapTopicSink(String topicName) {
+    return DmaapTopicSink.factory.get(topicName);
+  }
+
+  @Override
+  public NoopTopicSink getNoopTopicSink(String topicName) {
+    return NoopTopicSink.factory.get(topicName);
+  }
+
 }
index 946e48c..5331539 100644 (file)
@@ -7,9 +7,9 @@
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
  * You may obtain a copy of the License at
- * 
+ *
  *      http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -33,191 +33,194 @@ import org.slf4j.LoggerFactory;
  * Noop Topic Sink Factory
  */
 public interface NoopTopicSinkFactory {
-       
-       /**
-        * Creates noop topic sinks based on properties files
-        * 
-        * @param properties Properties containing initialization values
-        * 
-        * @return a noop topic sink
-        * @throws IllegalArgumentException if invalid parameters are present
-        */
-       public List<NoopTopicSink> build(Properties properties);
-
-       /**
-        * builds a noop sink
-        *  
-        * @param servers list of servers
-        * @param topic topic name
-        * @param managed is this sink endpoint managed?
-        * @return a noop topic sink
-        * @throws IllegalArgumentException if invalid parameters are present
-        */
-       public NoopTopicSink build(List<String> servers, String topic, boolean managed);
-       
-       /**
-        * Destroys a sink based on the topic
-        * 
-        * @param topic topic name
-        * @throws IllegalArgumentException if invalid parameters are present
-        */
-       public void destroy(String topic);
-
-       /**
-        * gets a sink based on topic name
-        * @param topic the topic name
-        * 
-        * @return a sink with topic name
-        * @throws IllegalArgumentException if an invalid topic is provided
-        * @throws IllegalStateException if the sink is in an incorrect state
-        */
-       public NoopTopicSink get(String topic);
-       
-       /**
-        * Provides a snapshot of the UEB Topic Writers
-        * @return a list of the UEB Topic Writers
-        */
-       public List<NoopTopicSink> inventory();
-
-       /**
-        * Destroys all sinks
-        */
-       public void destroy();
+
+  /**
+   * Creates noop topic sinks based on properties files
+   * 
+   * @param properties Properties containing initialization values
+   * 
+   * @return a noop topic sink
+   * @throws IllegalArgumentException if invalid parameters are present
+   */
+  public List<NoopTopicSink> build(Properties properties);
+
+  /**
+   * builds a noop sink
+   * 
+   * @param servers list of servers
+   * @param topic topic name
+   * @param managed is this sink endpoint managed?
+   * @return a noop topic sink
+   * @throws IllegalArgumentException if invalid parameters are present
+   */
+  public NoopTopicSink build(List<String> servers, String topic, boolean managed);
+
+  /**
+   * Destroys a sink based on the topic
+   * 
+   * @param topic topic name
+   * @throws IllegalArgumentException if invalid parameters are present
+   */
+  public void destroy(String topic);
+
+  /**
+   * gets a sink based on topic name
+   * 
+   * @param topic the topic name
+   * 
+   * @return a sink with topic name
+   * @throws IllegalArgumentException if an invalid topic is provided
+   * @throws IllegalStateException if the sink is in an incorrect state
+   */
+  public NoopTopicSink get(String topic);
+
+  /**
+   * Provides a snapshot of the UEB Topic Writers
+   * 
+   * @return a list of the UEB Topic Writers
+   */
+  public List<NoopTopicSink> inventory();
+
+  /**
+   * Destroys all sinks
+   */
+  public void destroy();
 }
 
+
 /* ------------- implementation ----------------- */
 
 /**
  * Factory of noop sinks
  */
 class IndexedNoopTopicSinkFactory implements NoopTopicSinkFactory {
-       /**
-        * Logger 
-        */
-       private static Logger logger = LoggerFactory.getLogger(IndexedUebTopicSinkFactory.class);       
-       
-       /**
-        * noop topic sinks map
-        */
-       protected HashMap<String, NoopTopicSink> noopTopicSinks = new HashMap<>();
-
-       @Override
-       public List<NoopTopicSink> build(Properties properties) {
-               
-               String sinkTopics = properties.getProperty(PolicyProperties.PROPERTY_NOOP_SINK_TOPICS);
-               if (sinkTopics == null || sinkTopics.isEmpty()) {
-                       logger.info("{}: no topic for noop sink", this);
-                       return new ArrayList<>();
-               }
-               
-               List<String> sinkTopicList = new ArrayList<>(Arrays.asList(sinkTopics.split("\\s*,\\s*")));
-               List<NoopTopicSink> newSinks = new ArrayList<NoopTopicSink>();
-               synchronized(this) {
-                       for (String topic: sinkTopicList) {
-                               if (this.noopTopicSinks.containsKey(topic)) {
-                                       newSinks.add(this.noopTopicSinks.get(topic));
-                                       continue;
-                               }
-                               
-                               String servers = properties.getProperty(PolicyProperties.PROPERTY_NOOP_SINK_TOPICS + "." + 
-                                                                       topic + 
-                                                                       PolicyProperties.PROPERTY_TOPIC_SERVERS_SUFFIX);
-                               
-                               if (servers == null || servers.isEmpty()) 
-                                       servers = "noop";
-                               
-                               List<String> serverList = new ArrayList<>(Arrays.asList(servers.split("\\s*,\\s*")));
-                               
-                               String managedString = properties.getProperty(PolicyProperties.PROPERTY_UEB_SINK_TOPICS + "." + topic +
-                                                                                     PolicyProperties.PROPERTY_MANAGED_SUFFIX);
-                               boolean managed = true;
-                               if (managedString != null && !managedString.isEmpty()) {
-                                       managed = Boolean.parseBoolean(managedString);
-                               }                       
-                               
-                               NoopTopicSink noopSink = this.build(serverList, topic, managed);
-                               newSinks.add(noopSink);
-                       }
-                       return newSinks;
-               }
-       }
-
-       @Override
-       public NoopTopicSink build(List<String> servers, String topic, boolean managed) {
-               if (servers == null) {
-                       servers = new ArrayList<>();
-               }
-               
-               if (servers.isEmpty()) {
-                       servers.add("noop");
-               }
-
-               if (topic == null || topic.isEmpty()) {
-                       throw new IllegalArgumentException("A topic must be provided");
-               }
-               
-               synchronized (this) {
-                       if (noopTopicSinks.containsKey(topic)) {
-                               return noopTopicSinks.get(topic);
-                       }
-                       
-                       NoopTopicSink sink = 
-                                       new NoopTopicSink(servers, topic);
-                       
-                       if (managed)
-                               noopTopicSinks.put(topic, sink);
-                       
-                       return sink;
-               }
-       }
-
-       @Override
-       public void destroy(String topic) {             
-               if (topic == null || topic.isEmpty()) {
-                       throw new IllegalArgumentException("A topic must be provided");
-               }
-               
-               NoopTopicSink noopSink;
-               synchronized(this) {
-                       if (!noopTopicSinks.containsKey(topic)) {
-                               return;
-                       }
-                       
-                       noopSink = noopTopicSinks.remove(topic);
-               }
-               
-               noopSink.shutdown();
-       }
-       
-       @Override
-       public void destroy() {
-               List<NoopTopicSink> sinks = this.inventory();
-               for (NoopTopicSink sink: sinks) {
-                       sink.shutdown();
-               }
-               
-               synchronized(this) {
-                       this.noopTopicSinks.clear();
-               }
-       }
-
-       @Override
-       public NoopTopicSink get(String topic) {
-               if (topic == null || topic.isEmpty()) {
-                       throw new IllegalArgumentException("A topic must be provided");
-               }
-               
-               synchronized(this) {
-                       if (noopTopicSinks.containsKey(topic)) {
-                               return noopTopicSinks.get(topic);
-                       } else {
-                               throw new IllegalStateException("DmaapTopicSink for " + topic + " not found");
-                       }
-               }
-       }
-
-       @Override
-       public List<NoopTopicSink> inventory() {
-                return new ArrayList<>(this.noopTopicSinks.values());
-       }
+  /**
+   * Logger
+   */
+  private static Logger logger = LoggerFactory.getLogger(IndexedUebTopicSinkFactory.class);
+
+  /**
+   * noop topic sinks map
+   */
+  protected HashMap<String, NoopTopicSink> noopTopicSinks = new HashMap<>();
+
+  @Override
+  public List<NoopTopicSink> build(Properties properties) {
+
+    final String sinkTopics = properties.getProperty(PolicyProperties.PROPERTY_NOOP_SINK_TOPICS);
+    if (sinkTopics == null || sinkTopics.isEmpty()) {
+      logger.info("{}: no topic for noop sink", this);
+      return new ArrayList<>();
+    }
+
+    final List<String> sinkTopicList =
+        new ArrayList<>(Arrays.asList(sinkTopics.split("\\s*,\\s*")));
+    final List<NoopTopicSink> newSinks = new ArrayList<NoopTopicSink>();
+    synchronized (this) {
+      for (final String topic : sinkTopicList) {
+        if (this.noopTopicSinks.containsKey(topic)) {
+          newSinks.add(this.noopTopicSinks.get(topic));
+          continue;
+        }
+
+        String servers = properties.getProperty(PolicyProperties.PROPERTY_NOOP_SINK_TOPICS + "."
+            + topic + PolicyProperties.PROPERTY_TOPIC_SERVERS_SUFFIX);
+
+        if (servers == null || servers.isEmpty())
+          servers = "noop";
+
+        final List<String> serverList = new ArrayList<>(Arrays.asList(servers.split("\\s*,\\s*")));
+
+        final String managedString =
+            properties.getProperty(PolicyProperties.PROPERTY_NOOP_SINK_TOPICS + "." + topic
+                + PolicyProperties.PROPERTY_MANAGED_SUFFIX);
+        boolean managed = true;
+        if (managedString != null && !managedString.isEmpty()) {
+          managed = Boolean.parseBoolean(managedString);
+        }
+
+        final NoopTopicSink noopSink = this.build(serverList, topic, managed);
+        newSinks.add(noopSink);
+      }
+      return newSinks;
+    }
+  }
+
+  @Override
+  public NoopTopicSink build(List<String> servers, String topic, boolean managed) {
+    if (servers == null) {
+      servers = new ArrayList<>();
+    }
+
+    if (servers.isEmpty()) {
+      servers.add("noop");
+    }
+
+    if (topic == null || topic.isEmpty()) {
+      throw new IllegalArgumentException("A topic must be provided");
+    }
+
+    synchronized (this) {
+      if (this.noopTopicSinks.containsKey(topic)) {
+        return this.noopTopicSinks.get(topic);
+      }
+
+      final NoopTopicSink sink = new NoopTopicSink(servers, topic);
+
+      if (managed)
+        this.noopTopicSinks.put(topic, sink);
+
+      return sink;
+    }
+  }
+
+  @Override
+  public void destroy(String topic) {
+    if (topic == null || topic.isEmpty()) {
+      throw new IllegalArgumentException("A topic must be provided");
+    }
+
+    NoopTopicSink noopSink;
+    synchronized (this) {
+      if (!this.noopTopicSinks.containsKey(topic)) {
+        return;
+      }
+
+      noopSink = this.noopTopicSinks.remove(topic);
+    }
+
+    noopSink.shutdown();
+  }
+
+  @Override
+  public void destroy() {
+    final List<NoopTopicSink> sinks = this.inventory();
+    for (final NoopTopicSink sink : sinks) {
+      sink.shutdown();
+    }
+
+    synchronized (this) {
+      this.noopTopicSinks.clear();
+    }
+  }
+
+  @Override
+  public NoopTopicSink get(String topic) {
+    if (topic == null || topic.isEmpty()) {
+      throw new IllegalArgumentException("A topic must be provided");
+    }
+
+    synchronized (this) {
+      if (this.noopTopicSinks.containsKey(topic)) {
+        return this.noopTopicSinks.get(topic);
+      } else {
+        throw new IllegalStateException("DmaapTopicSink for " + topic + " not found");
+      }
+    }
+  }
+
+  @Override
+  public List<NoopTopicSink> inventory() {
+    return new ArrayList<>(this.noopTopicSinks.values());
+  }
 }