c0cfe25710e7675a66414820b8b8efa5ece3ba58
[policy/models.git] / models-interactions / model-actors / actorServiceProvider / src / main / java / org / onap / policy / controlloop / actorserviceprovider / topic / TopicPair.java
1 /*-
2  * ============LICENSE_START=======================================================
3  * ONAP
4  * ================================================================================
5  * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved.
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=========================================================
19  */
20
21 package org.onap.policy.controlloop.actorserviceprovider.topic;
22
23 import java.util.ArrayList;
24 import java.util.Arrays;
25 import java.util.List;
26 import lombok.Getter;
27 import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure;
28 import org.onap.policy.common.endpoints.event.comm.TopicEndpoint;
29 import org.onap.policy.common.endpoints.event.comm.TopicEndpointManager;
30 import org.onap.policy.common.endpoints.event.comm.TopicSink;
31 import org.onap.policy.common.endpoints.event.comm.TopicSource;
32 import org.slf4j.Logger;
33 import org.slf4j.LoggerFactory;
34
35 /**
36  * A pair of topics, one of which is used to publish requests and the other to receive
37  * responses.
38  */
39 public class TopicPair extends TopicListenerImpl {
40     private static final Logger logger = LoggerFactory.getLogger(TopicPair.class);
41
42     @Getter
43     private final String source;
44
45     @Getter
46     private final String target;
47
48     private final List<TopicSink> publishers;
49     private final List<TopicSource> subscribers;
50
51     /**
52      * Constructs the object.
53      *
54      * @param source source topic name
55      * @param target target topic name
56      */
57     public TopicPair(String source, String target) {
58         this.source = source;
59         this.target = target;
60
61         publishers = getTopicEndpointManager().getTopicSinks(target);
62         if (publishers.isEmpty()) {
63             throw new IllegalArgumentException("no sinks for topic: " + target);
64         }
65
66         subscribers = getTopicEndpointManager().getTopicSources(Arrays.asList(source));
67         if (subscribers.isEmpty()) {
68             throw new IllegalArgumentException("no sources for topic: " + source);
69         }
70     }
71
72     /**
73      * Starts listening on the source topic(s).
74      */
75     public void start() {
76         subscribers.forEach(topic -> topic.register(this));
77     }
78
79     /**
80      * Stops listening on the source topic(s).
81      */
82     public void stop() {
83         subscribers.forEach(topic -> topic.unregister(this));
84     }
85
86     /**
87      * Stops listening on the source topic(s) and clears all of the forwarders.
88      */
89     @Override
90     public void shutdown() {
91         stop();
92         super.shutdown();
93     }
94
95     /**
96      * Publishes a message to the target topic.
97      *
98      * @param message message to be published
99      * @return a list of the infrastructures on which it was published
100      */
101     public List<CommInfrastructure> publish(String message) {
102         List<CommInfrastructure> infrastructures = new ArrayList<>(publishers.size());
103
104         for (TopicSink topic : publishers) {
105             try {
106                 topic.send(message);
107                 infrastructures.add(topic.getTopicCommInfrastructure());
108
109             } catch (RuntimeException e) {
110                 logger.warn("cannot publish to {}:{}", topic.getTopicCommInfrastructure(), target, e);
111             }
112         }
113
114         return infrastructures;
115     }
116
117     // these may be overridden by junit tests
118
119     protected TopicEndpoint getTopicEndpointManager() {
120         return TopicEndpointManager.getManager();
121     }
122 }