Set default consumer instance to hostname
[policy/drools-pdp.git] / policy-endpoints / src / main / java / org / onap / policy / drools / event / comm / bus / internal / SingleThreadedBusTopicSource.java
index b7df8ca..5e8cf48 100644 (file)
@@ -2,7 +2,7 @@
  * ============LICENSE_START=======================================================
  * policy-endpoints
  * ================================================================================
- * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
+ * Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved.
  * ================================================================================
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -27,9 +27,11 @@ import java.util.UUID;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-
+import org.onap.policy.drools.event.comm.FilterableTopicSource;
 import org.onap.policy.drools.event.comm.TopicListener;
 import org.onap.policy.drools.event.comm.bus.BusTopicSource;
+import org.onap.policy.drools.event.comm.bus.internal.BusConsumer.FilterableBusConsumer;
+import org.onap.policy.drools.utils.NetworkUtil;
 
 /**
  * This topic source implementation specializes in reading messages
@@ -37,7 +39,7 @@ import org.onap.policy.drools.event.comm.bus.BusTopicSource;
  */
 public abstract class SingleThreadedBusTopicSource 
        extends BusTopicBase
-       implements Runnable, BusTopicSource {
+       implements Runnable, BusTopicSource, FilterableTopicSource {
           
        /**
         * Not to be converted to PolicyLogger.
@@ -125,7 +127,7 @@ public abstract class SingleThreadedBusTopicSource
                }
                
                if (consumerInstance == null || consumerInstance.isEmpty()) {
-                       this.consumerInstance = DEFAULT_CONSUMER_INSTANCE;
+            this.consumerInstance = NetworkUtil.getHostname();
                } else {
                        this.consumerInstance = consumerInstance;
                }
@@ -286,6 +288,16 @@ public abstract class SingleThreadedBusTopicSource
        
 
        @Override
+    public void setFilter(String filter) {
+           if(consumer instanceof FilterableBusConsumer) {
+               ((FilterableBusConsumer) consumer).setFilter(filter);
+               
+           } else {
+               throw new UnsupportedOperationException("no server-side filtering for topic " + topic);
+           }
+    }
+
+    @Override
        public String toString() {
                StringBuilder builder = new StringBuilder();
                builder.append("SingleThreadedBusTopicSource [consumerGroup=").append(consumerGroup)