Drools support for kafka topics
[policy/drools-pdp.git] / feature-pooling-messages / src / main / java / org / onap / policy / drools / pooling / state / QueryState.java
1 /*
2  * ============LICENSE_START=======================================================
3  * ONAP
4  * ================================================================================
5  * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved.
6  * Modifications Copyright (C) 2024 Nordix Foundation.
7  * ================================================================================
8  * Licensed under the Apache License, Version 2.0 (the "License");
9  * you may not use this file except in compliance with the License.
10  * You may obtain a copy of the License at
11  * 
12  *      http://www.apache.org/licenses/LICENSE-2.0
13  * 
14  * Unless required by applicable law or agreed to in writing, software
15  * distributed under the License is distributed on an "AS IS" BASIS,
16  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17  * See the License for the specific language governing permissions and
18  * limitations under the License.
19  * ============LICENSE_END=========================================================
20  */
21
22 package org.onap.policy.drools.pooling.state;
23
24 import java.util.TreeSet;
25 import org.onap.policy.drools.pooling.PoolingManager;
26 import org.onap.policy.drools.pooling.message.BucketAssignments;
27 import org.onap.policy.drools.pooling.message.Identification;
28 import org.onap.policy.drools.pooling.message.Leader;
29 import org.onap.policy.drools.pooling.message.Offline;
30 import org.slf4j.Logger;
31 import org.slf4j.LoggerFactory;
32
33 /**
34  * The Query state. In this state, the host waits for the other hosts to identify
35  * themselves. Eventually, a leader should come forth. If not, it will transition to the
36  * active or inactive state, depending on whether it has an assignment in the
37  * current bucket assignments. The other possibility is that it may <i>become</i> the
38  * leader, in which case it will also transition to the active state.
39  */
40 public class QueryState extends ProcessingState {
41
42     private static final Logger logger = LoggerFactory.getLogger(QueryState.class);
43
44     /**
45      * Hosts that have sent an "Identification" message. Always includes this host.
46      */
47     private final TreeSet<String> alive = new TreeSet<>();
48
49     /**
50      * {@code True} if we saw our own Identification method, {@code false} otherwise.
51      */
52     private boolean sawSelfIdent = false;
53
54     /**
55      * Constructor.
56      * 
57      * @param mgr manager
58      */
59     public QueryState(PoolingManager mgr) {
60         // this host is the leader, until a better candidate identifies itself
61         super(mgr, mgr.getHost());
62
63         alive.add(getHost());
64     }
65
66     @Override
67     public void start() {
68         super.start();
69
70         // start identification timer
71         awaitIdentification();
72     }
73
74     /**
75      * Starts a timer to wait for all Identification messages to arrive.
76      */
77     private void awaitIdentification() {
78
79         /*
80          * Once we've waited long enough for all Identification messages to arrive, become
81          * the leader, assuming we should.
82          */
83
84         schedule(getProperties().getIdentificationMs(), () -> {
85
86             if (!sawSelfIdent) {
87                 // didn't see our identification
88                 logger.error("missed our own Ident message on topic {}", getTopic());
89                 return missedHeartbeat();
90
91             } else if (isLeader()) {
92                 // "this" host is the new leader
93                 logger.info("this host is the new leader for topic {}", getTopic());
94                 return becomeLeader(alive);
95
96             } else {
97                 // not the leader - return to previous state
98                 logger.info("no new leader on topic {}", getTopic());
99                 return goActive(getAssignments());
100             }
101         });
102     }
103
104     @Override
105     public State goQuery() {
106         return null;
107     }
108
109     @Override
110     public State process(Identification msg) {
111
112         if (getHost().equals(msg.getSource())) {
113             logger.info("saw our own Ident message on topic {}", getTopic());
114             sawSelfIdent = true;
115
116         } else {
117             logger.info("received Ident message from {} on topic {}", msg.getSource(), getTopic());
118             recordInfo(msg.getSource(), msg.getAssignments());
119         }
120
121         return null;
122     }
123
124     /**
125      * If the message leader is better than the leader we have, then go active with it.
126      * Otherwise, simply treat it like an {@link Identification} message.
127      */
128     @Override
129     public State process(Leader msg) {
130         if (!isValid(msg)) {
131             return null;
132         }
133
134         String source = msg.getSource();
135         BucketAssignments asgn = msg.getAssignments();
136
137         // go active, if this has a leader that's the same or better than the one we have
138         if (source.compareTo(getLeader()) <= 0) {
139             logger.warn("leader with {} on topic {}", source, getTopic());
140             return goActive(asgn);
141         }
142
143         /*
144          * The message does not have an acceptable leader, but we'll still record its
145          * info.
146          */
147         logger.info("record leader info from {} on topic {}", source, getTopic());
148         recordInfo(source, asgn);
149
150         return null;
151     }
152
153     @Override
154     public State process(Offline msg) {
155         String host = msg.getSource();
156
157         if (host != null && !host.equals(getHost())) {
158             logger.warn("host {} offline on topic {}", host, getTopic());
159             alive.remove(host);
160             setLeader(alive.first());
161
162         } else {
163             logger.info("ignored offline message from {} on topic {}", msg.getSource(), getTopic());
164         }
165
166         return null;
167     }
168
169     /**
170      * Records info from a message, adding the source host name to {@link #alive}, and
171      * updating the bucket assignments.
172      * 
173      * @param source the message's source host
174      * @param assignments assignments, or {@code null}
175      */
176     private void recordInfo(String source, BucketAssignments assignments) {
177         // add this message's source host to "alive"
178         if (source != null) {
179             alive.add(source);
180             setLeader(alive.first());
181         }
182
183         if (assignments == null || assignments.getLeader() == null) {
184             return;
185         }
186
187         // record assignments, if we don't have any yet
188         BucketAssignments current = getAssignments();
189         if (current == null) {
190             logger.info("received initial assignments on topic {}", getTopic());
191             setAssignments(assignments);
192             return;
193         }
194
195         /*
196          * Record assignments, if the new assignments have a better (i.e., lesser) leader.
197          */
198         String curldr = current.getLeader();
199         if (curldr == null || assignments.getLeader().compareTo(curldr) < 0) {
200             logger.info("use new assignments from {} on topic {}", source, getTopic());
201             setAssignments(assignments);
202         }
203     }
204 }