2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved.
6 * ================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 * ============LICENSE_END=========================================================
21 package org.onap.policy.controlloop.actorserviceprovider.topic;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure;
import org.onap.policy.common.endpoints.event.comm.TopicListener;
import org.onap.policy.common.utils.coder.CoderException;
import org.onap.policy.common.utils.coder.StandardCoder;
import org.onap.policy.common.utils.coder.StandardCoderObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
36 * A topic listener. When a message arrives on a topic, it is forwarded to listeners based
37 * on the content of fields found within the message. However, depending on the message
38 * type, the relevant fields might be found in different places within the message's
39 * object hierarchy. For each different list of keys, this class maintains a
40 * {@link Forwarder}, which is used to forward the message to all relevant listeners.
42 * Once a selector has been added, it is not removed until {@link #shutdown()} is invoked.
43 * As selectors are typically only added by Operators, and not by individual Operations,
44 * this should not pose a problem.
46 public class TopicListenerImpl implements TopicListener {
47 private static final Logger logger = LoggerFactory.getLogger(TopicListenerImpl.class);
48 private static StandardCoder coder = new StandardCoder();
51 * Maps selector to a forwarder.
53 private final Map<List<SelectorKey>, Forwarder> selector2forwarder = new ConcurrentHashMap<>();
57 * Removes all forwarders.
59 public void shutdown() {
60 selector2forwarder.clear();
64 * Adds a forwarder, if it doesn't already exist.
66 * @param keys the selector keys
67 * @return the forwarder associated with the given selector keys
69 public Forwarder addForwarder(SelectorKey... keys) {
70 return addForwarder(Arrays.asList(keys));
74 * Adds a forwarder, if it doesn't already exist.
76 * @param keys the selector keys
77 * @return the forwarder associated with the given selector keys
79 public Forwarder addForwarder(List<SelectorKey> keys) {
80 return selector2forwarder.computeIfAbsent(keys, key -> new Forwarder(keys));
84 * Decodes the message and then forwards it to each forwarder for processing.
87 public void onTopicEvent(CommInfrastructure infra, String topic, String message) {
88 StandardCoderObject object;
90 object = coder.decode(message, StandardCoderObject.class);
91 } catch (CoderException e) {
92 logger.warn("cannot decode message", e);
97 * We don't know which selector is appropriate for the message, so we just let
98 * them all take a crack at it.
100 for (Forwarder forwarder : selector2forwarder.values()) {
101 forwarder.onMessage(infra, message, object);