Make feature-pooling-dmaap work without filtering
[policy/drools-pdp.git] / feature-pooling-dmaap / src / main / java / org / onap / policy / drools / pooling / state / State.java
1 /*
2  * ============LICENSE_START=======================================================
3  * ONAP
4  * ================================================================================
5  * Copyright (C) 2018, 2020 AT&T Intellectual Property. All rights reserved.
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=========================================================
19  */
20
21 package org.onap.policy.drools.pooling.state;
22
23 import java.util.LinkedList;
24 import java.util.List;
25 import org.onap.policy.drools.pooling.CancellableScheduledTask;
26 import org.onap.policy.drools.pooling.PoolingManager;
27 import org.onap.policy.drools.pooling.PoolingProperties;
28 import org.onap.policy.drools.pooling.message.BucketAssignments;
29 import org.onap.policy.drools.pooling.message.Heartbeat;
30 import org.onap.policy.drools.pooling.message.Identification;
31 import org.onap.policy.drools.pooling.message.Leader;
32 import org.onap.policy.drools.pooling.message.Offline;
33 import org.onap.policy.drools.pooling.message.Query;
34 import org.slf4j.Logger;
35 import org.slf4j.LoggerFactory;
36
37 /**
38  * A state in the finite state machine.
39  *
40  * <p>A state may have several timers associated with it, which must be cancelled whenever
41  * the state is changed. Assumes that timers are not continuously added to the same state.
42  */
43 public abstract class State {
44
45     private static final Logger logger = LoggerFactory.getLogger(State.class);
46
47     /**
48      * Host pool manager.
49      */
50     private final PoolingManager mgr;
51
52     /**
53      * Timers added by this state.
54      */
55     private final List<CancellableScheduledTask> timers = new LinkedList<>();
56
57     /**
58      * Constructor.
59      *
60      * @param mgr pooling manager
61      */
62     public State(PoolingManager mgr) {
63         this.mgr = mgr;
64     }
65
66     /**
67      * Cancels the timers added by this state.
68      */
69     public final void cancelTimers() {
70         timers.forEach(CancellableScheduledTask::cancel);
71     }
72
73     /**
74      * Starts the state. The default method simply logs a message and returns.
75      */
76     public void start() {
77         logger.info("entered {} for topic {}", getClass().getSimpleName(), getTopic());
78     }
79
80     /**
81      * Transitions to the "start" state.
82      *
83      * @return the new state
84      */
85     public final State goStart() {
86         return mgr.goStart();
87     }
88
89     /**
90      * Transitions to the "query" state.
91      *
92      * @return the new state
93      */
94     public State goQuery() {
95         return mgr.goQuery();
96     }
97
98     /**
99      * Goes active with a new set of assignments.
100      *
101      * @param asgn new assignments
102      * @return the new state, either Active or Inactive, depending on whether or not this
103      *         host has an assignment
104      */
105     protected State goActive(BucketAssignments asgn) {
106         startDistributing(asgn);
107
108         if (asgn != null && asgn.hasAssignment(getHost())) {
109             return mgr.goActive();
110
111         } else {
112             return goInactive();
113         }
114     }
115
116     /**
117      * Transitions to the "inactive" state.
118      *
119      * @return the new state
120      */
121     protected State goInactive() {
122         return mgr.goInactive();
123     }
124
125     /**
126      * Processes a message. The default method just returns {@code null}.
127      *
128      * @param msg message to be processed
129      * @return the new state, or {@code null} if the state is unchanged
130      */
131     public State process(Heartbeat msg) {
132         logger.info("ignored heartbeat message from {} on topic {}", msg.getSource(), getTopic());
133         return null;
134     }
135
136     /**
137      * Processes a message. The default method just returns {@code null}.
138      *
139      * @param msg message to be processed
140      * @return the new state, or {@code null} if the state is unchanged
141      */
142     public State process(Identification msg) {
143         logger.info("ignored ident message from {} on topic {}", msg.getSource(), getTopic());
144         return null;
145     }
146
147     /**
148      * Processes a message. The default method copies the assignments and then returns
149      * {@code null}.
150      *
151      * @param msg message to be processed
152      * @return the new state, or {@code null} if the state is unchanged
153      */
154     public State process(Leader msg) {
155         if (isValid(msg)) {
156             logger.info("extract assignments from Leader message from {} on topic {}", msg.getSource(), getTopic());
157             startDistributing(msg.getAssignments());
158         }
159
160         return null;
161     }
162
163     /**
164      * Processes a message. The default method just returns {@code null}.
165      *
166      * @param msg message to be processed
167      * @return the new state, or {@code null} if the state is unchanged
168      */
169     public State process(Offline msg) {
170         logger.info("ignored offline message from {} on topic {}", msg.getSource(), getTopic());
171         return null;
172     }
173
174     /**
175      * Processes a message. The default method just returns {@code null}.
176      *
177      * @param msg message to be processed
178      * @return the new state, or {@code null} if the state is unchanged
179      */
180     public State process(Query msg) {
181         logger.info("ignored Query message from {} on topic {}", msg.getSource(), getTopic());
182         return null;
183     }
184
185     /**
186      * Determines if a message is valid and did not originate from this host.
187      *
188      * @param msg message to be validated
189      * @return {@code true} if the message is valid, {@code false} otherwise
190      */
191     protected boolean isValid(Leader msg) {
192         BucketAssignments asgn = msg.getAssignments();
193         if (asgn == null) {
194             logger.warn("Leader message from {} has no assignments for topic {}", msg.getSource(), getTopic());
195             return false;
196         }
197
198         // ignore Leader messages from ourself
199         String source = msg.getSource();
200         if (source == null || source.equals(getHost())) {
201             logger.debug("ignore Leader message from {} for topic {}", msg.getSource(), getTopic());
202             return false;
203         }
204
205         // the new leader must equal the source
206         boolean result = source.equals(asgn.getLeader());
207
208         if (!result) {
209             logger.warn("Leader message from {} has an invalid assignment for topic {}", msg.getSource(), getTopic());
210         }
211
212         return result;
213     }
214
215     /**
216      * Publishes a message.
217      *
218      * @param msg message to be published
219      */
220     protected final void publish(Identification msg) {
221         mgr.publishAdmin(msg);
222     }
223
224     /**
225      * Publishes a message.
226      *
227      * @param msg message to be published
228      */
229     protected final void publish(Leader msg) {
230         mgr.publishAdmin(msg);
231     }
232
233     /**
234      * Publishes a message.
235      *
236      * @param msg message to be published
237      */
238     protected final void publish(Offline msg) {
239         mgr.publishAdmin(msg);
240     }
241
242     /**
243      * Publishes a message.
244      *
245      * @param msg message to be published
246      */
247     protected final void publish(Query msg) {
248         mgr.publishAdmin(msg);
249     }
250
251     /**
252      * Publishes a message on the specified channel.
253      *
254      * @param channel channel
255      * @param msg message to be published
256      */
257     protected final void publish(String channel, Heartbeat msg) {
258         mgr.publish(channel, msg);
259     }
260
261     /**
262      * Starts distributing messages using the specified bucket assignments.
263      *
264      * @param assignments assignments
265      */
266     protected final void startDistributing(BucketAssignments assignments) {
267         if (assignments != null) {
268             mgr.startDistributing(assignments);
269         }
270     }
271
272     /**
273      * Schedules a timer to fire after a delay.
274      *
275      * @param delayMs delay in ms
276      * @param task task
277      */
278     protected final void schedule(long delayMs, StateTimerTask task) {
279         timers.add(mgr.schedule(delayMs, task));
280     }
281
282     /**
283      * Schedules a timer to fire repeatedly.
284      *
285      * @param initialDelayMs initial delay ms
286      * @param delayMs delay ms
287      * @param task task
288      */
289     protected final void scheduleWithFixedDelay(long initialDelayMs, long delayMs, StateTimerTask task) {
290         timers.add(mgr.scheduleWithFixedDelay(initialDelayMs, delayMs, task));
291     }
292
293     /**
294      * Indicates that we failed to see our own heartbeat; must be a problem with the
295      * internal topic. Assumes the problem is temporary and continues to use the current
296      * bucket assignments.
297      *
298      * @return a new {@link StartState}
299      */
300     protected final State missedHeartbeat() {
301         publish(makeOffline());
302
303         return mgr.goStart();
304     }
305
306     /**
307      * Indicates that the internal topic failed; this should only be invoked from the
308      * StartState. Discards bucket assignments and begins processing everything locally.
309      *
310      * @return a new {@link InactiveState}
311      */
312     protected final State internalTopicFailed() {
313         publish(makeOffline());
314         mgr.startDistributing(null);
315
316         return mgr.goInactive();
317     }
318
319     /**
320      * Makes a heart beat message.
321      *
322      * @param timestampMs time, in milliseconds, associated with the message
323      *
324      * @return a new message
325      */
326     protected final Heartbeat makeHeartbeat(long timestampMs) {
327         return new Heartbeat(getHost(), timestampMs);
328     }
329
330     /**
331      * Makes an Identification message.
332      *
333      * @return a new message
334      */
335     protected Identification makeIdentification() {
336         return new Identification(getHost(), getAssignments());
337     }
338
339     /**
340      * Makes an "offline" message.
341      *
342      * @return a new message
343      */
344     protected final Offline makeOffline() {
345         return new Offline(getHost());
346     }
347
348     /**
349      * Makes a query message.
350      *
351      * @return a new message
352      */
353     protected final Query makeQuery() {
354         return new Query(getHost());
355     }
356
357     public final BucketAssignments getAssignments() {
358         return mgr.getAssignments();
359     }
360
361     public final String getHost() {
362         return mgr.getHost();
363     }
364
365     public final String getTopic() {
366         return mgr.getTopic();
367     }
368
369     public final PoolingProperties getProperties() {
370         return mgr.getProperties();
371     }
372 }