eb805ca5d2d26fbd331ab0752dab43bc6da90ada
[policy/models.git] / models-interactions / model-actors / actorServiceProvider / src / main / java / org / onap / policy / controlloop / actorserviceprovider / topic / TopicListenerImpl.java
1 /*-
2  * ============LICENSE_START=======================================================
3  * ONAP
4  * ================================================================================
5  * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved.
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=========================================================
19  */
20
21 package org.onap.policy.controlloop.actorserviceprovider.topic;
22
23 import java.util.Arrays;
24 import java.util.List;
25 import java.util.Map;
26 import java.util.concurrent.ConcurrentHashMap;
27 import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure;
28 import org.onap.policy.common.endpoints.event.comm.TopicListener;
29 import org.onap.policy.common.utils.coder.CoderException;
30 import org.onap.policy.common.utils.coder.StandardCoder;
31 import org.onap.policy.common.utils.coder.StandardCoderObject;
32 import org.slf4j.Logger;
33 import org.slf4j.LoggerFactory;
34
35 /**
36  * A topic listener. When a message arrives on a topic, it is forwarded to listeners based
37  * on the content of fields found within the message. However, depending on the message
38  * type, the relevant fields might be found in different places within the message's
39  * object hierarchy. For each different list of keys, this class maintains a
40  * {@link Forwarder}, which is used to forward the message to all relevant listeners.
41  * <p/>
42  * Once a selector has been added, it is not removed until {@link #shutdown()} is invoked.
43  * As selectors are typically only added by Operators, and not by individual Operations,
44  * this should not pose a problem.
45  */
46 public class TopicListenerImpl implements TopicListener {
47     private static final Logger logger = LoggerFactory.getLogger(TopicListenerImpl.class);
48     private static StandardCoder coder = new StandardCoder();
49
50     /**
51      * Maps selector to a forwarder.
52      */
53     private final Map<List<SelectorKey>, Forwarder> selector2forwarder = new ConcurrentHashMap<>();
54
55
56     /**
57      * Removes all forwarders.
58      */
59     public void shutdown() {
60         selector2forwarder.clear();
61     }
62
63     /**
64      * Adds a forwarder, if it doesn't already exist.
65      *
66      * @param keys the selector keys
67      * @return the forwarder associated with the given selector keys
68      */
69     public Forwarder addForwarder(SelectorKey... keys) {
70         return addForwarder(Arrays.asList(keys));
71     }
72
73     /**
74      * Adds a forwarder, if it doesn't already exist.
75      *
76      * @param keys the selector keys
77      * @return the forwarder associated with the given selector keys
78      */
79     public Forwarder addForwarder(List<SelectorKey> keys) {
80         return selector2forwarder.computeIfAbsent(keys, key -> new Forwarder(keys));
81     }
82
83     /**
84      * Decodes the message and then forwards it to each forwarder for processing.
85      */
86     @Override
87     public void onTopicEvent(CommInfrastructure infra, String topic, String message) {
88         StandardCoderObject object;
89         try {
90             object = coder.decode(message, StandardCoderObject.class);
91         } catch (CoderException e) {
92             logger.warn("cannot decode message", e);
93             return;
94         }
95
96         /*
97          * We don't know which selector is appropriate for the message, so we just let
98          * them all take a crack at it.
99          */
100         for (Forwarder forwarder : selector2forwarder.values()) {
101             forwarder.onMessage(infra, message, object);
102         }
103     }
104 }