[POLICY-119] noop endpoint and refactoring 71/6271/2
authorJorge Hernandez <jh1730@att.com>
Wed, 26 Jul 2017 15:09:16 +0000 (10:09 -0500)
committerJorge Hernandez <jh1730@att.com>
Thu, 27 Jul 2017 04:21:18 +0000 (23:21 -0500)
noop endpoint was added to support communication between drools-applications
and policy-framework using the common "PolicyEngine.deliver(topic, message)"
mechanins.   The endpoint type will be noop.   This can be used for testing
sanity of any given drools-application from the framework.

An intial refactoring of the endpoints hierarchy was added too.

Change-Id: I62dbe75f511dd6215406fbd7cf0dd5a88bc1efc3
Signed-off-by: Jorge Hernandez <jh1730@att.com>
21 files changed:
policy-core/src/main/java/org/openecomp/policy/drools/properties/PolicyProperties.java
policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/Topic.java
policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/TopicEndpoint.java
policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/TopicSink.java
policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/TopicSource.java
policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/ApiKeyEnabled.java [moved from policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/BusTopic.java with 90% similarity]
policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/BusTopicSink.java
policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/BusTopicSource.java
policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/NoopTopicSink.java [new file with mode: 0644]
policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/NoopTopicSinkFactory.java [new file with mode: 0644]
policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/UebTopicSink.java
policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/UebTopicSinkFactory.java
policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/internal/BusTopicBase.java
policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/internal/InlineBusTopicSink.java
policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/internal/InlineDmaapTopicSink.java
policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/internal/InlineUebTopicSink.java
policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/internal/SingleThreadedBusTopicSource.java
policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/internal/TopicBase.java [new file with mode: 0644]
policy-endpoints/src/test/java/org/openecomp/policy/drools/http/server/test/HttpServerTest.java
policy-endpoints/src/test/java/org/openecomp/policy/drools/http/server/test/NoopTopicTest.java [new file with mode: 0644]
policy-management/src/main/java/org/openecomp/policy/drools/controller/DroolsControllerFactory.java

index 5e7a351..06082fc 100644 (file)
@@ -73,6 +73,8 @@ public interface PolicyProperties {
        public static final String PROPERTY_DMAAP_DME2_SUB_CONTEXT_PATH_SUFFIX = ".dme2.subContextPath";
        public static final String PROPERTY_DMAAP_DME2_SESSION_STICKINESS_REQUIRED_SUFFIX = ".dme2.sessionStickinessRequired";
        
+       public static final String PROPERTY_NOOP_SINK_TOPICS = "noop.sink.topics";
+       
        /* HTTP Server Properties */
        
        public static final String PROPERTY_HTTP_SERVER_SERVICES = "http.server.services";
index 54f49fd..62640bd 100644 (file)
@@ -22,11 +22,17 @@ package org.openecomp.policy.drools.event.comm;
 
 import java.util.List;
 
+import org.openecomp.policy.drools.properties.Lockable;
+import org.openecomp.policy.drools.properties.Startable;
+
 /**
  * Essential Topic Data
  */
-public interface Topic {
+public interface Topic extends TopicRegisterable, Startable, Lockable {
        
+       /**
+        * network logger
+        */
        public static final String NETWORK_LOGGER = "network";
        
        /**
@@ -41,6 +47,10 @@ public interface Topic {
                 * DMAAP Communication Infrastructure
                 */             
                DMAAP,
+               /**
+                * NOOP for internal use only
+                */
+               NOOP,
                /**
                 * REST Communication Infrastructure
                 */                             
index fa73ecb..337d78e 100644 (file)
@@ -26,6 +26,7 @@ import java.util.Properties;
 
 import org.openecomp.policy.drools.event.comm.bus.DmaapTopicSink;
 import org.openecomp.policy.drools.event.comm.bus.DmaapTopicSource;
+import org.openecomp.policy.drools.event.comm.bus.NoopTopicSink;
 import org.openecomp.policy.drools.event.comm.bus.UebTopicSink;
 import org.openecomp.policy.drools.event.comm.bus.UebTopicSource;
 import org.slf4j.LoggerFactory;
@@ -180,6 +181,19 @@ public interface TopicEndpoint extends Startable, Lockable {
        public UebTopicSink getUebTopicSink(String topicName)
                        throws IllegalStateException, IllegalArgumentException;
        
+       /**
+        * get the no-op Topic Sink for the given topic name
+        * 
+        * @param topicName the topic name
+        * 
+        * @return the Topic Source
+        * @throws IllegalStateException if the entity is in an invalid state, for
+        * example multiple TopicReaders for a topic name and communication infrastructure
+        * @throws IllegalArgumentException if invalid parameters are present
+        */
+       public NoopTopicSink getNoopTopicSink(String topicName)
+                       throws IllegalStateException, IllegalArgumentException;
+       
        /**
         * get the DMAAP Topic Source for the given topic name
         * 
@@ -223,6 +237,12 @@ public interface TopicEndpoint extends Startable, Lockable {
         */
        public List<DmaapTopicSink> getDmaapTopicSinks();
        
+       /**
+        * gets only the NOOP Topic Sinks
+        * @return the NOOP Topic Sinks List
+        */
+       public List<NoopTopicSink> getNoopTopicSinks();
+       
        /**
         * singleton for global access
         */
@@ -287,6 +307,7 @@ class ProxyTopicEndpointManager implements TopicEndpoint {
                
                sinks.addAll(UebTopicSink.factory.build(properties));
                sinks.addAll(DmaapTopicSink.factory.build(properties));
+               sinks.addAll(NoopTopicSink.factory.build(properties));
                
                if (this.isLocked()) {
                        for (TopicSink sink : sinks) {
@@ -321,6 +342,7 @@ class ProxyTopicEndpointManager implements TopicEndpoint {
                
                sinks.addAll(UebTopicSink.factory.inventory());
                sinks.addAll(DmaapTopicSink.factory.inventory());
+               sinks.addAll(NoopTopicSink.factory.inventory());
                
                return sinks;
        }
@@ -360,6 +382,15 @@ class ProxyTopicEndpointManager implements TopicEndpoint {
        public List<DmaapTopicSink> getDmaapTopicSinks() {
                return DmaapTopicSink.factory.inventory();
        }
+       
+       /**
+        * {@inheritDoc}
+        */
+       @JsonIgnore
+       @Override
+       public List<NoopTopicSink> getNoopTopicSinks() {
+               return NoopTopicSink.factory.inventory();
+       }
 
        /**
         * {@inheritDoc}
@@ -689,5 +720,10 @@ class ProxyTopicEndpointManager implements TopicEndpoint {
        public DmaapTopicSink getDmaapTopicSink(String topicName) throws IllegalStateException, IllegalArgumentException {
                return DmaapTopicSink.factory.get(topicName);
        }
+
+       @Override
+       public NoopTopicSink getNoopTopicSink(String topicName) throws IllegalStateException, IllegalArgumentException {
+               return NoopTopicSink.factory.get(topicName);
+       }
        
 }
index 2250b1e..c2c4779 100644 (file)
 
 package org.openecomp.policy.drools.event.comm;
 
-import org.openecomp.policy.drools.properties.Lockable;
-import org.openecomp.policy.drools.properties.Startable;
-
 /**
  * Marks a given Topic Endpoint as able to send messages over a topic
  */
-public interface TopicSink extends Topic, Startable, Lockable {
+public interface TopicSink extends Topic {
        
        /**
         * Sends a string message over this Topic Endpoint
         * 
         * @param message message to send
+        * 
         * @return true if the send operation succeeded, false otherwise
         * @throws IllegalArgumentException an invalid message has been provided
         * @throws IllegalStateException the entity is in an state that prevents
-        * it from sending messages, for example, locked or stopped.
+        *         it from sending messages, for example, locked or stopped.
         */
        public boolean send(String message) throws IllegalArgumentException, IllegalStateException;
 
index 0dfbe1c..6d1cbda 100644 (file)
 
 package org.openecomp.policy.drools.event.comm;
 
-import org.openecomp.policy.drools.properties.Lockable;
-import org.openecomp.policy.drools.properties.Startable;
-
 /**
  * Marker for a Topic Entity, indicating that the entity is able to read
  * over a topic
  */
-public interface TopicSource extends TopicRegisterable, Topic, Startable, Lockable {
+public interface TopicSource extends Topic {
        
        /**
         * pushes an event into the source programatically
@@ -37,4 +34,4 @@ public interface TopicSource extends TopicRegisterable, Topic, Startable, Lockab
         */
        public boolean offer(String event);
 
-}
+}
\ No newline at end of file
 
 package org.openecomp.policy.drools.event.comm.bus;
 
-public interface BusTopic {
+/**
+ * API
+ */
+public interface ApiKeyEnabled {
+       /**
+        * @return api key
+        */
        public String getApiKey();
+       
+       /**
+        * @return api secret
+        */
        public String getApiSecret();
 }
index 30978c2..5952533 100644 (file)
@@ -25,7 +25,7 @@ import org.openecomp.policy.drools.event.comm.TopicSink;
 /**
  * Topic Sink over Bus Infrastructure (DMAAP/UEB)
  */
-public interface BusTopicSink extends BusTopic, TopicSink {
+public interface BusTopicSink extends ApiKeyEnabled, TopicSink {
        /**
         * Log Failures after X number of retries
         */
index e6a46d2..6796fc0 100644 (file)
@@ -26,7 +26,7 @@ import org.openecomp.policy.drools.event.comm.TopicSource;
  * Generic Topic Source for UEB/DMAAP Communication Infrastructure
  *
  */
-public interface BusTopicSource extends BusTopic, TopicSource {
+public interface BusTopicSource extends ApiKeyEnabled, TopicSource {
        
        /**
         * Default Consumer Instance Value
diff --git a/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/NoopTopicSink.java b/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/NoopTopicSink.java
new file mode 100644 (file)
index 0000000..e9d503e
--- /dev/null
@@ -0,0 +1,126 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * policy-endpoints
+ * ================================================================================
+ * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.openecomp.policy.drools.event.comm.bus;
+
+import java.util.List;
+
+import org.openecomp.policy.drools.event.comm.TopicSink;
+import org.openecomp.policy.drools.event.comm.bus.internal.TopicBase;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * NOOP topic sink 
+ */
+public class NoopTopicSink extends TopicBase implements TopicSink {
+       
+       /**
+        * factory
+        */
+       public static final NoopTopicSinkFactory factory = new IndexedNoopTopicSinkFactory();
+
+       /**
+        * logger
+        */
+       private static Logger logger = LoggerFactory.getLogger(NoopTopicSink.class);
+       
+       /**
+        * net logger
+        */
+       private static final Logger netLogger = LoggerFactory.getLogger(NETWORK_LOGGER);
+       
+       /**
+        * constructor
+        * @param servers  servers
+        * @param topic topic
+        * @throws IllegalArgumentException if an invalid argument has been passed in
+        */
+       public NoopTopicSink(List<String> servers, String topic) throws IllegalArgumentException {
+               super(servers, topic);
+       }
+
+       @Override
+       public boolean send(String message) throws IllegalArgumentException, IllegalStateException {
+               
+               if (message == null || message.isEmpty())
+                       throw new IllegalArgumentException("Message to send is empty");
+
+               if (!this.alive)
+                       throw new IllegalStateException(this + " is stopped");
+               
+               try {
+                       synchronized (this) {
+                               this.recentEvents.add(message);
+                       }
+                       
+                       netLogger.info("[OUT|{}|{}]{}{}", this.getTopicCommInfrastructure(), 
+                                      this.topic, System.lineSeparator(), message);
+                       
+                       broadcast(message);
+               } catch (Exception e) {
+                       logger.warn("{}: cannot send because of {}", this, e.getMessage(), e);
+                       return false;
+               }
+               
+               return true;
+       }
+
+       @Override
+       public CommInfrastructure getTopicCommInfrastructure() {
+               return CommInfrastructure.NOOP;
+       }
+
+       @Override
+       public boolean start() throws IllegalStateException {
+               logger.info("{}: starting", this);
+               
+               synchronized(this) {
+                       
+                       if (this.alive)
+                               return true;
+                       
+                       if (locked)
+                               throw new IllegalStateException(this + " is locked.");
+                       
+                       this.alive = true;
+               }
+                               
+               return true;
+       }
+
+       @Override
+       public boolean stop() throws IllegalStateException {
+               synchronized(this) {
+                       this.alive = false;
+               }
+               return true;
+       }
+
+       @Override
+       public void shutdown() throws IllegalStateException {
+               this.stop();
+       }
+
+       @Override
+       public String toString() {
+               return "NoopTopicSink [toString()=" + super.toString() + "]";
+       }
+
+}
diff --git a/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/NoopTopicSinkFactory.java b/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/NoopTopicSinkFactory.java
new file mode 100644 (file)
index 0000000..788ea68
--- /dev/null
@@ -0,0 +1,226 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * policy-endpoints
+ * ================================================================================
+ * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.openecomp.policy.drools.event.comm.bus;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Properties;
+
+import org.openecomp.policy.drools.properties.PolicyProperties;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Noop Topic Sink Factory
+ */
+public interface NoopTopicSinkFactory {
+       
+       /**
+        * Creates noop topic sinks based on properties files
+        * 
+        * @param properties Properties containing initialization values
+        * 
+        * @return a noop topic sink
+        * @throws IllegalArgumentException if invalid parameters are present
+        */
+       public List<NoopTopicSink> build(Properties properties) 
+                       throws IllegalArgumentException;
+
+       /**
+        * builds a noop sink
+        *  
+        * @param servers list of servers
+        * @param topic topic name
+        * @param managed is this sink endpoint managed?
+        * @return a noop topic sink
+        * @throws IllegalArgumentException if invalid parameters are present
+        */
+       public NoopTopicSink build(List<String> servers, String topic, boolean managed) 
+                       throws IllegalArgumentException;
+       
+       /**
+        * Destroys a sink based on the topic
+        * 
+        * @param topic topic name
+        * @throws IllegalArgumentException if invalid parameters are present
+        */
+       public void destroy(String topic);
+
+       /**
+        * gets a sink based on topic name
+        * @param topic the topic name
+        * 
+        * @return a sink with topic name
+        * @throws IllegalArgumentException if an invalid topic is provided
+        * @throws IllegalStateException if the sink is in an incorrect state
+        */
+       public NoopTopicSink get(String topic)
+                          throws IllegalArgumentException, IllegalStateException;
+       
+       /**
+        * Provides a snapshot of the UEB Topic Writers
+        * @return a list of the UEB Topic Writers
+        */
+       public List<NoopTopicSink> inventory();
+
+       /**
+        * Destroys all sinks
+        */
+       public void destroy();
+}
+
+/* ------------- implementation ----------------- */
+
+/**
+ * Factory of noop sinks
+ */
+class IndexedNoopTopicSinkFactory implements NoopTopicSinkFactory {
+       /**
+        * Logger 
+        */
+       private static Logger logger = LoggerFactory.getLogger(IndexedUebTopicSinkFactory.class);       
+       
+       /**
+        * noop topic sinks map
+        */
+       protected HashMap<String, NoopTopicSink> noopTopicSinks = new HashMap<String, NoopTopicSink>();
+
+       @Override
+       public List<NoopTopicSink> build(Properties properties) throws IllegalArgumentException {
+               
+               String sinkTopics = properties.getProperty(PolicyProperties.PROPERTY_NOOP_SINK_TOPICS);
+               if (sinkTopics == null || sinkTopics.isEmpty()) {
+                       logger.info("{}: no topic for noop sink", this);
+                       return new ArrayList<NoopTopicSink>();
+               }
+               
+               List<String> sinkTopicList = new ArrayList<String>(Arrays.asList(sinkTopics.split("\\s*,\\s*")));
+               List<NoopTopicSink> newSinks = new ArrayList<NoopTopicSink>();
+               synchronized(this) {
+                       for (String topic: sinkTopicList) {
+                               if (this.noopTopicSinks.containsKey(topic)) {
+                                       newSinks.add(this.noopTopicSinks.get(topic));
+                                       continue;
+                               }
+                               
+                               String servers = properties.getProperty(PolicyProperties.PROPERTY_NOOP_SINK_TOPICS + "." + 
+                                                                       topic + 
+                                                                       PolicyProperties.PROPERTY_TOPIC_SERVERS_SUFFIX);
+                               
+                               if (servers == null || servers.isEmpty()) 
+                                       servers = "noop";
+                               
+                               List<String> serverList = new ArrayList<String>(Arrays.asList(servers.split("\\s*,\\s*")));
+                               
+                               String managedString = properties.getProperty(PolicyProperties.PROPERTY_UEB_SINK_TOPICS + "." + topic +
+                                                                                     PolicyProperties.PROPERTY_MANAGED_SUFFIX);
+                               boolean managed = true;
+                               if (managedString != null && !managedString.isEmpty()) {
+                                       managed = Boolean.parseBoolean(managedString);
+                               }                       
+                               
+                               NoopTopicSink noopSink = this.build(serverList, topic, managed);
+                               newSinks.add(noopSink);
+                       }
+                       return newSinks;
+               }
+       }
+
+       @Override
+       public NoopTopicSink build(List<String> servers, String topic, boolean managed) throws IllegalArgumentException {
+               if (servers == null) {
+                       servers = new ArrayList<>();
+               }
+               
+               if (servers.isEmpty()) {
+                       servers.add("noop");
+               }
+
+               if (topic == null || topic.isEmpty()) {
+                       throw new IllegalArgumentException("A topic must be provided");
+               }
+               
+               synchronized (this) {
+                       if (noopTopicSinks.containsKey(topic)) {
+                               return noopTopicSinks.get(topic);
+                       }
+                       
+                       NoopTopicSink sink = 
+                                       new NoopTopicSink(servers, topic);
+                       
+                       if (managed)
+                               noopTopicSinks.put(topic, sink);
+                       
+                       return sink;
+               }
+       }
+
+       @Override
+       public void destroy(String topic) {             
+               if (topic == null || topic.isEmpty()) {
+                       throw new IllegalArgumentException("A topic must be provided");
+               }
+               
+               NoopTopicSink noopSink;
+               synchronized(this) {
+                       if (!noopTopicSinks.containsKey(topic)) {
+                               return;
+                       }
+                       
+                       noopSink = noopTopicSinks.remove(topic);
+               }
+               
+               noopSink.shutdown();
+       }
+       
+       @Override
+       public void destroy() {
+               List<NoopTopicSink> sinks = this.inventory();
+               for (NoopTopicSink sink: sinks) {
+                       sink.shutdown();
+               }
+               
+               synchronized(this) {
+                       this.noopTopicSinks.clear();
+               }
+       }
+
+       @Override
+       public NoopTopicSink get(String topic) throws IllegalArgumentException, IllegalStateException {
+               if (topic == null || topic.isEmpty()) {
+                       throw new IllegalArgumentException("A topic must be provided");
+               }
+               
+               synchronized(this) {
+                       if (noopTopicSinks.containsKey(topic)) {
+                               return noopTopicSinks.get(topic);
+                       } else {
+                               throw new IllegalStateException("DmaapTopicSink for " + topic + " not found");
+                       }
+               }
+       }
+
+       @Override
+       public List<NoopTopicSink> inventory() {
+                return new ArrayList<>(this.noopTopicSinks.values());
+       }
+}
index efa4dc5..3966d1f 100644 (file)
@@ -26,7 +26,7 @@ package org.openecomp.policy.drools.event.comm.bus;
 public interface UebTopicSink extends BusTopicSink {
        
        /**
-        * Factory of UebTopicWriter for instantiation and management purposes
+        * Factory of UEB Topic Sinks for instantiation and management purposes
         */
        public static final UebTopicSinkFactory factory = new IndexedUebTopicSinkFactory();
 }
index 0469c4a..58d5dff 100644 (file)
@@ -46,7 +46,7 @@ public interface UebTopicSinkFactory {
         * @param partitionKey Consumer Group
         * @param managed is this sink endpoint managed?
         * 
-        * @return an UEB Topic Writer
+        * @return an UEB Topic Sink
         * @throws IllegalArgumentException if invalid parameters are present
         */
        public UebTopicSink build(List<String> servers, 
index 4ac1c6f..4beaa1f 100644 (file)
@@ -22,22 +22,32 @@ package org.openecomp.policy.drools.event.comm.bus.internal;
 
 import java.util.List;
 
-import org.apache.commons.collections4.queue.CircularFifoQueue;
-import org.openecomp.policy.drools.event.comm.Topic;
-import org.openecomp.policy.drools.event.comm.bus.BusTopic;
+import org.openecomp.policy.drools.event.comm.bus.ApiKeyEnabled;
 
-public abstract class BusTopicBase implements BusTopic, Topic {
-       
-       protected List<String> servers;
-
-       protected String topic;
+/**
+ * Bus Topic Base
+ */
+public abstract class BusTopicBase extends TopicBase implements ApiKeyEnabled {
        
+       /**
+        * API Key
+        */
        protected String apiKey;
+       
+       /**
+        * API Secret
+        */
        protected String apiSecret;
+       
+       /**
+        * Use https
+        */
        protected boolean useHttps;
-       protected boolean allowSelfSignedCerts;
        
-       protected CircularFifoQueue<String> recentEvents = new CircularFifoQueue<String>(10);
+       /**
+        * allow self signed certificates
+        */
+       protected boolean allowSelfSignedCerts;
        
        /**
         * Instantiates a new Bus Topic Base
@@ -60,16 +70,7 @@ public abstract class BusTopicBase implements BusTopic, Topic {
                                                  boolean allowSelfSignedCerts) 
        throws IllegalArgumentException {
                
-               if (servers == null || servers.isEmpty()) {
-                       throw new IllegalArgumentException("UEB Server(s) must be provided");
-               }
-               
-               if (topic == null || topic.isEmpty()) {
-                       throw new IllegalArgumentException("An UEB Topic must be provided");
-               }
-               
-               this.servers = servers;
-               this.topic = topic;
+               super(servers, topic);
                
                this.apiKey = apiKey;
                this.apiSecret = apiSecret;
@@ -77,73 +78,35 @@ public abstract class BusTopicBase implements BusTopic, Topic {
                this.allowSelfSignedCerts = allowSelfSignedCerts;
        }
        
-       /**
-        * {@inheritDoc}
-        */
-       @Override
-       public String getTopic() {
-               return topic;
-       }
-       
-       /**
-        * {@inheritDoc}
-        */
-       @Override
-       public List<String> getServers() {
-               return servers;
-       }
-       
-       /**
-        * {@inheritDoc}
-        */
        @Override
        public String getApiKey() {
                return apiKey;
        }
 
-       /**
-        * {@inheritDoc}
-        */
        @Override
        public String getApiSecret() {
                return apiSecret;
        }
        
        /**
-        * @return useHttps
+        * @return if using https
         */
        public boolean isUseHttps(){
                return useHttps;
        }
 
        /**
-        * @return allowSelfSignedCerts
+        * @return if self signed certificates are allowed
         */
        public boolean isAllowSelfSignedCerts(){
                return allowSelfSignedCerts;
        }
-       
-       /**
-        * @return the recentEvents
-        */
-       @Override
-       public synchronized String[] getRecentEvents() {
-               String[] events = new String[recentEvents.size()];
-               return recentEvents.toArray(events);
-       }
 
 
        @Override
        public String toString() {
-               StringBuilder builder = new StringBuilder();
-               builder.append("UebTopicBase [servers=").append(servers)
-                       .append(", topic=").append(topic)
-                       .append(", apiKey=").append(apiKey)
-                       .append(", apiSecret=").append(apiSecret)
-                       .append(", useHttps=").append(useHttps)
-                       .append(", allowSelfSignedCerts=").append(allowSelfSignedCerts)
-                       .append("]");
-               return builder.toString();
+               return "BusTopicBase [apiKey=" + apiKey + ", apiSecret=" + apiSecret + ", useHttps=" + useHttps
+                               + ", allowSelfSignedCerts=" + allowSelfSignedCerts + ", toString()=" + super.toString() + "]";
        }
 
 }
index 6403774..3f1f361 100644 (file)
@@ -45,23 +45,6 @@ public abstract class InlineBusTopicSink extends BusTopicBase implements BusTopi
         */
        protected String partitionId;
        
-       /**
-        * Am I running?
-        * reflects invocation of start()/stop() 
-        * !locked & start() => alive
-        * stop() => !alive
-        */
-       protected volatile boolean alive = false;
-       
-       /**
-        * Am I locked?
-        * reflects invocation of lock()/unlock() operations
-        * locked => !alive (but not in the other direction necessarily)
-        * locked => !offer, !run, !start, !stop (but this last one is obvious
-        *                                        since locked => !alive)
-        */
-       protected volatile boolean locked = false;
-       
        /**
         * message bus publisher
         */
@@ -99,8 +82,7 @@ public abstract class InlineBusTopicSink extends BusTopicBase implements BusTopi
         * {@inheritDoc}
         */
        @Override
-       public boolean start() throws IllegalStateException {
-               
+       public boolean start() throws IllegalStateException {           
                logger.info("{}: starting", this);
                
                synchronized(this) {
@@ -144,64 +126,6 @@ public abstract class InlineBusTopicSink extends BusTopicBase implements BusTopi
                }
                
                return true;
-       }
-       
-       /**
-        * {@inheritDoc}
-        */
-       @Override
-       public boolean lock() {
-               
-               logger.info("{}: locking", this);       
-               
-               synchronized (this) {
-                       if (this.locked)
-                               return true;
-                       
-                       this.locked = true;
-               }
-               
-               return this.stop();
-       }
-
-       /**
-        * {@inheritDoc}
-        */
-       @Override
-       public boolean unlock() {
-               
-               logger.info("{}: unlocking", this);
-               
-               synchronized(this) {
-                       if (!this.locked)
-                               return true;
-                       
-                       this.locked = false;
-               }
-               
-               try {
-                       return this.start();
-               } catch (Exception e) {
-                       logger.warn("{}: cannot start after unlocking because of {}", 
-                                           this, e.getMessage(), e);
-                       return false;
-               }
-       }
-
-       /**
-        * {@inheritDoc}
-        */
-       @Override
-       public boolean isLocked() {
-               return this.locked;
-       }       
-       
-       /**
-        * {@inheritDoc}
-        */
-       @Override
-       public boolean isAlive() {
-               return this.alive;
        }       
        
        /**
@@ -226,7 +150,8 @@ public abstract class InlineBusTopicSink extends BusTopicBase implements BusTopi
                        netLogger.info("[OUT|{}|{}]{}{}", this.getTopicCommInfrastructure(), 
                                       this.topic, System.lineSeparator(), message);
                        
-                       publisher.send(this.partitionId, message);
+                       publisher.send(this.partitionId, message);                      
+                       broadcast(message);
                } catch (Exception e) {
                        logger.warn("{}: cannot send because of {}", this, e.getMessage(), e);
                        return false;
@@ -260,10 +185,12 @@ public abstract class InlineBusTopicSink extends BusTopicBase implements BusTopi
                this.stop();
        }
        
-       /**
-        * {@inheritDoc}
-        */
-       @Override
-       public abstract CommInfrastructure getTopicCommInfrastructure();
 
+       @Override
+       public String toString() {
+               StringBuilder builder = new StringBuilder();
+               builder.append("InlineBusTopicSink [partitionId=").append(partitionId).append(", alive=").append(alive)
+                               .append(", publisher=").append(publisher).append("]");
+               return builder.toString();
+       }
 }
index a78dd0f..37051e2 100644 (file)
@@ -91,8 +91,6 @@ public class InlineDmaapTopicSink extends InlineBusTopicSink implements DmaapTop
                this.longitude = longitude;
                
                this.additionalProps = additionalProps;
-               
-               this.init();
        }
        
        public InlineDmaapTopicSink(List<String> servers, String topic, 
index 3b091f5..d657f06 100644 (file)
@@ -25,6 +25,7 @@ import java.util.List;
 import org.openecomp.policy.drools.event.comm.Topic;
 import org.openecomp.policy.drools.event.comm.bus.UebTopicSink;
 import org.slf4j.LoggerFactory;
+import org.slf4j.Logger;
 
 /**
  * This implementation publishes events for the associated UEB topic,
@@ -35,8 +36,7 @@ public class InlineUebTopicSink extends InlineBusTopicSink implements UebTopicSi
        /**
         * logger 
         */
-       private static org.slf4j.Logger logger = 
-                                                                               LoggerFactory.getLogger(InlineUebTopicSink.class);
+       private static Logger logger = LoggerFactory.getLogger(InlineUebTopicSink.class);
        
        /**
         * Argument-based UEB Topic Writer instantiation
index 85da3f0..0fa90b5 100644 (file)
@@ -78,15 +78,6 @@ public abstract class SingleThreadedBusTopicSource
         */
        protected volatile boolean alive = false;
        
-       /**
-        * Am I locked?
-        * reflects invocation of lock()/unlock() operations
-        * locked => !alive (but not in the other direction necessarily)
-        * locked => !offer, !run, !start, !stop (but this last one is obvious
-        *                                        since locked => !alive)
-        */
-       protected volatile boolean locked = false;
-       
        /**
         * Independent thread reading message over my topic
         */
@@ -157,52 +148,28 @@ public abstract class SingleThreadedBusTopicSource
         */
        public abstract void init() throws Exception;
        
-       /**
-        * {@inheritDoc}
-        */
        @Override
        public void register(TopicListener topicListener) 
                throws IllegalArgumentException {               
                
-               logger.info("{}: registering {}", this, topicListener);
-               
-               synchronized(this) {
-                       if (topicListener == null)
-                               throw new IllegalArgumentException("TopicListener must be provided");
-                       
-                       /* check that this listener is not registered already */
-                       for (TopicListener listener: this.topicListeners) {
-                               if (listener == topicListener) {
-                                       // already registered
-                                       return;
-                               }
-                       }
-                       
-                       this.topicListeners.add(topicListener);
-               }
+               super.register(topicListener);
                
                try {
-                       this.start();
+                       if (!alive && !locked)
+                               this.start();
+                       else
+                               logger.info("{}: register: start not attempted", this);
                } catch (Exception e) {
                        logger.warn("{}: cannot start after registration of because of: {}",
                                this, topicListener, e.getMessage(), e);
                }
        }
 
-       /**
-        * {@inheritDoc}
-        */
        @Override
        public void unregister(TopicListener topicListener) {
-               
-               logger.info("{}: unregistering {}", this, topicListener);
-               
                boolean stop = false;
                synchronized (this) {
-                       if (topicListener == null)
-                               throw new IllegalArgumentException("TopicListener must be provided");
-                       
-                       this.topicListeners.remove(topicListener);
+                       super.unregister(topicListener);
                        stop = (this.topicListeners.isEmpty());
                }
                
@@ -211,49 +178,6 @@ public abstract class SingleThreadedBusTopicSource
                }
        }
        
-       /**
-        * {@inheritDoc}
-        */
-       @Override
-       public boolean lock() { 
-
-               logger.info("{}: locking", this);
-               
-               synchronized (this) {
-                       if (this.locked)
-                               return true;
-                       
-                       this.locked = true;
-               }
-               
-               return this.stop();
-       }
-
-       /**
-        * {@inheritDoc}
-        */
-       @Override
-       public boolean unlock() {               
-               logger.info("{}: unlocking", this);
-               
-               synchronized(this) {
-                       if (!this.locked)
-                               return true;
-                       
-                       this.locked = false;
-               }
-               
-               try {
-                       return this.start();
-               } catch (Exception e) {
-                       logger.warn("{}: cannot after unlocking because of {}", this, e.getMessage(), e);
-                       return false;
-               }
-       }
-       
-       /**
-        * {@inheritDoc}
-        */
        @Override
        public boolean start() throws IllegalStateException {           
                logger.info("{}: starting", this);
@@ -286,9 +210,6 @@ public abstract class SingleThreadedBusTopicSource
                return this.alive;
        }
 
-       /**
-        * {@inheritDoc}
-        */
        @Override
        public boolean stop() {
                logger.info("{}: stopping", this);
@@ -312,49 +233,6 @@ public abstract class SingleThreadedBusTopicSource
                                
                return true;
        }
-
-       /**
-        * {@inheritDoc}
-        */
-       @Override
-       public boolean isLocked() {
-               return this.locked;
-       }
-       
-       /**
-        * broadcast event to all listeners
-        * 
-        * @param message the event
-        * @return true if all notifications are performed with no error, false otherwise
-        */
-       protected boolean broadcast(String message) {
-               
-               /* take a snapshot of listeners */
-               List<TopicListener> snapshotListeners = this.snapshotTopicListeners();
-               
-               boolean success = true;
-               for (TopicListener topicListener: snapshotListeners) {
-                       try {
-                               topicListener.onTopicEvent(this.getTopicCommInfrastructure(), this.topic, message);
-                       } catch (Exception e) {
-                               logger.warn("{}: notification error @ {} because of {}", 
-                                                   this, topicListener, e.getMessage(), e);
-                               success = false;
-                       }
-               }
-               return success;
-       }
-       
-       /**
-        * take a snapshot of current topic listeners
-        * 
-        * @return the topic listeners
-        */
-       protected synchronized List<TopicListener> snapshotTopicListeners() {
-               @SuppressWarnings("unchecked")
-               List<TopicListener> listeners = (List<TopicListener>) topicListeners.clone();
-               return listeners;
-       }
        
        /**
         * Run thread method for the Bus Reader
diff --git a/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/internal/TopicBase.java b/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/internal/TopicBase.java
new file mode 100644 (file)
index 0000000..ff3d466
--- /dev/null
@@ -0,0 +1,229 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * policy-endpoints
+ * ================================================================================
+ * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.openecomp.policy.drools.event.comm.bus.internal;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.commons.collections4.queue.CircularFifoQueue;
+import org.openecomp.policy.drools.event.comm.Topic;
+import org.openecomp.policy.drools.event.comm.TopicListener;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public abstract class TopicBase implements Topic {
+       
+       /**
+        * logger 
+        */
+       private static Logger logger = LoggerFactory.getLogger(TopicBase.class);
+       
+       /**
+        * list of servers
+        */
+       protected List<String> servers;
+
+       /**
+        * Topic
+        */
+       protected String topic;
+       
+       /**
+        * event cache
+        */
+       protected CircularFifoQueue<String> recentEvents = new CircularFifoQueue<String>(10);
+               
+       /**
+        * Am I running?
+        * reflects invocation of start()/stop() 
+        * !locked & start() => alive
+        * stop() => !alive
+        */
+       protected volatile boolean alive = false;
+       
+       /**
+        * Am I locked?
+        * reflects invocation of lock()/unlock() operations
+        * locked => !alive (but not in the other direction necessarily)
+        * locked => !offer, !run, !start, !stop (but this last one is obvious
+        *                                        since locked => !alive)
+        */
+       protected volatile boolean locked = false;
+       
+       /**
+        * All my subscribers for new message notifications
+        */
+       protected final ArrayList<TopicListener> topicListeners = new ArrayList<TopicListener>();
+       
+       /**
+        * Instantiates a new Topic Base
+        * 
+        * @param servers list of servers
+        * @param topic topic name
+        *  
+        * @return a Topic Base
+        * @throws IllegalArgumentException if invalid parameters are present
+        */
+       public TopicBase(List<String> servers, String topic) throws IllegalArgumentException {
+               
+               if (servers == null || servers.isEmpty()) {
+                       throw new IllegalArgumentException("Server(s) must be provided");
+               }
+               
+               if (topic == null || topic.isEmpty()) {
+                       throw new IllegalArgumentException("A Topic must be provided");
+               }
+               
+               this.servers = servers;
+               this.topic = topic;
+       }
+       
+       @Override
+       public void register(TopicListener topicListener) 
+               throws IllegalArgumentException {               
+               
+               logger.info("{}: registering {}", this, topicListener);
+               
+               synchronized(this) {
+                       if (topicListener == null)
+                               throw new IllegalArgumentException("TopicListener must be provided");
+                       
+                       for (TopicListener listener: this.topicListeners) {
+                               if (listener == topicListener) return;
+                       }
+                       
+                       this.topicListeners.add(topicListener);
+               }
+       }
+
+       @Override
+       public void unregister(TopicListener topicListener) {
+               
+               logger.info("{}: unregistering {}", this, topicListener);
+               
+               synchronized (this) {
+                       if (topicListener == null)
+                               throw new IllegalArgumentException("TopicListener must be provided");
+                       
+                       this.topicListeners.remove(topicListener);
+               }
+       }
+       
+       /**
+        * broadcast event to all listeners
+        * 
+        * @param message the event
+        * @return true if all notifications are performed with no error, false otherwise
+        */
+       protected boolean broadcast(String message) {
+               List<TopicListener> snapshotListeners = this.snapshotTopicListeners();
+               
+               boolean success = true;
+               for (TopicListener topicListener: snapshotListeners) {
+                       try {
+                               topicListener.onTopicEvent(this.getTopicCommInfrastructure(), this.topic, message);
+                       } catch (Exception e) {
+                               logger.warn("{}: notification error @ {} because of {}", 
+                                                   this, topicListener, e.getMessage(), e);
+                               success = false;
+                       }
+               }
+               return success;
+       }
+       
+       /**
+        * take a snapshot of current topic listeners
+        * 
+        * @return the topic listeners
+        */
+       protected synchronized List<TopicListener> snapshotTopicListeners() {
+               @SuppressWarnings("unchecked")
+               List<TopicListener> listeners = (List<TopicListener>) topicListeners.clone();
+               return listeners;
+       }
+       
+       @Override
+       public boolean lock() { 
+
+               logger.info("{}: locking", this);
+               
+               synchronized (this) {
+                       if (this.locked)
+                               return true;
+                       
+                       this.locked = true;
+               }
+               
+               return this.stop();
+       }
+
+       @Override
+       public boolean unlock() {               
+               logger.info("{}: unlocking", this);
+               
+               synchronized(this) {
+                       if (!this.locked)
+                               return true;
+                       
+                       this.locked = false;
+               }
+               
+               try {
+                       return this.start();
+               } catch (Exception e) {
+                       logger.warn("{}: cannot after unlocking because of {}", this, e.getMessage(), e);
+                       return false;
+               }
+       }
+       
+       @Override
+       public boolean isLocked() {
+               return this.locked;
+       }       
+       
+       @Override
+       public String getTopic() {
+               return topic;
+       }
+       
+       @Override
+       public boolean isAlive() {
+               return this.alive;
+       }
+       
+       @Override
+       public List<String> getServers() {
+               return servers;
+       }
+
+       @Override
+       public synchronized String[] getRecentEvents() {
+               String[] events = new String[recentEvents.size()];
+               return recentEvents.toArray(events);
+       }
+       
+       
+       @Override
+       public String toString() {
+               return "TopicBase [servers=" + servers + ", topic=" + topic + ", #recentEvents=" + recentEvents.size() + ", locked="
+                               + locked + ", #topicListeners=" + topicListeners.size() + "]";
+       }
+}
index 0204700..6be9632 100644 (file)
@@ -37,6 +37,9 @@ import org.slf4j.LoggerFactory;
  */
 public class HttpServerTest {
        
+       /**
+        * Logger
+        */
        private static Logger logger = LoggerFactory.getLogger(HttpServerTest.class);
 
        @Test
diff --git a/policy-endpoints/src/test/java/org/openecomp/policy/drools/http/server/test/NoopTopicTest.java b/policy-endpoints/src/test/java/org/openecomp/policy/drools/http/server/test/NoopTopicTest.java
new file mode 100644 (file)
index 0000000..924c2a2
--- /dev/null
@@ -0,0 +1,117 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * policy-endpoints
+ * ================================================================================
+ * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.openecomp.policy.drools.http.server.test;
+
+import static org.junit.Assert.assertTrue;
+
+import java.util.List;
+import java.util.Properties;
+
+import org.junit.Test;
+import org.openecomp.policy.drools.event.comm.Topic.CommInfrastructure;
+import org.openecomp.policy.drools.event.comm.TopicEndpoint;
+import org.openecomp.policy.drools.event.comm.TopicListener;
+import org.openecomp.policy.drools.event.comm.TopicSink;
+import org.openecomp.policy.drools.event.comm.bus.NoopTopicSink;
+import org.openecomp.policy.drools.properties.PolicyProperties;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * NOOP Endpoint Tests
+ */
+public class NoopTopicTest implements TopicListener {
+       
+       /**
+        * Logger
+        */
+       private static Logger logger = LoggerFactory.getLogger(NoopTopicTest.class);
+       
+       private final String topicName = "junit-noop";
+       private final String outMessage = "blah";
+       private String inMessage = null;
+       
+       @Test
+       public void testNoopEndpoint() {
+               logger.info("-- testNoopEndpoint() --");
+               
+               Properties noopSinkProperties = new Properties();
+               noopSinkProperties.put(PolicyProperties.PROPERTY_NOOP_SINK_TOPICS, topicName);
+               
+               List<? extends TopicSink> noopTopics = 
+                               TopicEndpoint.manager.addTopicSinks(noopSinkProperties);
+               
+               TopicSink sink = NoopTopicSink.factory.get(topicName);
+               
+               assertTrue(noopTopics.size() == 1);     
+               assertTrue(noopTopics.size() == NoopTopicSink.factory.inventory().size());      
+               assertTrue(noopTopics.get(0) == sink);
+               assertTrue(sink == NoopTopicSink.factory.inventory().get(0));
+               
+               assertTrue(!sink.isAlive());
+               
+               boolean badState = false;
+               try{ 
+                       sink.send(outMessage);
+               } catch(IllegalStateException e) {
+                       badState = true;
+               }               
+               assertTrue(badState);
+               
+               sink.start();
+               assertTrue(sink.isAlive());
+               
+               sink.send(outMessage);
+               assertTrue(sink.getRecentEvents().length == 1);
+               assertTrue(sink.getRecentEvents()[0].equals(outMessage));
+               assertTrue(this.inMessage ==  null);
+               
+               sink.register(this);
+               sink.send(this.outMessage);
+               assertTrue(outMessage.equals(this.inMessage));
+               this.inMessage = null;
+               
+               sink.unregister(this);
+               sink.send(this.outMessage);
+               assertTrue(!outMessage.equals(this.inMessage));
+               
+               sink.stop();
+               try{ 
+                       sink.send(outMessage);
+               } catch(IllegalStateException e) {
+                       badState = true;
+               }               
+               assertTrue(badState);
+               
+               NoopTopicSink.factory.destroy(topicName);
+               assertTrue(NoopTopicSink.factory.inventory().size() == 0);                      
+       }
+
+       @Override
+       public void onTopicEvent(CommInfrastructure commType, String topic, String event) {
+               if (commType != CommInfrastructure.NOOP)
+                       return;
+               
+               if (topic == null || !topic.equals(topicName))
+                       return;
+               
+               this.inMessage = event;
+       }
+}
index 276573e..625eef5 100644 (file)
@@ -241,6 +241,11 @@ class IndexedDroolsControllerFactory implements DroolsControllerFactory {
                                } else {
                                        PROPERTY_TOPIC_ENTITY_PREFIX = PolicyProperties.PROPERTY_DMAAP_SINK_TOPICS + ".";
                                }
+                       } else if (commInfra == CommInfrastructure.NOOP) {
+                               if (!isSource)
+                                       PROPERTY_TOPIC_ENTITY_PREFIX = PolicyProperties.PROPERTY_NOOP_SINK_TOPICS + ".";
+                               else
+                                       continue;
                        } else {
                                throw new IllegalArgumentException("Invalid Communication Infrastructure: " + commInfra);
                        }