Use BidirectionalTopicClient from policy-common
[policy/models.git] / models-interactions / model-actors / actorServiceProvider / src / main / java / org / onap / policy / controlloop / actorserviceprovider / impl / BidirectionalTopicActor.java
1 /*-
2  * ============LICENSE_START=======================================================
3  * ONAP
4  * ================================================================================
5  * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved.
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=========================================================
19  */
20
21 package org.onap.policy.controlloop.actorserviceprovider.impl;
22
23 import java.util.Map;
24 import java.util.concurrent.ConcurrentHashMap;
25 import java.util.function.Function;
26 import org.apache.commons.lang3.tuple.Pair;
27 import org.onap.policy.common.endpoints.event.comm.client.BidirectionalTopicClientException;
28 import org.onap.policy.controlloop.actorserviceprovider.Util;
29 import org.onap.policy.controlloop.actorserviceprovider.parameters.BidirectionalTopicActorParams;
30 import org.onap.policy.controlloop.actorserviceprovider.topic.BidirectionalTopicHandler;
31 import org.onap.policy.controlloop.actorserviceprovider.topic.BidirectionalTopicManager;
32
33 /**
34  * Actor that uses a bidirectional topic. The actor's parameters must be a
35  * {@link BidirectionalTopicActorParams}.
36  */
37 public class BidirectionalTopicActor extends ActorImpl implements BidirectionalTopicManager {
38
39     /**
40      * Maps a pair of sink and source topic names to their bidirectional topic.
41      */
42     private final Map<Pair<String, String>, BidirectionalTopicHandler> params2topic = new ConcurrentHashMap<>();
43
44
45     /**
46      * Constructs the object.
47      *
48      * @param name actor's name
49      */
50     public BidirectionalTopicActor(String name) {
51         super(name);
52     }
53
54     @Override
55     protected void doStart() {
56         params2topic.values().forEach(BidirectionalTopicHandler::start);
57         super.doStart();
58     }
59
60     @Override
61     protected void doStop() {
62         params2topic.values().forEach(BidirectionalTopicHandler::stop);
63         super.doStop();
64     }
65
66     @Override
67     protected void doShutdown() {
68         params2topic.values().forEach(BidirectionalTopicHandler::shutdown);
69         params2topic.clear();
70         super.doShutdown();
71     }
72
73     @Override
74     public BidirectionalTopicHandler getTopicHandler(String sinkTopic, String sourceTopic) {
75         Pair<String, String> key = Pair.of(sinkTopic, sourceTopic);
76
77         return params2topic.computeIfAbsent(key, pair -> {
78             try {
79                 return makeTopicHandler(sinkTopic, sourceTopic);
80             } catch (BidirectionalTopicClientException e) {
81                 throw new IllegalArgumentException(e);
82             }
83         });
84     }
85
86     /**
87      * Translates the parameters to a {@link BidirectionalTopicActorParams} and then
88      * creates a function that will extract operator-specific parameters.
89      */
90     @Override
91     protected Function<String, Map<String, Object>> makeOperatorParameters(Map<String, Object> actorParameters) {
92         String actorName = getName();
93
94         // @formatter:off
95         return Util.translate(actorName, actorParameters, BidirectionalTopicActorParams.class)
96                         .doValidation(actorName)
97                         .makeOperationParameters(actorName);
98         // @formatter:on
99     }
100
101     // may be overridden by junit tests
102
103     protected BidirectionalTopicHandler makeTopicHandler(String sinkTopic, String sourceTopic)
104                     throws BidirectionalTopicClientException {
105
106         return new BidirectionalTopicHandler(sinkTopic, sourceTopic);
107     }
108 }