* ============LICENSE_START=======================================================
* ONAP
* ================================================================================
- * Copyright (C) 2018-2019 AT&T Intellectual Property. All rights reserved.
+ * Copyright (C) 2018-2020 AT&T Intellectual Property. All rights reserved.
* Modifications Copyright (C) 2020 Nordix Foundation
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
+import lombok.Getter;
import org.apache.commons.lang3.tuple.Pair;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
-import org.onap.policy.common.endpoints.event.comm.FilterableTopicSource;
import org.onap.policy.common.endpoints.event.comm.Topic;
import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure;
import org.onap.policy.common.endpoints.event.comm.TopicListener;
import org.onap.policy.common.endpoints.event.comm.TopicSink;
import org.onap.policy.common.endpoints.event.comm.TopicSource;
import org.onap.policy.drools.controller.DroolsController;
-import org.onap.policy.drools.pooling.message.Message;
import org.onap.policy.drools.system.PolicyController;
import org.onap.policy.drools.system.PolicyEngine;
import org.slf4j.Logger;
* Maps a channel to its queue. Does <i>not</i> include the "admin" channel.
*/
private final ConcurrentMap<String, BlockingQueue<String>> channel2queue = new ConcurrentHashMap<>(7);
- /**
- * Queue for the external "DMaaP" topic.
- */
- private final BlockingQueue<String> externalTopic = new LinkedBlockingQueue<String>();
/**
* Counts the number of decode errors.
*/
}
/**
- * Offers an event to the external topic.
+ * Offers an event to the external topic. As each host needs a copy, it is posted
+ * to each Host's queue.
*
* @param event event
*/
public void offerExternal(String event) {
- externalTopic.offer(event);
+ for (Host host : hosts) {
+ host.getExternalTopic().offer(event);
+ }
}
/**
channel2queue.values().forEach(queue -> queue.offer(message));
}
- /**
- * Offers amessage to an internal channel.
- *
- * @param channel channel
- * @param message message
- */
-
- public void offerInternal(String channel, String message) {
- BlockingQueue<String> queue = channel2queue.get(channel);
- if (queue != null) {
- queue.offer(message);
- }
- }
-
/**
* Associates a controller with its drools controller.
*
return drools2policy.get(droolsController);
}
- /**
- * Constructor.
- *
- * @return queue for the external topic
- */
-
- public BlockingQueue<String> getExternalTopic() {
- return externalTopic;
- }
-
/**
* Get decode errors.
*
private final BlockingQueue<String> msgQueue = new LinkedBlockingQueue<>();
+ /**
+ * Queue for the external "DMaaP" topic.
+ */
+ @Getter
+ private final BlockingQueue<String> externalTopic = new LinkedBlockingQueue<String>();
+
/**
* Source that reads from the external topic and posts to the listener.
*/
doAnswer(new MyExternalTopicListener(context, this)).when(controller).onTopicEvent(any(), any(), any());
context.addController(controller, drools);
// arrange to read from the external topic
- externalSource = new TopicSourceImpl(context, false);
+ externalSource = new TopicSourceImpl(EXTERNAL_TOPIC, externalTopic);
feature = new PoolingFeatureImpl(context);
}
private static class TopicSinkImpl extends TopicImpl implements TopicSink {
private final Context context;
- /**
- * Used to decode the messages so that the channel can be extracted.
- */
- private final Serializer serializer = new Serializer();
/**
* Constructor.
return false;
}
try {
- Message msg = serializer.decodeMsg(message);
- String channel = msg.getChannel();
- if (Message.ADMIN.equals(channel)) {
- // add to every queue
- context.offerInternal(message);
- } else {
- // add to a specific queue
- context.offerInternal(channel, message);
- }
+ context.offerInternal(message);
return true;
} catch (JsonParseException e) {
logger.warn("could not decode message: {}", message);
* Source implementation that reads from a queue associated with a topic.
*/
- private static class TopicSourceImpl extends TopicImpl implements FilterableTopicSource {
+ private static class TopicSourceImpl extends TopicImpl implements TopicSource {
private final String topic;
/**
/**
* Constructor.
*
- * @param context context
- * @param internal {@code true} if to read from the internal topic, {@code false}
- * to read from the external topic
+ * @param type topic type
+ * @param queue topic from which to read
*/
- public TopicSourceImpl(Context context, boolean internal) {
- if (internal) {
- this.topic = INTERNAL_TOPIC;
- this.queue = context.getCurrentHost().getInternalQueue();
- } else {
- this.topic = EXTERNAL_TOPIC;
- this.queue = context.getExternalTopic();
- }
- }
-
- @Override
- public void setFilter(String filter) {
- logger.info("topic filter set to: {}", filter);
+ public TopicSourceImpl(String type, BlockingQueue<String> queue) {
+ this.topic = type;
+ this.queue = queue;
}
@Override
@Override
protected List<TopicSource> getTopicSources() {
- return Arrays.asList(new TopicSourceImpl(currentContext.get(), true));
+ return Arrays.asList(new TopicSourceImpl(INTERNAL_TOPIC,
+ currentContext.get().getCurrentHost().getInternalQueue()));
}
@Override