Removing deprecated DMAAP library
[policy/drools-pdp.git] / feature-pooling-messages / src / main / java / org / onap / policy / drools / pooling / state / ActiveState.java
1 /*
2  * ============LICENSE_START=======================================================
3  * ONAP
4  * ================================================================================
5  * Copyright (C) 2018, 2020-2021 AT&T Intellectual Property. All rights reserved.
6  * Modifications Copyright (C) 2024 Nordix Foundation.
7  * ================================================================================
8  * Licensed under the Apache License, Version 2.0 (the "License");
9  * you may not use this file except in compliance with the License.
10  * You may obtain a copy of the License at
11  *
12  *      http://www.apache.org/licenses/LICENSE-2.0
13  *
14  * Unless required by applicable law or agreed to in writing, software
15  * distributed under the License is distributed on an "AS IS" BASIS,
16  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17  * See the License for the specific language governing permissions and
18  * limitations under the License.
19  * ============LICENSE_END=========================================================
20  */
21
22 package org.onap.policy.drools.pooling.state;
23
24 import java.util.Arrays;
25 import java.util.TreeSet;
26 import lombok.AccessLevel;
27 import lombok.Getter;
28 import org.onap.policy.drools.pooling.PoolingManager;
29 import org.onap.policy.drools.pooling.message.Heartbeat;
30 import org.onap.policy.drools.pooling.message.Leader;
31 import org.onap.policy.drools.pooling.message.Offline;
32 import org.slf4j.Logger;
33 import org.slf4j.LoggerFactory;
34
35 /**
36  * The active state. In this state, this host has one more bucket assignments and
37  * processes any events associated with one of its buckets. Other events are forwarded to
38  * appropriate target hosts.
39  */
40 @Getter(AccessLevel.PROTECTED)
41 public class ActiveState extends ProcessingState {
42
43     private static final Logger logger = LoggerFactory.getLogger(ActiveState.class);
44
45     /**
46      * Set of hosts that have been assigned a bucket.
47      */
48     @Getter(AccessLevel.NONE)
49     private final TreeSet<String> assigned = new TreeSet<>();
50
51     /**
52      * Host that comes after this host, or {@code null} if it has no successor.
53      */
54     private String succHost = null;
55
56     /**
57      * Host that comes before this host, or "" if it has no predecessor.
58      */
59     private String predHost = "";
60
61     /**
62      * {@code True} if we saw this host's heart beat since the last check, {@code false}
63      * otherwise.
64      */
65     private boolean myHeartbeatSeen = false;
66
67     /**
68      * {@code True} if we saw the predecessor's heart beat since the last check,
69      * {@code false} otherwise.
70      */
71     private boolean predHeartbeatSeen = false;
72
73
74     /**
75      * Constructor.
76      *
77      * @param mgr pooling manager
78      */
79     public ActiveState(PoolingManager mgr) {
80         super(mgr, mgr.getAssignments().getLeader());
81
82         assigned.addAll(Arrays.asList(mgr.getAssignments().getHostArray()));
83
84         detmNeighbors();
85     }
86
87     /**
88      * Determine this host's neighbors based on the order of the host UUIDs. Updates
89      * {@link #succHost} and {@link #predHost}.
90      */
91     private void detmNeighbors() {
92         if (assigned.size() < 2) {
93             logger.info("this host has no neighbors on topic {}", getTopic());
94             /*
95              * this host is the only one with any assignments - it has no neighbors
96              */
97             succHost = null;
98             predHost = "";
99             return;
100         }
101
102         if ((succHost = assigned.higher(getHost())) == null) {
103             // wrapped around - successor is the first host in the set
104             succHost = assigned.first();
105         }
106         logger.info("this host's successor is {} on topic {}", succHost, getTopic());
107
108         if ((predHost = assigned.lower(getHost())) == null) {
109             // wrapped around - predecessor is the last host in the set
110             predHost = assigned.last();
111         }
112         logger.info("this host's predecessor is {} on topic {}", predHost, getTopic());
113     }
114
115     @Override
116     public void start() {
117         super.start();
118         addTimers();
119         genHeartbeat();
120     }
121
122     /**
123      * Adds the timers.
124      */
125     private void addTimers() {
126         logger.info("add timers");
127
128         /*
129          * heart beat generator
130          */
131         long genMs = getProperties().getInterHeartbeatMs();
132
133         scheduleWithFixedDelay(genMs, genMs, () -> {
134             genHeartbeat();
135             return null;
136         });
137
138         /*
139          * my heart beat checker
140          */
141         long waitMs = getProperties().getActiveHeartbeatMs();
142
143         scheduleWithFixedDelay(waitMs, waitMs, () -> {
144             if (myHeartbeatSeen) {
145                 myHeartbeatSeen = false;
146                 return null;
147             }
148
149             // missed my heart beat
150             logger.error("missed my heartbeat on topic {}", getTopic());
151
152             return missedHeartbeat();
153         });
154
155         /*
156          * predecessor heart beat checker
157          */
158         if (!predHost.isEmpty()) {
159
160             scheduleWithFixedDelay(waitMs, waitMs, () -> {
161                 if (predHeartbeatSeen) {
162                     predHeartbeatSeen = false;
163                     return null;
164                 }
165
166                 // missed the predecessor's heart beat
167                 logger.warn("missed predecessor's heartbeat on topic {}", getTopic());
168
169                 publish(makeQuery());
170
171                 return goQuery();
172             });
173         }
174     }
175
176     /**
177      * Generates a heart beat for this host and its successor.
178      */
179     private void genHeartbeat() {
180         var msg = makeHeartbeat(System.currentTimeMillis());
181         publish(getHost(), msg);
182
183         if (succHost != null) {
184             publish(succHost, msg);
185         }
186     }
187
188     @Override
189     public State process(Heartbeat msg) {
190         String src = msg.getSource();
191
192         if (src == null) {
193             logger.warn("Heartbeat message has no source on topic {}", getTopic());
194
195         } else if (src.equals(getHost())) {
196             logger.info("saw my heartbeat on topic {}", getTopic());
197             myHeartbeatSeen = true;
198
199         } else if (src.equals(predHost)) {
200             logger.info("saw heartbeat from {} on topic {}", src, getTopic());
201             predHeartbeatSeen = true;
202
203         } else {
204             logger.info("ignored heartbeat message from {} on topic {}", src, getTopic());
205         }
206
207         return null;
208     }
209
210     @Override
211     public State process(Leader msg) {
212         if (!isValid(msg)) {
213             return null;
214         }
215
216         String src = msg.getSource();
217
218         if (getHost().compareTo(src) < 0) {
219             // our host would be a better leader - find out what's up
220             logger.warn("unexpected Leader message from {} on topic {}", src, getTopic());
221             return goQuery();
222         }
223
224         logger.info("have a new leader {} on topic {}", src, getTopic());
225
226         return goActive(msg.getAssignments());
227     }
228
229     @Override
230     public State process(Offline msg) {
231         String src = msg.getSource();
232
233         if (src == null) {
234             logger.warn("Offline message has no source on topic {}", getTopic());
235             return null;
236
237         } else if (!assigned.contains(src)) {
238             /*
239              * the offline host wasn't assigned any buckets, so just ignore the message
240              */
241             logger.info("ignore Offline message from unassigned source {} on topic {}", src, getTopic());
242             return null;
243
244         } else if (isLeader() || (predHost.equals(src) && predHost.equals(assigned.first()))) {
245             /*
246              * Case 1: We are the leader.
247              *
248              * Case 2: Our predecessor was the leader, and it has gone offline - we should
249              * become the leader.
250              *
251              * In either case, we are now the leader, and we must re-balance the buckets
252              * since one of the hosts has gone offline.
253              */
254
255             logger.info("Offline message from source {} on topic {}", src, getTopic());
256
257             assigned.remove(src);
258
259             return becomeLeader(assigned);
260
261         } else {
262             /*
263              * Otherwise, we don't care right now - we'll wait for the leader to tell us
264              * it's been removed.
265              */
266             logger.info("ignore Offline message from source {} on topic {}", src, getTopic());
267             return null;
268         }
269     }
270 }