DMAAP-1195 [DR] Remove DR code smells
[dmaap/datarouter.git] / datarouter-node / src / main / java / org / onap / dmaap / datarouter / node / DeliveryQueue.java
1 /*******************************************************************************
2  * ============LICENSE_START==================================================
3  * * org.onap.dmaap
4  * * ===========================================================================
5  * * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
6  * * ===========================================================================
7  * * Licensed under the Apache License, Version 2.0 (the "License");
8  * * you may not use this file except in compliance with the License.
9  * * You may obtain a copy of the License at
10  * *
11  *  *      http://www.apache.org/licenses/LICENSE-2.0
12  * *
13  *  * Unless required by applicable law or agreed to in writing, software
14  * * distributed under the License is distributed on an "AS IS" BASIS,
15  * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * * See the License for the specific language governing permissions and
17  * * limitations under the License.
18  * * ============LICENSE_END====================================================
19  * *
20  * * ECOMP is a trademark and service mark of AT&T Intellectual Property.
21  * *
22  ******************************************************************************/
23
24
25 package org.onap.dmaap.datarouter.node;
26
27 import com.att.eelf.configuration.EELFLogger;
28 import com.att.eelf.configuration.EELFManager;
29 import java.io.File;
30 import java.util.ArrayList;
31 import java.util.Arrays;
32 import java.util.HashMap;
33 import java.util.List;
34 import org.jetbrains.annotations.Nullable;
35
36 /**
37  * Mechanism for monitoring and controlling delivery of files to a destination.
38  *
39  * <p>The DeliveryQueue class maintains lists of DeliveryTasks for a single
40  * destination (a subscription or another data router node) and assigns
41  * delivery threads to try to deliver them.  It also maintains a delivery
42  * status that causes it to back off on delivery attempts after a failure.
43  *
44  * <p>If the most recent delivery result was a failure, then no more attempts
45  * will be made for a period of time.  Initially, and on the first failure
46  * following a success, this delay will be DeliveryQueueHelper.getInitFailureTimer() (milliseconds).
47  * If, after this delay, additional failures occur, each failure will
48  * multiply the delay by DeliveryQueueHelper.getFailureBackoff() up to a
49  * maximum delay specified by DeliveryQueueHelper.getMaxFailureTimer().
50  * Note that this behavior applies to the delivery queue as a whole and not
51  * to individual files in the queue.  If multiple files are being
52  * delivered and one fails, the delay will be started.  If a second
53  * delivery fails while the delay was active, it will not change the delay
54  * or change the duration of any subsequent delay.
55  * If, however, it succeeds, it will cancel the delay.
56  * The queue maintains 3 collections of files to deliver: A todoList of
57  * files that will be attempted, a working set of files that are being
58  * attempted, and a retry set of files that were attempted and failed.
59  * Whenever the todoList is empty and needs to be refilled, a scan of the
60  * spool directory is made and the file names sorted.  Any files in the working set are ignored.
61  * If a DeliveryTask for the file is in the retry set, then that delivery
62  * task is placed on the todoList.  Otherwise, a new DeliveryTask for the
63  * file is created and placed on the todoList.
64  * If, when a DeliveryTask is about to be removed from the todoList, its
65  * age exceeds DeliveryQueueHelper.getExpirationTimer(), then it is instead
66  * marked as expired.
67  *
68  * <p>A delivery queue also maintains a skip flag.  This flag is true if the
69  * failure timer is active or if no files are found in a directory scan.
70  */
71 public class DeliveryQueue implements Runnable, DeliveryTaskHelper {
72     private static EELFLogger logger = EELFManager.getInstance().getLogger(DeliveryQueue.class);
73     private DeliveryQueueHelper deliveryQueueHelper;
74
75     private DestInfo destinationInfo;
76     private HashMap<String, DeliveryTask> working = new HashMap<>();
77     private HashMap<String, DeliveryTask> retry = new HashMap<>();
78     private int todoindex;
79     private boolean failed;
80     private long failduration;
81     private long resumetime;
82     private File dir;
83     private List<DeliveryTask> todoList = new ArrayList<>();
84
85     /**
86      * Create a delivery queue for a given destination info.
87      */
88     DeliveryQueue(DeliveryQueueHelper deliveryQueueHelper, DestInfo destinationInfo) {
89         this.deliveryQueueHelper = deliveryQueueHelper;
90         this.destinationInfo = destinationInfo;
91         dir = new File(destinationInfo.getSpool());
92         dir.mkdirs();
93     }
94
95     /**
96      * Try to cancel a delivery task.
97      *
98      * @return The length of the task in bytes or 0 if the task cannot be cancelled.
99      */
100     synchronized long cancelTask(String pubid) {
101         if (working.get(pubid) != null) {
102             return (0);
103         }
104         DeliveryTask dt = retry.get(pubid);
105         if (dt == null) {
106             for (int i = todoindex; i < todoList.size(); i++) {
107                 DeliveryTask xdt = todoList.get(i);
108                 if (xdt.getPublishId().equals(pubid)) {
109                     dt = xdt;
110                     break;
111                 }
112             }
113         }
114         if (dt == null) {
115             dt = new DeliveryTask(this, pubid);
116             if (dt.getFileId() == null) {
117                 return (0);
118             }
119         }
120         if (dt.isCleaned()) {
121             return (0);
122         }
123         StatusLog.logExp(dt.getPublishId(), dt.getFeedId(), dt.getSubId(), dt.getURL(),
124                 dt.getMethod(), dt.getCType(), dt.getLength(), "diskFull", dt.getAttempts());
125         dt.clean();
126         return (dt.getLength());
127     }
128
129     /**
130      * Mark that a delivery task has succeeded.
131      */
132     private synchronized void markSuccess(DeliveryTask task) {
133         working.remove(task.getPublishId());
134         logger.info(task.getPublishId() + " marked as success.");
135         task.clean();
136         failed = false;
137         failduration = 0;
138     }
139
140     /**
141      * Mark that a delivery task has expired.
142      */
143     private synchronized void markExpired(DeliveryTask task) {
144         logger.info(task.getPublishId() + " marked as expired.");
145         task.clean();
146     }
147
148     /**
149      * Mark that a delivery task has failed permanently.
150      */
151     private synchronized void markFailNoRetry(DeliveryTask task) {
152         working.remove(task.getPublishId());
153         logger.info(task.getPublishId() + " marked as failed permanently");
154         task.clean();
155         failed = false;
156         failduration = 0;
157     }
158
159     private void fdupdate() {
160         if (!failed) {
161             failed = true;
162             if (failduration == 0) {
163                 if (destinationInfo.isPrivilegedSubscriber()) {
164                     failduration = deliveryQueueHelper.getWaitForFileProcessFailureTimer();
165                 } else {
166                     failduration = deliveryQueueHelper.getInitFailureTimer();
167                 }
168             }
169             resumetime = System.currentTimeMillis() + failduration;
170             long maxdur = deliveryQueueHelper.getMaxFailureTimer();
171             failduration = (long) (failduration * deliveryQueueHelper.getFailureBackoff());
172             if (failduration > maxdur) {
173                 failduration = maxdur;
174             }
175         }
176     }
177
178     /**
179      * Mark that a delivery task has been redirected.
180      */
181     private synchronized void markRedirect(DeliveryTask task) {
182         working.remove(task.getPublishId());
183         logger.info(task.getPublishId() + " marked as redirected.");
184         retry.put(task.getPublishId(), task);
185     }
186
187     /**
188      * Mark that a delivery task has temporarily failed.
189      */
190     private synchronized void markFailWithRetry(DeliveryTask task) {
191         working.remove(task.getPublishId());
192         logger.info(task.getPublishId() + " marked as temporarily failed.");
193         retry.put(task.getPublishId(), task);
194         fdupdate();
195     }
196
197     /**
198      * Get the next task.
199      */
200     synchronized DeliveryTask getNext() {
201         DeliveryTask ret = peekNext();
202         if (ret != null) {
203             todoindex++;
204             working.put(ret.getPublishId(), ret);
205         }
206         return (ret);
207     }
208
209     /**
210      * Peek at the next task.
211      */
212     synchronized DeliveryTask peekNext() {
213         long now = System.currentTimeMillis();
214         long mindate = now - deliveryQueueHelper.getExpirationTimer();
215         if (failed) {
216             if (now > resumetime) {
217                 failed = false;
218             } else {
219                 return (null);
220             }
221         }
222         while (true) {
223             if (todoindex >= todoList.size()) {
224                 todoindex = 0;
225                 todoList = new ArrayList<>();
226                 String[] files = dir.list();
227                 Arrays.sort(files);
228                 scanForNextTask(files);
229                 retry = new HashMap<>();
230             }
231             DeliveryTask dt = getDeliveryTask(mindate);
232             if (dt != null) {
233                 return dt;
234             }
235             return null;
236
237         }
238     }
239
240     /**
241      * Update the destination info for this delivery queue.
242      */
243     public void config(DestInfo destinationInfo) {
244         this.destinationInfo = destinationInfo;
245     }
246
247     /**
248      * Get the dest info.
249      */
250     public DestInfo getDestinationInfo() {
251         return (destinationInfo);
252     }
253
254     /**
255      * Get the config manager.
256      */
257     public DeliveryQueueHelper getConfig() {
258         return (deliveryQueueHelper);
259     }
260
261     /**
262      * Exceptional condition occurred during delivery.
263      */
264     public void reportDeliveryExtra(DeliveryTask task, long sent) {
265         StatusLog.logDelExtra(task.getPublishId(), task.getFeedId(), task.getSubId(), task.getLength(), sent);
266     }
267
268     /**
269      * Message too old to deliver.
270      */
271     void reportExpiry(DeliveryTask task) {
272         StatusLog.logExp(task.getPublishId(), task.getFeedId(), task.getSubId(), task.getURL(), task.getMethod(),
273                 task.getCType(), task.getLength(), "retriesExhausted", task.getAttempts());
274         markExpired(task);
275     }
276
277     /**
278      * Completed a delivery attempt.
279      */
280     public void reportStatus(DeliveryTask task, int status, String xpubid, String location) {
281         if (status < 300) {
282             StatusLog.logDel(task.getPublishId(), task.getFeedId(), task.getSubId(), task.getURL(), task.getMethod(),
283                     task.getCType(), task.getLength(), destinationInfo.getAuthUser(), status, xpubid);
284             if (destinationInfo.isPrivilegedSubscriber()) {
285                 task.setResumeTime(System.currentTimeMillis()
286                                            + deliveryQueueHelper.getWaitForFileProcessFailureTimer());
287                 markFailWithRetry(task);
288             } else {
289                 markSuccess(task);
290             }
291         } else if (status < 400 && deliveryQueueHelper.isFollowRedirects()) {
292             StatusLog.logDel(task.getPublishId(), task.getFeedId(), task.getSubId(), task.getURL(), task.getMethod(),
293                     task.getCType(), task.getLength(), destinationInfo.getAuthUser(), status, location);
294             if (deliveryQueueHelper.handleRedirection(destinationInfo, location, task.getFileId())) {
295                 markRedirect(task);
296             } else {
297                 StatusLog.logExp(task.getPublishId(), task.getFeedId(), task.getSubId(), task.getURL(),
298                         task.getMethod(), task.getCType(), task.getLength(), "notRetryable", task.getAttempts());
299                 markFailNoRetry(task);
300             }
301         } else if (status < 500 && status != 429) {
302             // Status 429 is the standard response for Too Many Requests and indicates
303             // that a file needs to be delivered again at a later time.
304             StatusLog.logDel(task.getPublishId(), task.getFeedId(), task.getSubId(), task.getURL(), task.getMethod(),
305                     task.getCType(), task.getLength(), destinationInfo.getAuthUser(), status, location);
306             StatusLog.logExp(task.getPublishId(), task.getFeedId(), task.getSubId(), task.getURL(), task.getMethod(),
307                     task.getCType(), task.getLength(), "notRetryable", task.getAttempts());
308             markFailNoRetry(task);
309         } else {
310             StatusLog.logDel(task.getPublishId(), task.getFeedId(), task.getSubId(), task.getURL(), task.getMethod(),
311                     task.getCType(), task.getLength(), destinationInfo.getAuthUser(), status, location);
312             markFailWithRetry(task);
313         }
314     }
315
316     /**
317      * Delivery failed by reason of an exception.
318      */
319     public void reportException(DeliveryTask task, Exception exception) {
320         StatusLog.logDel(task.getPublishId(), task.getFeedId(), task.getSubId(), task.getURL(), task.getMethod(),
321                 task.getCType(), task.getLength(), destinationInfo.getAuthUser(), -1, exception.toString());
322         deliveryQueueHelper.handleUnreachable(destinationInfo);
323         markFailWithRetry(task);
324     }
325
326     /**
327      * Get the feed ID for a subscription.
328      *
329      * @param subid The subscription ID
330      * @return The feed ID
331      */
332     public String getFeedId(String subid) {
333         return (deliveryQueueHelper.getFeedId(subid));
334     }
335
336     /**
337      * Get the URL to deliver a message to given the file ID.
338      */
339     public String getDestURL(String fileid) {
340         return (deliveryQueueHelper.getDestURL(destinationInfo, fileid));
341     }
342
343     /**
344      * Deliver files until there's a failure or there are no more.
345      * files to deliver
346      */
347     public void run() {
348         DeliveryTask task;
349         long endtime = System.currentTimeMillis() + deliveryQueueHelper.getFairTimeLimit();
350         int filestogo = deliveryQueueHelper.getFairFileLimit();
351         while ((task = getNext()) != null) {
352             logger.info("Processing file: " + task.getPublishId());
353             task.run();
354             if (--filestogo <= 0 || System.currentTimeMillis() > endtime) {
355                 break;
356             }
357         }
358     }
359
360     /**
361      * Is there no work to do for this queue right now?.
362      */
363     synchronized boolean isSkipSet() {
364         return (peekNext() == null);
365     }
366
367     /**
368      * Reset the retry timer.
369      */
370     void resetQueue() {
371         resumetime = System.currentTimeMillis();
372     }
373
374     /**
375      * Get task if in queue and mark as success.
376      */
377     boolean markTaskSuccess(String pubId) {
378         DeliveryTask task = working.get(pubId);
379         if (task != null) {
380             markSuccess(task);
381             return true;
382         }
383         task = retry.get(pubId);
384         if (task != null) {
385             retry.remove(pubId);
386             task.clean();
387             resetQueue();
388             failduration = 0;
389             return true;
390         }
391         return false;
392     }
393
394     private void scanForNextTask(String[] files) {
395         for (String fname : files) {
396             String pubId = getPubId(fname);
397             if (pubId == null) {
398                 continue;
399             }
400             DeliveryTask dt = retry.get(pubId);
401             if (dt == null) {
402                 dt = new DeliveryTask(this, pubId);
403             }
404             todoList.add(dt);
405         }
406     }
407
408     @Nullable
409     private DeliveryTask getDeliveryTask(long mindate) {
410         if (todoindex < todoList.size()) {
411             DeliveryTask dt = todoList.get(todoindex);
412             if (dt.isCleaned()) {
413                 todoindex++;
414             }
415             if (destinationInfo.isPrivilegedSubscriber() && dt.getResumeTime() > System.currentTimeMillis()) {
416                 retry.put(dt.getPublishId(), dt);
417                 todoindex++;
418             }
419             if (dt.getDate() >= mindate) {
420                 return (dt);
421             }
422             todoindex++;
423             reportExpiry(dt);
424         }
425         return null;
426     }
427
428     @Nullable
429     private String getPubId(String fname) {
430         if (!fname.endsWith(".M")) {
431             return null;
432         }
433         String fname2 = fname.substring(0, fname.length() - 2);
434         long pidtime = 0;
435         int dot = fname2.indexOf('.');
436         if (dot < 1) {
437             return null;
438         }
439         try {
440             pidtime = Long.parseLong(fname2.substring(0, dot));
441         } catch (Exception e) {
442             logger.error("Exception", e);
443         }
444         if (pidtime < 1000000000000L) {
445             return null;
446         }
447         if (working.get(fname2) != null) {
448             return null;
449         }
450         return fname2;
451     }
452 }