Removing deprecated DMAAP library
[policy/drools-pdp.git] / feature-pooling-messages / src / main / java / org / onap / policy / drools / pooling / state / State.java
1 /*
2  * ============LICENSE_START=======================================================
3  * ONAP
4  * ================================================================================
5  * Copyright (C) 2018, 2020-2021 AT&T Intellectual Property. All rights reserved.
6  * Modifications Copyright (C) 2024 Nordix Foundation.
7  * ================================================================================
8  * Licensed under the Apache License, Version 2.0 (the "License");
9  * you may not use this file except in compliance with the License.
10  * You may obtain a copy of the License at
11  *
12  *      http://www.apache.org/licenses/LICENSE-2.0
13  *
14  * Unless required by applicable law or agreed to in writing, software
15  * distributed under the License is distributed on an "AS IS" BASIS,
16  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17  * See the License for the specific language governing permissions and
18  * limitations under the License.
19  * ============LICENSE_END=========================================================
20  */
21
22 package org.onap.policy.drools.pooling.state;
23
24 import java.util.LinkedList;
25 import java.util.List;
26 import org.onap.policy.drools.pooling.CancellableScheduledTask;
27 import org.onap.policy.drools.pooling.PoolingManager;
28 import org.onap.policy.drools.pooling.PoolingProperties;
29 import org.onap.policy.drools.pooling.message.BucketAssignments;
30 import org.onap.policy.drools.pooling.message.Heartbeat;
31 import org.onap.policy.drools.pooling.message.Identification;
32 import org.onap.policy.drools.pooling.message.Leader;
33 import org.onap.policy.drools.pooling.message.Offline;
34 import org.onap.policy.drools.pooling.message.Query;
35 import org.slf4j.Logger;
36 import org.slf4j.LoggerFactory;
37
38 /**
39  * A state in the finite state machine.
40  *
41  * <p>A state may have several timers associated with it, which must be cancelled whenever
42  * the state is changed. Assumes that timers are not continuously added to the same state.
43  */
44 public abstract class State {
45
46     private static final Logger logger = LoggerFactory.getLogger(State.class);
47
48     /**
49      * Host pool manager.
50      */
51     private final PoolingManager mgr;
52
53     /**
54      * Timers added by this state.
55      */
56     private final List<CancellableScheduledTask> timers = new LinkedList<>();
57
58     /**
59      * Constructor.
60      *
61      * @param mgr pooling manager
62      */
63     protected State(PoolingManager mgr) {
64         this.mgr = mgr;
65     }
66
67     /**
68      * Cancels the timers added by this state.
69      */
70     public final void cancelTimers() {
71         timers.forEach(CancellableScheduledTask::cancel);
72     }
73
74     /**
75      * Starts the state. The default method simply logs a message and returns.
76      */
77     public void start() {
78         logger.info("entered {} for topic {}", getClass().getSimpleName(), getTopic());
79     }
80
81     /**
82      * Transitions to the "start" state.
83      *
84      * @return the new state
85      */
86     public final State goStart() {
87         return mgr.goStart();
88     }
89
90     /**
91      * Transitions to the "query" state.
92      *
93      * @return the new state
94      */
95     public State goQuery() {
96         return mgr.goQuery();
97     }
98
99     /**
100      * Goes active with a new set of assignments.
101      *
102      * @param asgn new assignments
103      * @return the new state, either Active or Inactive, depending on whether or not this
104      *         host has an assignment
105      */
106     protected State goActive(BucketAssignments asgn) {
107         startDistributing(asgn);
108
109         if (asgn != null && asgn.hasAssignment(getHost())) {
110             return mgr.goActive();
111
112         } else {
113             return goInactive();
114         }
115     }
116
117     /**
118      * Transitions to the "inactive" state.
119      *
120      * @return the new state
121      */
122     protected State goInactive() {
123         return mgr.goInactive();
124     }
125
126     /**
127      * Processes a message. The default method just returns {@code null}.
128      *
129      * @param msg message to be processed
130      * @return the new state, or {@code null} if the state is unchanged
131      */
132     public State process(Heartbeat msg) {
133         logger.info("ignored heartbeat message from {} on topic {}", msg.getSource(), getTopic());
134         return null;
135     }
136
137     /**
138      * Processes a message. The default method just returns {@code null}.
139      *
140      * @param msg message to be processed
141      * @return the new state, or {@code null} if the state is unchanged
142      */
143     public State process(Identification msg) {
144         logger.info("ignored ident message from {} on topic {}", msg.getSource(), getTopic());
145         return null;
146     }
147
148     /**
149      * Processes a message. The default method copies the assignments and then returns
150      * {@code null}.
151      *
152      * @param msg message to be processed
153      * @return the new state, or {@code null} if the state is unchanged
154      */
155     public State process(Leader msg) {
156         if (isValid(msg)) {
157             logger.info("extract assignments from Leader message from {} on topic {}", msg.getSource(), getTopic());
158             startDistributing(msg.getAssignments());
159         }
160
161         return null;
162     }
163
164     /**
165      * Processes a message. The default method just returns {@code null}.
166      *
167      * @param msg message to be processed
168      * @return the new state, or {@code null} if the state is unchanged
169      */
170     public State process(Offline msg) {
171         logger.info("ignored offline message from {} on topic {}", msg.getSource(), getTopic());
172         return null;
173     }
174
175     /**
176      * Processes a message. The default method just returns {@code null}.
177      *
178      * @param msg message to be processed
179      * @return the new state, or {@code null} if the state is unchanged
180      */
181     public State process(Query msg) {
182         logger.info("ignored Query message from {} on topic {}", msg.getSource(), getTopic());
183         return null;
184     }
185
186     /**
187      * Determines if a message is valid and did not originate from this host.
188      *
189      * @param msg message to be validated
190      * @return {@code true} if the message is valid, {@code false} otherwise
191      */
192     protected boolean isValid(Leader msg) {
193         BucketAssignments asgn = msg.getAssignments();
194         if (asgn == null) {
195             logger.warn("Leader message from {} has no assignments for topic {}", msg.getSource(), getTopic());
196             return false;
197         }
198
199         // ignore Leader messages from ourself
200         String source = msg.getSource();
201         if (source == null || source.equals(getHost())) {
202             logger.debug("ignore Leader message from {} for topic {}", msg.getSource(), getTopic());
203             return false;
204         }
205
206         // the new leader must equal the source
207         boolean result = source.equals(asgn.getLeader());
208
209         if (!result) {
210             logger.warn("Leader message from {} has an invalid assignment for topic {}", msg.getSource(), getTopic());
211         }
212
213         return result;
214     }
215
216     /**
217      * Publishes a message.
218      *
219      * @param msg message to be published
220      */
221     protected final void publish(Identification msg) {
222         mgr.publishAdmin(msg);
223     }
224
225     /**
226      * Publishes a message.
227      *
228      * @param msg message to be published
229      */
230     protected final void publish(Leader msg) {
231         mgr.publishAdmin(msg);
232     }
233
234     /**
235      * Publishes a message.
236      *
237      * @param msg message to be published
238      */
239     protected final void publish(Offline msg) {
240         mgr.publishAdmin(msg);
241     }
242
243     /**
244      * Publishes a message.
245      *
246      * @param msg message to be published
247      */
248     protected final void publish(Query msg) {
249         mgr.publishAdmin(msg);
250     }
251
252     /**
253      * Publishes a message on the specified channel.
254      *
255      * @param channel channel
256      * @param msg message to be published
257      */
258     protected final void publish(String channel, Heartbeat msg) {
259         mgr.publish(channel, msg);
260     }
261
262     /**
263      * Starts distributing messages using the specified bucket assignments.
264      *
265      * @param assignments assignments
266      */
267     protected final void startDistributing(BucketAssignments assignments) {
268         if (assignments != null) {
269             mgr.startDistributing(assignments);
270         }
271     }
272
273     /**
274      * Schedules a timer to fire after a delay.
275      *
276      * @param delayMs delay in ms
277      * @param task task
278      */
279     protected final void schedule(long delayMs, StateTimerTask task) {
280         timers.add(mgr.schedule(delayMs, task));
281     }
282
283     /**
284      * Schedules a timer to fire repeatedly.
285      *
286      * @param initialDelayMs initial delay ms
287      * @param delayMs delay ms
288      * @param task task
289      */
290     protected final void scheduleWithFixedDelay(long initialDelayMs, long delayMs, StateTimerTask task) {
291         timers.add(mgr.scheduleWithFixedDelay(initialDelayMs, delayMs, task));
292     }
293
294     /**
295      * Indicates that we failed to see our own heartbeat; must be a problem with the
296      * internal topic. Assumes the problem is temporary and continues to use the current
297      * bucket assignments.
298      *
299      * @return a new {@link StartState}
300      */
301     protected final State missedHeartbeat() {
302         publish(makeOffline());
303
304         return mgr.goStart();
305     }
306
307     /**
308      * Indicates that the internal topic failed; this should only be invoked from the
309      * StartState. Discards bucket assignments and begins processing everything locally.
310      *
311      * @return a new {@link InactiveState}
312      */
313     protected final State internalTopicFailed() {
314         publish(makeOffline());
315         mgr.startDistributing(null);
316
317         return mgr.goInactive();
318     }
319
320     /**
321      * Makes a heart beat message.
322      *
323      * @param timestampMs time, in milliseconds, associated with the message
324      *
325      * @return a new message
326      */
327     protected final Heartbeat makeHeartbeat(long timestampMs) {
328         return new Heartbeat(getHost(), timestampMs);
329     }
330
331     /**
332      * Makes an Identification message.
333      *
334      * @return a new message
335      */
336     protected Identification makeIdentification() {
337         return new Identification(getHost(), getAssignments());
338     }
339
340     /**
341      * Makes an "offline" message.
342      *
343      * @return a new message
344      */
345     protected final Offline makeOffline() {
346         return new Offline(getHost());
347     }
348
349     /**
350      * Makes a query message.
351      *
352      * @return a new message
353      */
354     protected final Query makeQuery() {
355         return new Query(getHost());
356     }
357
358     public final BucketAssignments getAssignments() {
359         return mgr.getAssignments();
360     }
361
362     public final String getHost() {
363         return mgr.getHost();
364     }
365
366     public final String getTopic() {
367         return mgr.getTopic();
368     }
369
370     public final PoolingProperties getProperties() {
371         return mgr.getProperties();
372     }
373 }