Drools support for kafka topics
[policy/drools-pdp.git] / feature-pooling-dmaap / src / main / java / org / onap / policy / drools / pooling / state / ActiveState.java
1 /*
2  * ============LICENSE_START=======================================================
3  * ONAP
4  * ================================================================================
5  * Copyright (C) 2018, 2020-2021 AT&T Intellectual Property. All rights reserved.
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=========================================================
19  */
20
21 package org.onap.policy.drools.pooling.state;
22
23 import java.util.Arrays;
24 import java.util.TreeSet;
25 import lombok.AccessLevel;
26 import lombok.Getter;
27 import org.onap.policy.drools.pooling.PoolingManager;
28 import org.onap.policy.drools.pooling.message.Heartbeat;
29 import org.onap.policy.drools.pooling.message.Leader;
30 import org.onap.policy.drools.pooling.message.Offline;
31 import org.slf4j.Logger;
32 import org.slf4j.LoggerFactory;
33
34 /**
35  * The active state. In this state, this host has one more more bucket assignments and
36  * processes any events associated with one of its buckets. Other events are forwarded to
37  * appropriate target hosts.
38  */
39 @Getter(AccessLevel.PROTECTED)
40 public class ActiveState extends ProcessingState {
41
42     private static final Logger logger = LoggerFactory.getLogger(ActiveState.class);
43
44     /**
45      * Set of hosts that have been assigned a bucket.
46      */
47     @Getter(AccessLevel.NONE)
48     private final TreeSet<String> assigned = new TreeSet<>();
49
50     /**
51      * Host that comes after this host, or {@code null} if it has no successor.
52      */
53     private String succHost = null;
54
55     /**
56      * Host that comes before this host, or "" if it has no predecessor.
57      */
58     private String predHost = "";
59
60     /**
61      * {@code True} if we saw this host's heart beat since the last check, {@code false}
62      * otherwise.
63      */
64     private boolean myHeartbeatSeen = false;
65
66     /**
67      * {@code True} if we saw the predecessor's heart beat since the last check,
68      * {@code false} otherwise.
69      */
70     private boolean predHeartbeatSeen = false;
71
72
73     /**
74      * Constructor.
75      *
76      * @param mgr pooling manager
77      */
78     public ActiveState(PoolingManager mgr) {
79         super(mgr, mgr.getAssignments().getLeader());
80
81         assigned.addAll(Arrays.asList(mgr.getAssignments().getHostArray()));
82
83         detmNeighbors();
84     }
85
86     /**
87      * Determine this host's neighbors based on the order of the host UUIDs. Updates
88      * {@link #succHost} and {@link #predHost}.
89      */
90     private void detmNeighbors() {
91         if (assigned.size() < 2) {
92             logger.info("this host has no neighbors on topic {}", getTopic());
93             /*
94              * this host is the only one with any assignments - it has no neighbors
95              */
96             succHost = null;
97             predHost = "";
98             return;
99         }
100
101         if ((succHost = assigned.higher(getHost())) == null) {
102             // wrapped around - successor is the first host in the set
103             succHost = assigned.first();
104         }
105         logger.info("this host's successor is {} on topic {}", succHost, getTopic());
106
107         if ((predHost = assigned.lower(getHost())) == null) {
108             // wrapped around - predecessor is the last host in the set
109             predHost = assigned.last();
110         }
111         logger.info("this host's predecessor is {} on topic {}", predHost, getTopic());
112     }
113
114     @Override
115     public void start() {
116         super.start();
117         addTimers();
118         genHeartbeat();
119     }
120
121     /**
122      * Adds the timers.
123      */
124     private void addTimers() {
125         logger.info("add timers");
126
127         /*
128          * heart beat generator
129          */
130         long genMs = getProperties().getInterHeartbeatMs();
131
132         scheduleWithFixedDelay(genMs, genMs, () -> {
133             genHeartbeat();
134             return null;
135         });
136
137         /*
138          * my heart beat checker
139          */
140         long waitMs = getProperties().getActiveHeartbeatMs();
141
142         scheduleWithFixedDelay(waitMs, waitMs, () -> {
143             if (myHeartbeatSeen) {
144                 myHeartbeatSeen = false;
145                 return null;
146             }
147
148             // missed my heart beat
149             logger.error("missed my heartbeat on topic {}", getTopic());
150
151             return missedHeartbeat();
152         });
153
154         /*
155          * predecessor heart beat checker
156          */
157         if (!predHost.isEmpty()) {
158
159             scheduleWithFixedDelay(waitMs, waitMs, () -> {
160                 if (predHeartbeatSeen) {
161                     predHeartbeatSeen = false;
162                     return null;
163                 }
164
165                 // missed the predecessor's heart beat
166                 logger.warn("missed predecessor's heartbeat on topic {}", getTopic());
167
168                 publish(makeQuery());
169
170                 return goQuery();
171             });
172         }
173     }
174
175     /**
176      * Generates a heart beat for this host and its successor.
177      */
178     private void genHeartbeat() {
179         var msg = makeHeartbeat(System.currentTimeMillis());
180         publish(getHost(), msg);
181
182         if (succHost != null) {
183             publish(succHost, msg);
184         }
185     }
186
187     @Override
188     public State process(Heartbeat msg) {
189         String src = msg.getSource();
190
191         if (src == null) {
192             logger.warn("Heartbeat message has no source on topic {}", getTopic());
193
194         } else if (src.equals(getHost())) {
195             logger.info("saw my heartbeat on topic {}", getTopic());
196             myHeartbeatSeen = true;
197
198         } else if (src.equals(predHost)) {
199             logger.info("saw heartbeat from {} on topic {}", src, getTopic());
200             predHeartbeatSeen = true;
201
202         } else {
203             logger.info("ignored heartbeat message from {} on topic {}", src, getTopic());
204         }
205
206         return null;
207     }
208
209     @Override
210     public State process(Leader msg) {
211         if (!isValid(msg)) {
212             return null;
213         }
214
215         String src = msg.getSource();
216
217         if (getHost().compareTo(src) < 0) {
218             // our host would be a better leader - find out what's up
219             logger.warn("unexpected Leader message from {} on topic {}", src, getTopic());
220             return goQuery();
221         }
222
223         logger.info("have a new leader {} on topic {}", src, getTopic());
224
225         return goActive(msg.getAssignments());
226     }
227
228     @Override
229     public State process(Offline msg) {
230         String src = msg.getSource();
231
232         if (src == null) {
233             logger.warn("Offline message has no source on topic {}", getTopic());
234             return null;
235
236         } else if (!assigned.contains(src)) {
237             /*
238              * the offline host wasn't assigned any buckets, so just ignore the message
239              */
240             logger.info("ignore Offline message from unassigned source {} on topic {}", src, getTopic());
241             return null;
242
243         } else if (isLeader() || (predHost.equals(src) && predHost.equals(assigned.first()))) {
244             /*
245              * Case 1: We are the leader.
246              *
247              * Case 2: Our predecessor was the leader and it has gone offline - we should
248              * become the leader.
249              *
250              * In either case, we are now the leader and we must re-balance the buckets
251              * since one of the hosts has gone offline.
252              */
253
254             logger.info("Offline message from source {} on topic {}", src, getTopic());
255
256             assigned.remove(src);
257
258             return becomeLeader(assigned);
259
260         } else {
261             /*
262              * Otherwise, we don't care right now - we'll wait for the leader to tell us
263              * it's been removed.
264              */
265             logger.info("ignore Offline message from source {} on topic {}", src, getTopic());
266             return null;
267         }
268     }
269 }