--- /dev/null
+/*
+ * ============LICENSE_START=======================================================
+ * ONAP
+ * ================================================================================
+ * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.drools.event.comm;
+
+/**
+ * TopicSource that supports server-side filtering.
+ */
+public interface FilterableTopicSource extends TopicSource {
+
+ /**
+ * Sets the server-side filter.
+ *
+ * @param filter new filter value, or {@code null}
+ * @throws UnsupportedOperationException if the consumer does not support
+ * server-side filtering
+ * @throws IllegalArgumentException if the consumer cannot be built with the
+ * new filter
+ */
+ public void setFilter(String filter);
+
+}
*/
public void close();
+ /**
+ * BusConsumer that supports server-side filtering.
+ */
+ public interface FilterableBusConsumer extends BusConsumer {
+
+ /**
+ * Sets the server-side filter.
+ *
+ * @param filter new filter value, or {@code null}
+ * @throws IllegalArgumentException if the consumer cannot be built with
+ * the new filter
+ */
+ public void setFilter(String filter);
+ }
+
/**
* Cambria based consumer
*/
- public static class CambriaConsumerWrapper implements BusConsumer {
+ public static class CambriaConsumerWrapper implements FilterableBusConsumer {
/**
* logger
*/
private static Logger logger = LoggerFactory.getLogger(CambriaConsumerWrapper.class);
+
+ /**
+ * Used to build the consumer.
+ */
+ private final ConsumerBuilder builder;
/**
* Cambria client
*/
- protected CambriaConsumer consumer;
+ protected volatile CambriaConsumer consumer;
/**
* fetch timeout
this.fetchTimeout = fetchTimeout;
- final ConsumerBuilder builder = new CambriaClientBuilders.ConsumerBuilder();
+ this.builder = new CambriaClientBuilders.ConsumerBuilder();
if (useHttps) {
this.consumer.close();
}
+ @Override
+ public void setFilter(String filter) {
+ logger.info("{}: setting DMAAP server-side filter: {}", this, filter);
+ builder.withServerSideFilter(filter);
+
+ try {
+ consumer = builder.build();
+
+ } catch (MalformedURLException | GeneralSecurityException e) {
+ throw new IllegalArgumentException(e);
+ }
+ }
+
@Override
public String toString() {
final StringBuilder builder = new StringBuilder();
* ============LICENSE_START=======================================================
* policy-endpoints
* ================================================================================
- * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
+ * Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-
+import org.onap.policy.drools.event.comm.FilterableTopicSource;
import org.onap.policy.drools.event.comm.TopicListener;
import org.onap.policy.drools.event.comm.bus.BusTopicSource;
+import org.onap.policy.drools.event.comm.bus.internal.BusConsumer.FilterableBusConsumer;
/**
* This topic source implementation specializes in reading messages
*/
public abstract class SingleThreadedBusTopicSource
extends BusTopicBase
- implements Runnable, BusTopicSource {
+ implements Runnable, BusTopicSource, FilterableTopicSource {
/**
* Not to be converted to PolicyLogger.
@Override
+ public void setFilter(String filter) {
+ if(consumer instanceof FilterableBusConsumer) {
+ ((FilterableBusConsumer) consumer).setFilter(filter);
+
+ } else {
+ throw new UnsupportedOperationException("no server-side filtering for topic " + topic);
+ }
+ }
+
+ @Override
public String toString() {
StringBuilder builder = new StringBuilder();
builder.append("SingleThreadedBusTopicSource [consumerGroup=").append(consumerGroup)