Remove major and minor code smells in dr-node
[dmaap/datarouter.git] / datarouter-node / src / main / java / org / onap / dmaap / datarouter / node / DeliveryQueue.java
1 /*******************************************************************************
2  * ============LICENSE_START==================================================
3  * * org.onap.dmaap
4  * * ===========================================================================
5  * * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
6  * * ===========================================================================
7  * * Licensed under the Apache License, Version 2.0 (the "License");
8  * * you may not use this file except in compliance with the License.
9  * * You may obtain a copy of the License at
10  * *
11  *  *      http://www.apache.org/licenses/LICENSE-2.0
12  * *
13  *  * Unless required by applicable law or agreed to in writing, software
14  * * distributed under the License is distributed on an "AS IS" BASIS,
15  * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * * See the License for the specific language governing permissions and
17  * * limitations under the License.
18  * * ============LICENSE_END====================================================
19  * *
20  * * ECOMP is a trademark and service mark of AT&T Intellectual Property.
21  * *
22  ******************************************************************************/
23
24
25 package org.onap.dmaap.datarouter.node;
26
27 import com.att.eelf.configuration.EELFLogger;
28 import com.att.eelf.configuration.EELFManager;
29 import java.io.File;
30 import java.util.Arrays;
31 import java.util.Hashtable;
32 import java.util.Vector;
33 import org.jetbrains.annotations.Nullable;
34
35 /**
36  * Mechanism for monitoring and controlling delivery of files to a destination.
37  *
38  * <p>The DeliveryQueue class maintains lists of DeliveryTasks for a single destination (a subscription or another data
39  * router node) and assigns delivery threads to try to deliver them.  It also maintains a delivery status that causes it
40  * to back off on delivery attempts after a failure.
41  *
42  * <p>If the most recent delivery result was a failure, then no more attempts will be made for a period of time.
43  * Initially, and on the first failure following a success, this delay will be DeliveryQueueHelper.getInitFailureTimer()
44  * (milliseconds). If, after this delay, additional failures occur, each failure will multiply the delay by
45  * DeliveryQueueHelper.getFailureBackoff() up to a maximum delay specified by DeliveryQueueHelper.getMaxFailureTimer().
46  * Note that this behavior applies to the delivery queue as a whole and not to individual files in the queue.  If
47  * multiple files are being delivered and one fails, the delay will be started.  If a second delivery fails while the
48  * delay was active, it will not change the delay or change the duration of any subsequent delay. If, however, it
49  * succeeds, it will cancel the delay.
50  *
51  * <p>The queue maintains 3 collections of files to deliver: A to do list of files that will be attempted, a working
52  * set of files that are being attempted, and a retry set of files that were attempted and failed. Whenever the to do
53  * list is empty and needs to be refilled, a scan of the spool directory is made and the file names sorted.  Any files
54  * in the working set are ignored. If a DeliveryTask for the file is in the retry set, then that delivery task is placed
55  * on the to do list.  Otherwise, a new DeliveryTask for the file is created and placed on the to do list. If, when a
56  * DeliveryTask is about to be removed from the to do list, its age exceeds DeliveryQueueHelper.getExpirationTimer(),
57  * then it is instead marked as expired.
58  *
59  * <p>A delivery queue also maintains a skip flag.  This flag is true if the failure timer is active or if no files are
60  * found in a directory scan.
61  */
62 public class DeliveryQueue implements Runnable, DeliveryTaskHelper {
63
64     private static EELFLogger logger = EELFManager.getInstance().getLogger(DeliveryQueue.class);
65     private DeliveryQueueHelper deliveryQueueHelper;
66     private DestInfo destinationInfo;
67     private Hashtable<String, DeliveryTask> working = new Hashtable<>();
68     private Hashtable<String, DeliveryTask> retry = new Hashtable<>();
69     private int todoindex;
70     private boolean failed;
71     private long failduration;
72     private long resumetime;
73     private File dir;
74     private Vector<DeliveryTask> todo = new Vector<>();
75
76     /**
77      * Create a delivery queue for a given destination info.
78      */
79     DeliveryQueue(DeliveryQueueHelper deliveryQueueHelper, DestInfo destinationInfo) {
80         this.deliveryQueueHelper = deliveryQueueHelper;
81         this.destinationInfo = destinationInfo;
82         dir = new File(destinationInfo.getSpool());
83         dir.mkdirs();
84     }
85
86     /**
87      * Try to cancel a delivery task.
88      *
89      * @return The length of the task in bytes or 0 if the task cannot be cancelled.
90      */
91     synchronized long cancelTask(String pubid) {
92         if (working.get(pubid) != null) {
93             return (0);
94         }
95         DeliveryTask dt = retry.get(pubid);
96         if (dt == null) {
97             for (int i = todoindex; i < todo.size(); i++) {
98                 DeliveryTask xdt = todo.get(i);
99                 if (xdt.getPublishId().equals(pubid)) {
100                     dt = xdt;
101                     break;
102                 }
103             }
104         }
105         if (dt == null) {
106             dt = new DeliveryTask(this, pubid);
107             if (dt.getFileId() == null) {
108                 return (0);
109             }
110         }
111         if (dt.isCleaned()) {
112             return (0);
113         }
114         StatusLog.logExp(dt.getPublishId(), dt.getFeedId(), dt.getSubId(), dt.getURL(), dt.getMethod(), dt.getCType(),
115                 dt.getLength(), "diskFull", dt.getAttempts());
116         dt.clean();
117         return (dt.getLength());
118     }
119
120     /**
121      * Mark that a delivery task has succeeded.
122      */
123     private synchronized void markSuccess(DeliveryTask task) {
124         working.remove(task.getPublishId());
125         task.clean();
126         failed = false;
127         failduration = 0;
128     }
129
130     /**
131      * Mark that a delivery task has expired.
132      */
133     private synchronized void markExpired(DeliveryTask task) {
134         task.clean();
135     }
136
137     /**
138      * Mark that a delivery task has failed permanently.
139      */
140     private synchronized void markFailNoRetry(DeliveryTask task) {
141         working.remove(task.getPublishId());
142         task.clean();
143         failed = false;
144         failduration = 0;
145     }
146
147     private void fdupdate() {
148         if (!failed) {
149             failed = true;
150             if (failduration == 0) {
151                 if (destinationInfo.isPrivilegedSubscriber()) {
152                     failduration = deliveryQueueHelper.getWaitForFileProcessFailureTimer();
153                 } else {
154                     failduration = deliveryQueueHelper.getInitFailureTimer();
155                 }
156             }
157             resumetime = System.currentTimeMillis() + failduration;
158             long maxdur = deliveryQueueHelper.getMaxFailureTimer();
159             failduration = (long) (failduration * deliveryQueueHelper.getFailureBackoff());
160             if (failduration > maxdur) {
161                 failduration = maxdur;
162             }
163         }
164     }
165
166     /**
167      * Mark that a delivery task has been redirected.
168      */
169     private synchronized void markRedirect(DeliveryTask task) {
170         working.remove(task.getPublishId());
171         retry.put(task.getPublishId(), task);
172     }
173
174     /**
175      * Mark that a delivery task has temporarily failed.
176      */
177     private synchronized void markFailWithRetry(DeliveryTask task) {
178         working.remove(task.getPublishId());
179         retry.put(task.getPublishId(), task);
180         fdupdate();
181     }
182
183     /**
184      * Get the next task.
185      */
186     synchronized DeliveryTask getNext() {
187         DeliveryTask ret = peekNext();
188         if (ret != null) {
189             todoindex++;
190             working.put(ret.getPublishId(), ret);
191         }
192         return (ret);
193     }
194
195     /**
196      * Peek at the next task.
197      */
198     synchronized DeliveryTask peekNext() {
199         long now = System.currentTimeMillis();
200         long mindate = now - deliveryQueueHelper.getExpirationTimer();
201         if (failed) {
202             if (now > resumetime) {
203                 failed = false;
204             } else {
205                 return (null);
206             }
207         }
208         while (true) {
209             if (todoindex >= todo.size()) {
210                 todoindex = 0;
211                 todo = new Vector<>();
212                 String[] files = dir.list();
213                 Arrays.sort(files);
214                 scanForNextTask(files);
215                 retry = new Hashtable<>();
216             }
217             DeliveryTask dt = getDeliveryTask(mindate);
218             if (dt != null) {
219                 return dt;
220             }
221             return null;
222         }
223     }
224
225     /**
226      * Update the destination info for this delivery queue.
227      */
228     public void config(DestInfo destinationInfo) {
229         this.destinationInfo = destinationInfo;
230     }
231
232     /**
233      * Get the dest info.
234      */
235     public DestInfo getDestinationInfo() {
236         return (destinationInfo);
237     }
238
239     /**
240      * Get the config manager.
241      */
242     public DeliveryQueueHelper getConfig() {
243         return (deliveryQueueHelper);
244     }
245
246     /**
247      * Exceptional condition occurred during delivery.
248      */
249     public void reportDeliveryExtra(DeliveryTask task, long sent) {
250         StatusLog.logDelExtra(task.getPublishId(), task.getFeedId(), task.getSubId(), task.getLength(), sent);
251     }
252
253     /**
254      * Message too old to deliver.
255      */
256     void reportExpiry(DeliveryTask task) {
257         StatusLog.logExp(task.getPublishId(), task.getFeedId(), task.getSubId(), task.getURL(), task.getMethod(),
258                 task.getCType(), task.getLength(), "retriesExhausted", task.getAttempts());
259         markExpired(task);
260     }
261
262     /**
263      * Completed a delivery attempt.
264      */
265     public void reportStatus(DeliveryTask task, int status, String xpubid, String location) {
266         if (status < 300) {
267             StatusLog.logDel(task.getPublishId(), task.getFeedId(), task.getSubId(), task.getURL(), task.getMethod(),
268                     task.getCType(), task.getLength(), destinationInfo.getAuthUser(), status, xpubid);
269             if (destinationInfo.isPrivilegedSubscriber()) {
270                 task.setResumeTime(
271                         System.currentTimeMillis() + deliveryQueueHelper.getWaitForFileProcessFailureTimer());
272                 markFailWithRetry(task);
273             } else {
274                 markSuccess(task);
275             }
276         } else if (status < 400 && deliveryQueueHelper.isFollowRedirects()) {
277             StatusLog.logDel(task.getPublishId(), task.getFeedId(), task.getSubId(), task.getURL(), task.getMethod(),
278                     task.getCType(), task.getLength(), destinationInfo.getAuthUser(), status, location);
279             if (deliveryQueueHelper.handleRedirection(destinationInfo, location, task.getFileId())) {
280                 markRedirect(task);
281             } else {
282                 StatusLog
283                         .logExp(task.getPublishId(), task.getFeedId(), task.getSubId(), task.getURL(), task.getMethod(),
284                                 task.getCType(), task.getLength(), "notRetryable", task.getAttempts());
285                 markFailNoRetry(task);
286             }
287         } else if (status < 500 && status != 429) {
288             // Status 429 is the standard response for Too Many Requests and indicates
289             // that a file needs to be delivered again at a later time.
290             StatusLog.logDel(task.getPublishId(), task.getFeedId(), task.getSubId(), task.getURL(), task.getMethod(),
291                     task.getCType(), task.getLength(), destinationInfo.getAuthUser(), status, location);
292             StatusLog.logExp(task.getPublishId(), task.getFeedId(), task.getSubId(), task.getURL(), task.getMethod(),
293                     task.getCType(), task.getLength(), "notRetryable", task.getAttempts());
294             markFailNoRetry(task);
295         } else {
296             StatusLog.logDel(task.getPublishId(), task.getFeedId(), task.getSubId(), task.getURL(), task.getMethod(),
297                     task.getCType(), task.getLength(), destinationInfo.getAuthUser(), status, location);
298             markFailWithRetry(task);
299         }
300     }
301
302     /**
303      * Delivery failed by reason of an exception.
304      */
305     public void reportException(DeliveryTask task, Exception exception) {
306         StatusLog.logDel(task.getPublishId(), task.getFeedId(), task.getSubId(), task.getURL(), task.getMethod(),
307                 task.getCType(), task.getLength(), destinationInfo.getAuthUser(), -1, exception.toString());
308         deliveryQueueHelper.handleUnreachable(destinationInfo);
309         markFailWithRetry(task);
310     }
311
312     /**
313      * Get the feed ID for a subscription.
314      *
315      * @param subid The subscription ID
316      * @return The feed ID
317      */
318     public String getFeedId(String subid) {
319         return (deliveryQueueHelper.getFeedId(subid));
320     }
321
322     /**
323      * Get the URL to deliver a message to given the file ID.
324      */
325     public String getDestURL(String fileid) {
326         return (deliveryQueueHelper.getDestURL(destinationInfo, fileid));
327     }
328
329     /**
330      * Deliver files until there's a failure or there are no more files to deliver.
331      */
332     public void run() {
333         DeliveryTask task;
334         long endtime = System.currentTimeMillis() + deliveryQueueHelper.getFairTimeLimit();
335         int filestogo = deliveryQueueHelper.getFairFileLimit();
336         while ((task = getNext()) != null) {
337             task.run();
338             if (--filestogo <= 0 || System.currentTimeMillis() > endtime) {
339                 break;
340             }
341         }
342     }
343
344     /**
345      * Is there no work to do for this queue right now.
346      */
347     synchronized boolean isSkipSet() {
348         return (peekNext() == null);
349     }
350
351     /**
352      * Reset the retry timer.
353      */
354     void resetQueue() {
355         resumetime = System.currentTimeMillis();
356     }
357
358     /**
359      * Get task if in queue and mark as success.
360      */
361     boolean markTaskSuccess(String pubId) {
362         DeliveryTask task = working.get(pubId);
363         if (task != null) {
364             markSuccess(task);
365             return true;
366         }
367         task = retry.get(pubId);
368         if (task != null) {
369             retry.remove(pubId);
370             task.clean();
371             resetQueue();
372             failduration = 0;
373             return true;
374         }
375         return false;
376     }
377
378     private void scanForNextTask(String[] files) {
379         for (String fname : files) {
380             String pubId = getPubId(fname);
381             if (pubId == null) {
382                 continue;
383             }
384             DeliveryTask dt = retry.get(pubId);
385             if (dt == null) {
386                 dt = new DeliveryTask(this, pubId);
387             }
388             todo.add(dt);
389         }
390     }
391
392     @Nullable
393     private DeliveryTask getDeliveryTask(long mindate) {
394         if (todoindex < todo.size()) {
395             DeliveryTask dt = todo.get(todoindex);
396             if (dt.isCleaned()) {
397                 todoindex++;
398             }
399             if (destinationInfo.isPrivilegedSubscriber() && dt.getResumeTime() > System.currentTimeMillis()) {
400                 retry.put(dt.getPublishId(), dt);
401                 todoindex++;
402             }
403             if (dt.getDate() >= mindate) {
404                 return (dt);
405             }
406             todoindex++;
407             reportExpiry(dt);
408         }
409         return null;
410     }
411
412     @Nullable
413     private String getPubId(String fname) {
414         if (!fname.endsWith(".M")) {
415             return null;
416         }
417         String fname2 = fname.substring(0, fname.length() - 2);
418         long pidtime = 0;
419         int dot = fname2.indexOf('.');
420         if (dot < 1) {
421             return null;
422         }
423         try {
424             pidtime = Long.parseLong(fname2.substring(0, dot));
425         } catch (Exception e) {
426             logger.error("Exception", e);
427         }
428         if (pidtime < 1000000000000L) {
429             return null;
430         }
431         if (working.get(fname2) != null) {
432             return null;
433         }
434         return fname2;
435     }
436 }