Fix checkstyle for features submodules.
[policy/drools-pdp.git] / feature-pooling-dmaap / src / main / java / org / onap / policy / drools / pooling / state / State.java
1 /*
2  * ============LICENSE_START=======================================================
3  * ONAP
4  * ================================================================================
5  * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved.
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  * 
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  * 
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=========================================================
19  */
20
21 package org.onap.policy.drools.pooling.state;
22
23 import static org.onap.policy.drools.pooling.state.FilterUtils.MSG_CHANNEL;
24 import static org.onap.policy.drools.pooling.state.FilterUtils.makeEquals;
25 import static org.onap.policy.drools.pooling.state.FilterUtils.makeOr;
26
27 import java.util.LinkedList;
28 import java.util.List;
29 import java.util.Map;
30 import org.onap.policy.drools.pooling.CancellableScheduledTask;
31 import org.onap.policy.drools.pooling.PoolingManager;
32 import org.onap.policy.drools.pooling.PoolingProperties;
33 import org.onap.policy.drools.pooling.message.BucketAssignments;
34 import org.onap.policy.drools.pooling.message.Forward;
35 import org.onap.policy.drools.pooling.message.Heartbeat;
36 import org.onap.policy.drools.pooling.message.Identification;
37 import org.onap.policy.drools.pooling.message.Leader;
38 import org.onap.policy.drools.pooling.message.Message;
39 import org.onap.policy.drools.pooling.message.Offline;
40 import org.onap.policy.drools.pooling.message.Query;
41 import org.slf4j.Logger;
42 import org.slf4j.LoggerFactory;
43
44 /**
45  * A state in the finite state machine.
46  * 
47  * <p>A state may have several timers associated with it, which must be cancelled whenever
48  * the state is changed. Assumes that timers are not continuously added to the same state.
49  */
50 public abstract class State {
51
52     private static final Logger logger = LoggerFactory.getLogger(State.class);
53
54     /**
55      * Host pool manager.
56      */
57     private final PoolingManager mgr;
58
59     /**
60      * Timers added by this state.
61      */
62     private final List<CancellableScheduledTask> timers = new LinkedList<>();
63
64     /**
65      * Constructor.
66      * 
67      * @param mgr pooling manager
68      */
69     public State(PoolingManager mgr) {
70         this.mgr = mgr;
71     }
72
73     /**
74      * Gets the server-side filter to use when polling the DMaaP internal topic. The
75      * default method returns a filter that accepts messages on the admin channel and on
76      * the host's own channel.
77      * 
78      * @return the server-side filter to use.
79      */
80     @SuppressWarnings("unchecked")
81     public Map<String, Object> getFilter() {
82         return makeOr(makeEquals(MSG_CHANNEL, Message.ADMIN), makeEquals(MSG_CHANNEL, getHost()));
83     }
84
85     /**
86      * Cancels the timers added by this state.
87      */
88     public final void cancelTimers() {
89         timers.forEach(timer -> timer.cancel());
90     }
91
92     /**
93      * Starts the state. The default method simply logs a message and returns.
94      */
95     public void start() {
96         logger.info("entered {} for topic {}", getClass().getSimpleName(), getTopic());
97     }
98
99     /**
100      * Transitions to the "start" state.
101      * 
102      * @return the new state
103      */
104     public final State goStart() {
105         return mgr.goStart();
106     }
107
108     /**
109      * Transitions to the "query" state.
110      * 
111      * @return the new state
112      */
113     public State goQuery() {
114         return mgr.goQuery();
115     }
116
117     /**
118      * Goes active with a new set of assignments.
119      * 
120      * @param asgn new assignments
121      * @return the new state, either Active or Inactive, depending on whether or not this
122      *         host has an assignment
123      */
124     protected State goActive(BucketAssignments asgn) {
125         startDistributing(asgn);
126
127         if (asgn != null && asgn.hasAssignment(getHost())) {
128             return mgr.goActive();
129
130         } else {
131             return goInactive();
132         }
133     }
134
135     /**
136      * Transitions to the "inactive" state.
137      * 
138      * @return the new state
139      */
140     protected State goInactive() {
141         return mgr.goInactive();
142     }
143
144     /**
145      * Processes a message. The default method passes it to the manager to handle and
146      * returns {@code null}.
147      * 
148      * @param msg message to be processed
149      * @return the new state, or {@code null} if the state is unchanged
150      */
151     public State process(Forward msg) {
152         if (!getHost().equals(msg.getChannel())) {
153             logger.info("discard Forward message to {} from {} on topic {}", msg.getChannel(), msg.getSource(),
154                             getTopic());
155             return null;
156         }
157
158         logger.info("received Forward message from {} on topic {}", msg.getSource(), getTopic());
159         mgr.handle(msg);
160         return null;
161     }
162
163     /**
164      * Processes a message. The default method just returns {@code null}.
165      * 
166      * @param msg message to be processed
167      * @return the new state, or {@code null} if the state is unchanged
168      */
169     public State process(Heartbeat msg) {
170         logger.info("ignored heartbeat message from {} on topic {}", msg.getSource(), getTopic());
171         return null;
172     }
173
174     /**
175      * Processes a message. The default method just returns {@code null}.
176      * 
177      * @param msg message to be processed
178      * @return the new state, or {@code null} if the state is unchanged
179      */
180     public State process(Identification msg) {
181         logger.info("ignored ident message from {} on topic {}", msg.getSource(), getTopic());
182         return null;
183     }
184
185     /**
186      * Processes a message. The default method copies the assignments and then returns
187      * {@code null}.
188      * 
189      * @param msg message to be processed
190      * @return the new state, or {@code null} if the state is unchanged
191      */
192     public State process(Leader msg) {
193         if (isValid(msg)) {
194             logger.info("extract assignments from Leader message from {} on topic {}", msg.getSource(), getTopic());
195             startDistributing(msg.getAssignments());
196         }
197
198         return null;
199     }
200
201     /**
202      * Processes a message. The default method just returns {@code null}.
203      * 
204      * @param msg message to be processed
205      * @return the new state, or {@code null} if the state is unchanged
206      */
207     public State process(Offline msg) {
208         logger.info("ignored offline message from {} on topic {}", msg.getSource(), getTopic());
209         return null;
210     }
211
212     /**
213      * Processes a message. The default method just returns {@code null}.
214      * 
215      * @param msg message to be processed
216      * @return the new state, or {@code null} if the state is unchanged
217      */
218     public State process(Query msg) {
219         logger.info("ignored Query message from {} on topic {}", msg.getSource(), getTopic());
220         return null;
221     }
222
223     /**
224      * Determines if a message is valid and did not originate from this host.
225      * 
226      * @param msg message to be validated
227      * @return {@code true} if the message is valid, {@code false} otherwise
228      */
229     protected boolean isValid(Leader msg) {
230         BucketAssignments asgn = msg.getAssignments();
231         if (asgn == null) {
232             logger.warn("Leader message from {} has no assignments for topic {}", msg.getSource(), getTopic());
233             return false;
234         }
235
236         // ignore Leader messages from ourself
237         String source = msg.getSource();
238         if (source == null || source.equals(getHost())) {
239             logger.debug("ignore Leader message from {} for topic {}", msg.getSource(), getTopic());
240             return false;
241         }
242
243         // the new leader must equal the source
244         boolean result = source.equals(asgn.getLeader());
245
246         if (!result) {
247             logger.warn("Leader message from {} has an invalid assignment for topic {}", msg.getSource(), getTopic());
248         }
249
250         return result;
251     }
252
253     /**
254      * Publishes a message.
255      * 
256      * @param msg message to be published
257      */
258     protected final void publish(Identification msg) {
259         mgr.publishAdmin(msg);
260     }
261
262     /**
263      * Publishes a message.
264      * 
265      * @param msg message to be published
266      */
267     protected final void publish(Leader msg) {
268         mgr.publishAdmin(msg);
269     }
270
271     /**
272      * Publishes a message.
273      * 
274      * @param msg message to be published
275      */
276     protected final void publish(Offline msg) {
277         mgr.publishAdmin(msg);
278     }
279
280     /**
281      * Publishes a message.
282      * 
283      * @param msg message to be published
284      */
285     protected final void publish(Query msg) {
286         mgr.publishAdmin(msg);
287     }
288
289     /**
290      * Publishes a message on the specified channel.
291      * 
292      * @param channel channel
293      * @param msg message to be published
294      */
295     protected final void publish(String channel, Forward msg) {
296         mgr.publish(channel, msg);
297     }
298
299     /**
300      * Publishes a message on the specified channel.
301      * 
302      * @param channel channel
303      * @param msg message to be published
304      */
305     protected final void publish(String channel, Heartbeat msg) {
306         mgr.publish(channel, msg);
307     }
308
309     /**
310      * Starts distributing messages using the specified bucket assignments.
311      * 
312      * @param assignments assignments
313      */
314     protected final void startDistributing(BucketAssignments assignments) {
315         if (assignments != null) {
316             mgr.startDistributing(assignments);
317         }
318     }
319
320     /**
321      * Schedules a timer to fire after a delay.
322      * 
323      * @param delayMs delay in ms
324      * @param task task
325      */
326     protected final void schedule(long delayMs, StateTimerTask task) {
327         timers.add(mgr.schedule(delayMs, task));
328     }
329
330     /**
331      * Schedules a timer to fire repeatedly.
332      * 
333      * @param initialDelayMs initial delay ms
334      * @param delayMs delay ms
335      * @param task task
336      */
337     protected final void scheduleWithFixedDelay(long initialDelayMs, long delayMs, StateTimerTask task) {
338         timers.add(mgr.scheduleWithFixedDelay(initialDelayMs, delayMs, task));
339     }
340
341     /**
342      * Indicates that we failed to see our own heartbeat; must be a problem with the
343      * internal topic. Assumes the problem is temporary and continues to use the current
344      * bucket assignments.
345      * 
346      * @return a new {@link StartState}
347      */
348     protected final State missedHeartbeat() {
349         publish(makeOffline());
350
351         return mgr.goStart();
352     }
353
354     /**
355      * Indicates that the internal topic failed; this should only be invoked from the
356      * StartState. Discards bucket assignments and begins processing everything locally.
357      * 
358      * @return a new {@link InactiveState}
359      */
360     protected final State internalTopicFailed() {
361         publish(makeOffline());
362         mgr.startDistributing(null);
363
364         return mgr.goInactive();
365     }
366
367     /**
368      * Makes a heart beat message.
369      * 
370      * @param timestampMs time, in milliseconds, associated with the message
371      * 
372      * @return a new message
373      */
374     protected final Heartbeat makeHeartbeat(long timestampMs) {
375         return new Heartbeat(getHost(), timestampMs);
376     }
377
378     /**
379      * Makes an Identification message.
380      * 
381      * @return a new message
382      */
383     protected Identification makeIdentification() {
384         return new Identification(getHost(), getAssignments());
385     }
386
387     /**
388      * Makes an "offline" message.
389      * 
390      * @return a new message
391      */
392     protected final Offline makeOffline() {
393         return new Offline(getHost());
394     }
395
396     /**
397      * Makes a query message.
398      * 
399      * @return a new message
400      */
401     protected final Query makeQuery() {
402         return new Query(getHost());
403     }
404
405     public final BucketAssignments getAssignments() {
406         return mgr.getAssignments();
407     }
408
409     public final String getHost() {
410         return mgr.getHost();
411     }
412
413     public final String getTopic() {
414         return mgr.getTopic();
415     }
416
417     public final PoolingProperties getProperties() {
418         return mgr.getProperties();
419     }
420 }