Logging changes and unit tests
[dmaap/datarouter.git] / datarouter-node / src / main / java / org / onap / dmaap / datarouter / node / DeliveryQueue.java
1 /*******************************************************************************
2  * ============LICENSE_START==================================================
3  * * org.onap.dmaap
4  * * ===========================================================================
5  * * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
6  * * ===========================================================================
7  * * Licensed under the Apache License, Version 2.0 (the "License");
8  * * you may not use this file except in compliance with the License.
9  * * You may obtain a copy of the License at
10  * *
11  *  *      http://www.apache.org/licenses/LICENSE-2.0
12  * *
13  *  * Unless required by applicable law or agreed to in writing, software
14  * * distributed under the License is distributed on an "AS IS" BASIS,
15  * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * * See the License for the specific language governing permissions and
17  * * limitations under the License.
18  * * ============LICENSE_END====================================================
19  * *
20  * * ECOMP is a trademark and service mark of AT&T Intellectual Property.
21  * *
22  ******************************************************************************/
23
24
25 package org.onap.dmaap.datarouter.node;
26
27 import com.att.eelf.configuration.EELFLogger;
28 import com.att.eelf.configuration.EELFManager;
29 import java.io.*;
30 import java.util.*;
31 import org.jetbrains.annotations.Nullable;
32
33 /**
34  * Mechanism for monitoring and controlling delivery of files to a destination.
35  * <p>
36  * The DeliveryQueue class maintains lists of DeliveryTasks for a single
37  * destination (a subscription or another data router node) and assigns
38  * delivery threads to try to deliver them.  It also maintains a delivery
39  * status that causes it to back off on delivery attempts after a failure.
40  * <p>
41  * If the most recent delivery result was a failure, then no more attempts
42  * will be made for a period of time.  Initially, and on the first failure
43  * following a success, this delay will be DeliveryQueueHelper.getInitFailureTimer() (milliseconds).
44  * If, after this delay, additional failures occur, each failure will
45  * multiply the delay by DeliveryQueueHelper.getFailureBackoff() up to a
46  * maximum delay specified by DeliveryQueueHelper.getMaxFailureTimer().
47  * Note that this behavior applies to the delivery queue as a whole and not
48  * to individual files in the queue.  If multiple files are being
49  * delivered and one fails, the delay will be started.  If a second
50  * delivery fails while the delay was active, it will not change the delay
51  * or change the duration of any subsequent delay.
52  * If, however, it succeeds, it will cancel the delay.
53  * <p>
54  * The queue maintains 3 collections of files to deliver: A todo list of
55  * files that will be attempted, a working set of files that are being
56  * attempted, and a retry set of files that were attempted and failed.
57  * Whenever the todo list is empty and needs to be refilled, a scan of the
58  * spool directory is made and the file names sorted.  Any files in the working set are ignored.
59  * If a DeliveryTask for the file is in the retry set, then that delivery
60  * task is placed on the todo list.  Otherwise, a new DeliveryTask for the
61  * file is created and placed on the todo list.
62  * If, when a DeliveryTask is about to be removed from the todo list, its
63  * age exceeds DeliveryQueueHelper.getExpirationTimer(), then it is instead
64  * marked as expired.
65  * <p>
66  * A delivery queue also maintains a skip flag.  This flag is true if the
67  * failure timer is active or if no files are found in a directory scan.
68  */
69 public class DeliveryQueue implements Runnable, DeliveryTaskHelper {
70     private static EELFLogger logger = EELFManager.getInstance().getLogger(DeliveryQueue.class);
71     private DeliveryQueueHelper deliveryQueueHelper;
72     private DestInfo destinationInfo;
73     private Hashtable<String, DeliveryTask> working = new Hashtable<>();
74     private Hashtable<String, DeliveryTask> retry = new Hashtable<>();
75     private int todoindex;
76     private boolean failed;
77     private long failduration;
78     private long resumetime;
79     private File dir;
80     private Vector<DeliveryTask> todo = new Vector<>();
81
82     /**
83      * Try to cancel a delivery task.
84      *
85      * @return The length of the task in bytes or 0 if the task cannot be cancelled.
86      */
87     synchronized long cancelTask(String pubid) {
88         if (working.get(pubid) != null) {
89             return (0);
90         }
91         DeliveryTask dt = retry.get(pubid);
92         if (dt == null) {
93             for (int i = todoindex; i < todo.size(); i++) {
94                 DeliveryTask xdt = todo.get(i);
95                 if (xdt.getPublishId().equals(pubid)) {
96                     dt = xdt;
97                     break;
98                 }
99             }
100         }
101         if (dt == null) {
102             dt = new DeliveryTask(this, pubid);
103             if (dt.getFileId() == null) {
104                 return (0);
105             }
106         }
107         if (dt.isCleaned()) {
108             return (0);
109         }
110         StatusLog.logExp(dt.getPublishId(), dt.getFeedId(), dt.getSubId(), dt.getURL(), dt.getMethod(), dt.getCType(), dt.getLength(), "diskFull", dt.getAttempts());
111         dt.clean();
112         return (dt.getLength());
113     }
114
115     /**
116      * Mark that a delivery task has succeeded.
117      */
118     private synchronized void markSuccess(DeliveryTask task) {
119         working.remove(task.getPublishId());
120         logger.info(task.getPublishId() + " marked as success.");
121         task.clean();
122         failed = false;
123         failduration = 0;
124     }
125
126     /**
127      * Mark that a delivery task has expired.
128      */
129     private synchronized void markExpired(DeliveryTask task) {
130         logger.info(task.getPublishId() + " marked as expired.");
131         task.clean();
132     }
133
134     /**
135      * Mark that a delivery task has failed permanently.
136      */
137     private synchronized void markFailNoRetry(DeliveryTask task) {
138         working.remove(task.getPublishId());
139         logger.info(task.getPublishId() + " marked as failed permanently");
140         task.clean();
141         failed = false;
142         failduration = 0;
143     }
144
145     private void fdupdate() {
146         if (!failed) {
147             failed = true;
148             if (failduration == 0) {
149                 if (destinationInfo.isPrivilegedSubscriber()) {
150                     failduration = deliveryQueueHelper.getWaitForFileProcessFailureTimer();
151                 } else{
152                     failduration = deliveryQueueHelper.getInitFailureTimer();
153                 }
154             }
155             resumetime = System.currentTimeMillis() + failduration;
156             long maxdur = deliveryQueueHelper.getMaxFailureTimer();
157             failduration = (long) (failduration * deliveryQueueHelper.getFailureBackoff());
158             if (failduration > maxdur) {
159                 failduration = maxdur;
160             }
161         }
162     }
163
164     /**
165      * Mark that a delivery task has been redirected.
166      */
167     private synchronized void markRedirect(DeliveryTask task) {
168         working.remove(task.getPublishId());
169         logger.info(task.getPublishId() + " marked as redirected.");
170         retry.put(task.getPublishId(), task);
171     }
172
173     /**
174      * Mark that a delivery task has temporarily failed.
175      */
176     private synchronized void markFailWithRetry(DeliveryTask task) {
177         working.remove(task.getPublishId());
178         logger.info(task.getPublishId() + " marked as temporarily failed.");
179         retry.put(task.getPublishId(), task);
180         fdupdate();
181     }
182
183     /**
184      * Get the next task.
185      */
186     synchronized DeliveryTask getNext() {
187         DeliveryTask ret = peekNext();
188         if (ret != null) {
189             todoindex++;
190             working.put(ret.getPublishId(), ret);
191         }
192         return (ret);
193     }
194
195     /**
196      * Peek at the next task.
197      */
198     synchronized DeliveryTask peekNext() {
199         long now = System.currentTimeMillis();
200         long mindate = now - deliveryQueueHelper.getExpirationTimer();
201         if (failed) {
202             if (now > resumetime) {
203                 failed = false;
204             } else {
205                 return (null);
206             }
207         }
208         while (true) {
209             if (todoindex >= todo.size()) {
210                 todoindex = 0;
211                 todo = new Vector<>();
212                 String[] files = dir.list();
213                 Arrays.sort(files);
214                 scanForNextTask(files);
215                 retry = new Hashtable<>();
216             }
217             DeliveryTask dt = getDeliveryTask(mindate);
218             if (dt != null) {
219                 return dt;
220             }
221             return null;
222
223         }
224     }
225
226     /**
227      * Create a delivery queue for a given destination info
228      */
229     DeliveryQueue(DeliveryQueueHelper deliveryQueueHelper, DestInfo destinationInfo) {
230         this.deliveryQueueHelper = deliveryQueueHelper;
231         this.destinationInfo = destinationInfo;
232         dir = new File(destinationInfo.getSpool());
233         dir.mkdirs();
234     }
235
236     /**
237      * Update the destination info for this delivery queue
238      */
239     public void config(DestInfo destinationInfo) {
240         this.destinationInfo = destinationInfo;
241     }
242
243     /**
244      * Get the dest info
245      */
246     public DestInfo getDestinationInfo() {
247         return (destinationInfo);
248     }
249
250     /**
251      * Get the config manager
252      */
253     public DeliveryQueueHelper getConfig() {
254         return (deliveryQueueHelper);
255     }
256
257     /**
258      * Exceptional condition occurred during delivery
259      */
260     public void reportDeliveryExtra(DeliveryTask task, long sent) {
261         StatusLog.logDelExtra(task.getPublishId(), task.getFeedId(), task.getSubId(), task.getLength(), sent);
262     }
263
264     /**
265      * Message too old to deliver
266      */
267     void reportExpiry(DeliveryTask task) {
268         StatusLog.logExp(task.getPublishId(), task.getFeedId(), task.getSubId(), task.getURL(), task.getMethod(), task.getCType(), task.getLength(), "retriesExhausted", task.getAttempts());
269         markExpired(task);
270     }
271
272     /**
273      * Completed a delivery attempt
274      */
275     public void reportStatus(DeliveryTask task, int status, String xpubid, String location) {
276         if (status < 300) {
277             StatusLog.logDel(task.getPublishId(), task.getFeedId(), task.getSubId(), task.getURL(), task.getMethod(), task.getCType(), task.getLength(), destinationInfo.getAuthUser(), status, xpubid);
278             if (destinationInfo.isPrivilegedSubscriber()) {
279                 task.setResumeTime(System.currentTimeMillis() + deliveryQueueHelper.getWaitForFileProcessFailureTimer());
280                 markFailWithRetry(task);
281             } else {
282                 markSuccess(task);
283             }
284         } else if (status < 400 && deliveryQueueHelper.isFollowRedirects()) {
285             StatusLog.logDel(task.getPublishId(), task.getFeedId(), task.getSubId(), task.getURL(), task.getMethod(), task.getCType(), task.getLength(), destinationInfo.getAuthUser(), status, location);
286             if (deliveryQueueHelper.handleRedirection(destinationInfo, location, task.getFileId())) {
287                 markRedirect(task);
288             } else {
289                 StatusLog.logExp(task.getPublishId(), task.getFeedId(), task.getSubId(), task.getURL(), task.getMethod(), task.getCType(), task.getLength(), "notRetryable", task.getAttempts());
290                 markFailNoRetry(task);
291             }
292         } else if (status < 500 && status != 429) {         // Status 429 is the standard response for Too Many Requests and indicates that a file needs to be delivered again at a later time.
293             StatusLog.logDel(task.getPublishId(), task.getFeedId(), task.getSubId(), task.getURL(), task.getMethod(), task.getCType(), task.getLength(), destinationInfo.getAuthUser(), status, location);
294             StatusLog.logExp(task.getPublishId(), task.getFeedId(), task.getSubId(), task.getURL(), task.getMethod(), task.getCType(), task.getLength(), "notRetryable", task.getAttempts());
295             markFailNoRetry(task);
296         } else {
297             StatusLog.logDel(task.getPublishId(), task.getFeedId(), task.getSubId(), task.getURL(), task.getMethod(), task.getCType(), task.getLength(), destinationInfo.getAuthUser(), status, location);
298             markFailWithRetry(task);
299         }
300     }
301
302     /**
303      * Delivery failed by reason of an exception
304      */
305     public void reportException(DeliveryTask task, Exception exception) {
306         StatusLog.logDel(task.getPublishId(), task.getFeedId(), task.getSubId(), task.getURL(), task.getMethod(), task.getCType(), task.getLength(), destinationInfo.getAuthUser(), -1, exception.toString());
307         deliveryQueueHelper.handleUnreachable(destinationInfo);
308         markFailWithRetry(task);
309     }
310
311     /**
312      * Get the feed ID for a subscription
313      *
314      * @param subid The subscription ID
315      * @return The feed ID
316      */
317     public String getFeedId(String subid) {
318         return (deliveryQueueHelper.getFeedId(subid));
319     }
320
321     /**
322      * Get the URL to deliver a message to given the file ID
323      */
324     public String getDestURL(String fileid) {
325         return (deliveryQueueHelper.getDestURL(destinationInfo, fileid));
326     }
327
328     /**
329      * Deliver files until there's a failure or there are no more
330      * files to deliver
331      */
332     public void run() {
333         DeliveryTask task;
334         long endtime = System.currentTimeMillis() + deliveryQueueHelper.getFairTimeLimit();
335         int filestogo = deliveryQueueHelper.getFairFileLimit();
336         while ((task = getNext()) != null) {
337             logger.info("Processing file: " + task.getPublishId());
338             task.run();
339             if (--filestogo <= 0 || System.currentTimeMillis() > endtime) {
340                 break;
341             }
342         }
343     }
344
345     /**
346      * Is there no work to do for this queue right now?
347      */
348     synchronized boolean isSkipSet() {
349         return (peekNext() == null);
350     }
351
352     /**
353      * Reset the retry timer
354      */
355     void resetQueue() {
356         resumetime = System.currentTimeMillis();
357     }
358
359     /**
360      * Get task if in queue and mark as success
361      */
362     boolean markTaskSuccess(String pubId) {
363         DeliveryTask task = working.get(pubId);
364         if (task != null) {
365             markSuccess(task);
366             return true;
367         }
368         task = retry.get(pubId);
369         if (task != null) {
370             retry.remove(pubId);
371             task.clean();
372             resetQueue();
373             failduration = 0;
374             return true;
375         }
376         return false;
377     }
378     private void scanForNextTask(String[] files) {
379         for (String fname : files) {
380             String pubId = getPubId(fname);
381             if (pubId == null) {
382                 continue;
383             }
384             DeliveryTask dt = retry.get(pubId);
385             if (dt == null) {
386                 dt = new DeliveryTask(this, pubId);
387             }
388             todo.add(dt);
389         }
390     }
391
392     @Nullable
393     private DeliveryTask getDeliveryTask(long mindate) {
394         if (todoindex < todo.size()) {
395             DeliveryTask dt = todo.get(todoindex);
396             if (dt.isCleaned()) {
397                 todoindex++;
398             }
399             if (destinationInfo.isPrivilegedSubscriber() && dt.getResumeTime() > System.currentTimeMillis()) {
400                 retry.put(dt.getPublishId(), dt);
401                 todoindex++;
402             }
403             if (dt.getDate() >= mindate) {
404                 return (dt);
405             }
406             todoindex++;
407             reportExpiry(dt);
408         }
409         return null;
410     }
411
412     @Nullable
413     private String getPubId(String fname) {
414         if (!fname.endsWith(".M")) {
415             return null;
416         }
417         String fname2 = fname.substring(0, fname.length() - 2);
418         long pidtime = 0;
419         int dot = fname2.indexOf('.');
420         if (dot < 1) {
421             return null;
422         }
423         try {
424             pidtime = Long.parseLong(fname2.substring(0, dot));
425         } catch (Exception e) {
426             logger.error("Exception", e);
427         }
428         if (pidtime < 1000000000000L) {
429             return null;
430         }
431         if (working.get(fname2) != null) {
432             return null;
433         }
434         return fname2;
435     }
436 }