2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved.
6 * ================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 * ============LICENSE_END=========================================================
21 package org.onap.policy.controlloop.actorserviceprovider.topic;
23 import java.util.ArrayList;
24 import java.util.Arrays;
25 import java.util.List;
27 import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure;
28 import org.onap.policy.common.endpoints.event.comm.TopicEndpoint;
29 import org.onap.policy.common.endpoints.event.comm.TopicEndpointManager;
30 import org.onap.policy.common.endpoints.event.comm.TopicSink;
31 import org.onap.policy.common.endpoints.event.comm.TopicSource;
32 import org.slf4j.Logger;
33 import org.slf4j.LoggerFactory;
36 * A pair of topics, one of which is used to publish requests and the other to receive
39 public class TopicPair extends TopicListenerImpl {
40 private static final Logger logger = LoggerFactory.getLogger(TopicPair.class);
43 private final String source;
46 private final String target;
48 private final List<TopicSink> publishers;
49 private final List<TopicSource> subscribers;
52 * Constructs the object.
54 * @param source source topic name
55 * @param target target topic name
57 public TopicPair(String source, String target) {
61 publishers = getTopicEndpointManager().getTopicSinks(target);
62 if (publishers.isEmpty()) {
63 throw new IllegalArgumentException("no sinks for topic: " + target);
66 subscribers = getTopicEndpointManager().getTopicSources(Arrays.asList(source));
67 if (subscribers.isEmpty()) {
68 throw new IllegalArgumentException("no sources for topic: " + source);
73 * Starts listening on the source topic(s).
76 subscribers.forEach(topic -> topic.register(this));
80 * Stops listening on the source topic(s).
83 subscribers.forEach(topic -> topic.unregister(this));
87 * Stops listening on the source topic(s) and clears all of the forwarders.
90 public void shutdown() {
96 * Publishes a message to the target topic.
98 * @param message message to be published
99 * @return a list of the infrastructures on which it was published
101 public List<CommInfrastructure> publish(String message) {
102 List<CommInfrastructure> infrastructures = new ArrayList<>(publishers.size());
104 for (TopicSink topic : publishers) {
107 infrastructures.add(topic.getTopicCommInfrastructure());
109 } catch (RuntimeException e) {
110 logger.warn("cannot publish to {}:{}", topic.getTopicCommInfrastructure(), target, e);
114 return infrastructures;
117 // these may be overridden by junit tests
119 protected TopicEndpoint getTopicEndpointManager() {
120 return TopicEndpointManager.getManager();