41f9abdb41422df90c49bcdfcea746838d610e18
[policy/common.git] /
1 /*
2  * ============LICENSE_START=======================================================
3  * ONAP
4  * ================================================================================
5  * Copyright (C) 2019, 2021 AT&T Intellectual Property. All rights reserved.
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=========================================================
19  */
20
21 package org.onap.policy.common.endpoints.listeners;
22
23 import java.util.concurrent.ConcurrentHashMap;
24 import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure;
25 import org.onap.policy.common.utils.coder.StandardCoderObject;
26 import org.slf4j.Logger;
27 import org.slf4j.LoggerFactory;
28
29 /**
30  * Dispatches standard objects to listeners, based on the message type extracted from the
31  * message. Only one listener may be registered for a given type.
32  */
33 public class MessageTypeDispatcher extends JsonListener {
34     private static final Logger logger = LoggerFactory.getLogger(MessageTypeDispatcher.class);
35
36     /**
37      * Name of the message field, which may be hierarchical.
38      */
39     private final Object[] messageFieldNames;
40
41     /**
42      * Name of the message field, joined with "." - for logging.
43      */
44     private final String fullMessageFieldName;
45
46     /**
47      * Maps a message type to its listener.
48      */
49     private final ConcurrentHashMap<String, ScoListener<?>> type2listener = new ConcurrentHashMap<>();
50
51     /**
52      * Constructs the object.
53      *
54      * @param messageFieldNames name of the message field, which may be hierarchical
55      */
56     public MessageTypeDispatcher(String... messageFieldNames) {
57         this.messageFieldNames = messageFieldNames;
58         this.fullMessageFieldName = String.join(".", messageFieldNames);
59     }
60
61     /**
62      * Registers a listener for a certain type of message.
63      *
64      * @param type type of message of interest to the listener
65      * @param listener listener to register
66      */
67     public <T> void register(String type, ScoListener<T> listener) {
68         type2listener.put(type, listener);
69     }
70
71     /**
72      * Unregisters the listener associated with the specified message type.
73      *
74      * @param type type of message whose listener is to be unregistered
75      */
76     public void unregister(String type) {
77         type2listener.remove(type);
78     }
79
80     @Override
81     public void onTopicEvent(CommInfrastructure infra, String topic, StandardCoderObject sco) {
82         // extract the message type
83         final var type = sco.getString(messageFieldNames);
84         if (type == null) {
85             logger.warn("unable to extract {}: {}", fullMessageFieldName, sco);
86             return;
87         }
88
89         // dispatch the message
90         ScoListener<?> listener = type2listener.get(type);
91         if (listener == null) {
92             logger.info("discarding event of type {}", type);
93             return;
94         }
95
96         listener.onTopicEvent(infra, topic, sco);
97     }
98 }