Remove server-side filtering from policy-endpoints 02/114502/1
authorJim Hahn <jrh3@att.com>
Thu, 29 Oct 2020 20:07:19 +0000 (16:07 -0400)
committerJim Hahn <jrh3@att.com>
Thu, 29 Oct 2020 20:08:23 +0000 (16:08 -0400)
ONAP DMaaP Message Router no longer supports server-side filtering.
Removed it from policy-endpoints.

Issue-ID: POLICY-2881
Change-Id: I08157f7699608af63992dec78a61c5f9c55037b9
Signed-off-by: Jim Hahn <jrh3@att.com>
policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/FilterableTopicSource.java [deleted file]
policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusConsumer.java
policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedBusTopicSource.java
policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusConsumerTest.java
policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedBusTopicSourceTest.java

diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/FilterableTopicSource.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/FilterableTopicSource.java
deleted file mode 100644 (file)
index 27f4ceb..0000000
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * ============LICENSE_START=======================================================
- * ONAP
- * ================================================================================
- * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * 
- *      http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.policy.common.endpoints.event.comm;
-
-/**
- * TopicSource that supports server-side filtering.
- */
-public interface FilterableTopicSource extends TopicSource {
-
-    /**
-     * Sets the server-side filter.
-     * 
-     * @param filter new filter value, or {@code null}
-     * @throws UnsupportedOperationException if the consumer does not support
-     *         server-side filtering
-     * @throws IllegalArgumentException if the consumer cannot be built with the
-     *         new filter
-     */
-    public void setFilter(String filter);
-
-}
index 233434f..60ab2e9 100644 (file)
@@ -60,24 +60,10 @@ public interface BusConsumer {
      */
     public void close();
 
-    /**
-     * BusConsumer that supports server-side filtering.
-     */
-    public interface FilterableBusConsumer extends BusConsumer {
-
-        /**
-         * Sets the server-side filter.
-         *
-         * @param filter new filter value, or {@code null}
-         * @throws IllegalArgumentException if the consumer cannot be built with the new filter
-         */
-        public void setFilter(String filter);
-    }
-
     /**
      * Cambria based consumer.
      */
-    public static class CambriaConsumerWrapper implements FilterableBusConsumer {
+    public static class CambriaConsumerWrapper implements BusConsumer {
 
         /**
          * logger.
@@ -89,20 +75,10 @@ public interface BusConsumer {
          */
         private final ConsumerBuilder builder;
 
-        /**
-         * Locked while updating {@link #consumer} and {@link #newConsumer}.
-         */
-        private final Object consLocker = new Object();
-
         /**
          * Cambria client.
          */
-        private CambriaConsumer consumer;
-
-        /**
-         * Cambria client to use for next fetch.
-         */
-        private CambriaConsumer newConsumer = null;
+        private final CambriaConsumer consumer;
 
         /**
          * fetch timeout.
@@ -169,7 +145,7 @@ public interface BusConsumer {
         @Override
         public Iterable<String> fetch() throws IOException {
             try {
-                return getCurrentConsumer().fetch();
+                return this.consumer.fetch();
             } catch (final IOException e) { //NOSONAR
                 logger.error("{}: cannot fetch because of {} - backoff for {} ms.", this, e.getMessage(),
                         this.fetchTimeout);
@@ -191,55 +167,7 @@ public interface BusConsumer {
         @Override
         public void close() {
             this.closeCondition.countDown();
-            getCurrentConsumer().close();
-        }
-
-        private CambriaConsumer getCurrentConsumer() {
-            CambriaConsumer old = null;
-            CambriaConsumer ret;
-
-            synchronized (consLocker) {
-                if (this.newConsumer != null) {
-                    // replace old consumer with new consumer
-                    old = this.consumer;
-                    this.consumer = this.newConsumer;
-                    this.newConsumer = null;
-                }
-
-                ret = this.consumer;
-            }
-
-            if (old != null) {
-                old.close();
-            }
-
-            return ret;
-        }
-
-        @Override
-        public void setFilter(String filter) {
-            logger.info("{}: setting DMAAP server-side filter: {}", this, filter);
-            builder.withServerSideFilter(filter);
-
-            try {
-                CambriaConsumer previous;
-                synchronized (consLocker) {
-                    previous = this.newConsumer;
-                    this.newConsumer = builder.build();
-                }
-
-                if (previous != null) {
-                    // there was already a new consumer - close it
-                    previous.close();
-                }
-
-            } catch (MalformedURLException | GeneralSecurityException e) {
-                /*
-                 * Since an exception occurred, "consumer" still has its old value, thus it should
-                 * not be closed at this point.
-                 */
-                throw new IllegalArgumentException(e);
-            }
+            this.consumer.close();
         }
 
         @Override
index e52204f..6e74694 100644 (file)
@@ -24,10 +24,8 @@ package org.onap.policy.common.endpoints.event.comm.bus.internal;
 import java.io.IOException;
 import java.net.MalformedURLException;
 import java.util.UUID;
-import org.onap.policy.common.endpoints.event.comm.FilterableTopicSource;
 import org.onap.policy.common.endpoints.event.comm.TopicListener;
 import org.onap.policy.common.endpoints.event.comm.bus.BusTopicSource;
-import org.onap.policy.common.endpoints.event.comm.bus.internal.BusConsumer.FilterableBusConsumer;
 import org.onap.policy.common.endpoints.properties.PolicyEndPointProperties;
 import org.onap.policy.common.endpoints.utils.NetLoggerUtil;
 import org.onap.policy.common.endpoints.utils.NetLoggerUtil.EventType;
@@ -40,7 +38,7 @@ import org.slf4j.LoggerFactory;
  * notifying its listeners.
  */
 public abstract class SingleThreadedBusTopicSource extends BusTopicBase
-        implements Runnable, BusTopicSource, FilterableTopicSource {
+        implements Runnable, BusTopicSource {
 
     /**
      * Not to be converted to PolicyLogger. This will contain all instract /out traffic and only
@@ -262,17 +260,6 @@ public abstract class SingleThreadedBusTopicSource extends BusTopicBase
         return broadcast(event);
     }
 
-
-    @Override
-    public void setFilter(String filter) {
-        if (consumer instanceof FilterableBusConsumer) {
-            ((FilterableBusConsumer) consumer).setFilter(filter);
-
-        } else {
-            throw new UnsupportedOperationException("no server-side filtering for topic " + topic);
-        }
-    }
-
     @Override
     public String toString() {
         return "SingleThreadedBusTopicSource [consumerGroup=" + consumerGroup + ", consumerInstance=" + consumerInstance
index 5264b2f..82d5eef 100644 (file)
@@ -100,23 +100,7 @@ public class BusConsumerTest extends TopicTestBase {
     @Test
     public void testCambriaConsumerWrapperClose() {
         CambriaConsumerWrapper cons = new CambriaConsumerWrapper(builder.build());
-
-        // set filter several times to cause different branches of close() to be executed
-        for (int count = 0; count < 3; ++count) {
-            cons.close();
-            final int count2 = count;
-            assertThatCode(() -> cons.setFilter("close=" + count2)).doesNotThrowAnyException();
-        }
-    }
-
-    @Test
-    public void testCambriaConsumerWrapperSetFilter() {
-        // set filter several times to cause different branches to be executed
-        CambriaConsumerWrapper cons = new CambriaConsumerWrapper(builder.build());
-        for (int count = 0; count < 3; ++count) {
-            final int count2 = count;
-            assertThatCode(() -> cons.setFilter("set-filter=" + count2)).doesNotThrowAnyException();
-        }
+        assertThatCode(() -> cons.close()).doesNotThrowAnyException();
     }
 
     @Test
index 1e95924..1a5506d 100644 (file)
@@ -43,7 +43,6 @@ import org.mockito.stubbing.Answer;
 import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure;
 import org.onap.policy.common.endpoints.event.comm.TopicListener;
 import org.onap.policy.common.endpoints.event.comm.bus.TopicTestBase;
-import org.onap.policy.common.endpoints.event.comm.bus.internal.BusConsumer.FilterableBusConsumer;
 import org.onap.policy.common.utils.gson.GsonTestUtils;
 
 public class SingleThreadedBusTopicSourceTest extends TopicTestBase {
@@ -286,22 +285,6 @@ public class SingleThreadedBusTopicSourceTest extends TopicTestBase {
         source.offer(MY_MESSAGE);
     }
 
-    @Test
-    public void testSetFilter() {
-        FilterableBusConsumer filt = mock(FilterableBusConsumer.class);
-        cons = filt;
-
-        source.start();
-        source.setFilter("my-filter");
-        verify(filt).setFilter("my-filter");
-    }
-
-    @Test(expected = UnsupportedOperationException.class)
-    public void testSetFilter_Unsupported() {
-        source.start();
-        source.setFilter("unsupported-filter");
-    }
-
     @Test
     public void testGetConsumerGroup() {
         assertEquals(MY_CONS_GROUP, source.getConsumerGroup());