Set all cross references of policy/models
[policy/models.git] / models-interactions / model-actors / actorServiceProvider / src / main / java / org / onap / policy / controlloop / actorserviceprovider / impl / BidirectionalTopicActor.java
1 /*-
2  * ============LICENSE_START=======================================================
3  * ONAP
4  * ================================================================================
5  * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved.
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=========================================================
19  */
20
21 package org.onap.policy.controlloop.actorserviceprovider.impl;
22
23 import java.util.Map;
24 import java.util.concurrent.ConcurrentHashMap;
25 import java.util.function.Function;
26 import org.apache.commons.lang3.tuple.Pair;
27 import org.onap.policy.common.endpoints.event.comm.client.BidirectionalTopicClientException;
28 import org.onap.policy.controlloop.actorserviceprovider.Util;
29 import org.onap.policy.controlloop.actorserviceprovider.parameters.BidirectionalTopicActorParams;
30 import org.onap.policy.controlloop.actorserviceprovider.parameters.BidirectionalTopicParams;
31 import org.onap.policy.controlloop.actorserviceprovider.topic.BidirectionalTopicHandler;
32 import org.onap.policy.controlloop.actorserviceprovider.topic.BidirectionalTopicManager;
33
34 /**
35  * Actor that uses a bidirectional topic. The actor's operator parameters are expected to
36  * be an {@link BidirectionalTopicParams}.
37  *
38  * @param <P> type of parameters
39  */
40 public class BidirectionalTopicActor<P extends BidirectionalTopicActorParams> extends ActorImpl
41                 implements BidirectionalTopicManager {
42
43     /**
44      * Class of parameters.
45      */
46     private final Class<P> paramsClass;
47
48     /**
49      * Maps a pair of sink and source topic names to their bidirectional topic.
50      */
51     private final Map<Pair<String, String>, BidirectionalTopicHandler> params2topic = new ConcurrentHashMap<>();
52
53
54     /**
55      * Constructs the object.
56      *
57      * @param name actor's name
58      */
59     public BidirectionalTopicActor(String name, Class<P> paramsClass) {
60         super(name);
61         this.paramsClass = paramsClass;
62     }
63
64     @Override
65     protected void doStart() {
66         params2topic.values().forEach(BidirectionalTopicHandler::start);
67         super.doStart();
68     }
69
70     @Override
71     protected void doStop() {
72         params2topic.values().forEach(BidirectionalTopicHandler::stop);
73         super.doStop();
74     }
75
76     @Override
77     protected void doShutdown() {
78         params2topic.values().forEach(BidirectionalTopicHandler::shutdown);
79         params2topic.clear();
80         super.doShutdown();
81     }
82
83     @Override
84     public BidirectionalTopicHandler getTopicHandler(String sinkTopic, String sourceTopic) {
85         Pair<String, String> key = Pair.of(sinkTopic, sourceTopic);
86
87         return params2topic.computeIfAbsent(key, pair -> {
88             try {
89                 return makeTopicHandler(sinkTopic, sourceTopic);
90             } catch (BidirectionalTopicClientException e) {
91                 throw new IllegalArgumentException(e);
92             }
93         });
94     }
95
96     /**
97      * Translates the parameters to a {@link BidirectionalTopicActorParams} and then
98      * creates a function that will extract operator-specific parameters.
99      */
100     @Override
101     protected Function<String, Map<String, Object>> makeOperatorParameters(Map<String, Object> actorParameters) {
102         String actorName = getName();
103
104         // @formatter:off
105         return Util.translate(actorName, actorParameters, paramsClass)
106                         .doValidation(actorName)
107                         .makeOperationParameters(actorName);
108         // @formatter:on
109     }
110
111     // may be overridden by junit tests
112
113     protected BidirectionalTopicHandler makeTopicHandler(String sinkTopic, String sourceTopic)
114                     throws BidirectionalTopicClientException {
115
116         return new BidirectionalTopicHandler(sinkTopic, sourceTopic);
117     }
118 }