[DMAAP-DR] Remove cadi/aaf from dr-node
[dmaap/datarouter.git] / datarouter-node / src / main / java / org / onap / dmaap / datarouter / node / delivery / DeliveryQueue.java
1 /*******************************************************************************
2  * ============LICENSE_START==================================================
3  * * org.onap.dmaap
4  * * ===========================================================================
5  * * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
6  * * ===========================================================================
7  * * Licensed under the Apache License, Version 2.0 (the "License");
8  * * you may not use this file except in compliance with the License.
9  * * You may obtain a copy of the License at
10  * *
11  *  *      http://www.apache.org/licenses/LICENSE-2.0
12  * *
13  *  * Unless required by applicable law or agreed to in writing, software
14  * * distributed under the License is distributed on an "AS IS" BASIS,
15  * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * * See the License for the specific language governing permissions and
17  * * limitations under the License.
18  * * ============LICENSE_END====================================================
19  * *
20  * * ECOMP is a trademark and service mark of AT&T Intellectual Property.
21  * *
22  ******************************************************************************/
23
24
25 package org.onap.dmaap.datarouter.node.delivery;
26
27 import com.att.eelf.configuration.EELFLogger;
28 import com.att.eelf.configuration.EELFManager;
29 import java.io.File;
30 import java.util.ArrayList;
31 import java.util.Arrays;
32 import java.util.HashMap;
33 import java.util.List;
34 import org.jetbrains.annotations.Nullable;
35 import org.onap.dmaap.datarouter.node.DestInfo;
36 import org.onap.dmaap.datarouter.node.log.StatusLog;
37
38 /**
39  * Mechanism for monitoring and controlling delivery of files to a destination.
40  *
41  * <p>The DeliveryQueue class maintains lists of DeliveryTasks for a single
42  * destination (a subscription or another data router node) and assigns
43  * delivery threads to try to deliver them.  It also maintains a delivery
44  * status that causes it to back off on delivery attempts after a failure.
45  *
46  * <p>If the most recent delivery result was a failure, then no more attempts
47  * will be made for a period of time.  Initially, and on the first failure
48  * following a success, this delay will be DeliveryQueueHelper.getInitFailureTimer() (milliseconds).
49  * If, after this delay, additional failures occur, each failure will
50  * multiply the delay by DeliveryQueueHelper.getFailureBackoff() up to a
51  * maximum delay specified by DeliveryQueueHelper.getMaxFailureTimer().
52  * Note that this behavior applies to the delivery queue as a whole and not
53  * to individual files in the queue.  If multiple files are being
54  * delivered and one fails, the delay will be started.  If a second
55  * delivery fails while the delay was active, it will not change the delay
56  * or change the duration of any subsequent delay.
57  * If, however, it succeeds, it will cancel the delay.
58  * The queue maintains 3 collections of files to deliver: A todoList of
59  * files that will be attempted, a working set of files that are being
60  * attempted, and a retry set of files that were attempted and failed.
61  * Whenever the todoList is empty and needs to be refilled, a scan of the
62  * spool directory is made and the file names sorted.  Any files in the working set are ignored.
63  * If a DeliveryTask for the file is in the retry set, then that delivery
64  * task is placed on the todoList.  Otherwise, a new DeliveryTask for the
65  * file is created and placed on the todoList.
66  * If, when a DeliveryTask is about to be removed from the todoList, its
67  * age exceeds DeliveryQueueHelper.getExpirationTimer(), then it is instead
68  * marked as expired.
69  *
70  * <p>A delivery queue also maintains a skip flag.  This flag is true if the
71  * failure timer is active or if no files are found in a directory scan.
72  */
73 public class DeliveryQueue implements Runnable, DeliveryTaskHelper {
74     private static EELFLogger logger = EELFManager.getInstance().getLogger(DeliveryQueue.class);
75     private DeliveryQueueHelper deliveryQueueHelper;
76
77     private DestInfo destinationInfo;
78     private HashMap<String, DeliveryTask> working = new HashMap<>();
79     private HashMap<String, DeliveryTask> retry = new HashMap<>();
80     private int todoindex;
81     private boolean failed;
82     private long failduration;
83     private long resumetime;
84     private File dir;
85     private List<DeliveryTask> todoList = new ArrayList<>();
86
87     /**
88      * Create a delivery queue for a given destination info.
89      */
90     public DeliveryQueue(DeliveryQueueHelper deliveryQueueHelper, DestInfo destinationInfo) {
91         this.deliveryQueueHelper = deliveryQueueHelper;
92         this.destinationInfo = destinationInfo;
93         dir = new File(destinationInfo.getSpool());
94         dir.mkdirs();
95     }
96
97     /**
98      * Try to cancel a delivery task.
99      *
100      * @return The length of the task in bytes or 0 if the task cannot be cancelled.
101      */
102     public synchronized long cancelTask(String pubid) {
103         if (working.get(pubid) != null) {
104             return (0);
105         }
106         DeliveryTask dt = retry.get(pubid);
107         if (dt == null) {
108             for (int i = todoindex; i < todoList.size(); i++) {
109                 DeliveryTask xdt = todoList.get(i);
110                 if (xdt.getPublishId().equals(pubid)) {
111                     dt = xdt;
112                     break;
113                 }
114             }
115         }
116         if (dt == null) {
117             dt = new DeliveryTask(this, pubid);
118             if (dt.getFileId() == null) {
119                 return (0);
120             }
121         }
122         if (dt.isCleaned()) {
123             return (0);
124         }
125         StatusLog.logExp(dt.getPublishId(), dt.getFeedId(), dt.getSubId(), dt.getURL(),
126                 dt.getMethod(), dt.getCType(), dt.getLength(), "diskFull", dt.getAttempts());
127         dt.clean();
128         return (dt.getLength());
129     }
130
131     /**
132      * Mark that a delivery task has succeeded.
133      */
134     private synchronized void markSuccess(DeliveryTask task) {
135         working.remove(task.getPublishId());
136         logger.info(task.getPublishId() + " marked as success.");
137         task.clean();
138         failed = false;
139         failduration = 0;
140     }
141
142     /**
143      * Mark that a delivery task has expired.
144      */
145     private synchronized void markExpired(DeliveryTask task) {
146         logger.info(task.getPublishId() + " marked as expired.");
147         task.clean();
148     }
149
150     /**
151      * Mark that a delivery task has failed permanently.
152      */
153     private synchronized void markFailNoRetry(DeliveryTask task) {
154         working.remove(task.getPublishId());
155         logger.info(task.getPublishId() + " marked as failed permanently");
156         task.clean();
157         failed = false;
158         failduration = 0;
159     }
160
161     private void fdupdate() {
162         if (!failed) {
163             failed = true;
164             if (failduration == 0) {
165                 if (destinationInfo.isPrivilegedSubscriber()) {
166                     failduration = deliveryQueueHelper.getWaitForFileProcessFailureTimer();
167                 } else {
168                     failduration = deliveryQueueHelper.getInitFailureTimer();
169                 }
170             }
171             resumetime = System.currentTimeMillis() + failduration;
172             long maxdur = deliveryQueueHelper.getMaxFailureTimer();
173             failduration = (long) (failduration * deliveryQueueHelper.getFailureBackoff());
174             if (failduration > maxdur) {
175                 failduration = maxdur;
176             }
177         }
178     }
179
180     /**
181      * Mark that a delivery task has been redirected.
182      */
183     private synchronized void markRedirect(DeliveryTask task) {
184         working.remove(task.getPublishId());
185         logger.info(task.getPublishId() + " marked as redirected.");
186         retry.put(task.getPublishId(), task);
187     }
188
189     /**
190      * Mark that a delivery task has temporarily failed.
191      */
192     private synchronized void markFailWithRetry(DeliveryTask task) {
193         working.remove(task.getPublishId());
194         logger.info(task.getPublishId() + " marked as temporarily failed.");
195         retry.put(task.getPublishId(), task);
196         fdupdate();
197     }
198
199     /**
200      * Get the next task.
201      */
202     public synchronized DeliveryTask getNext() {
203         DeliveryTask ret = peekNext();
204         if (ret != null) {
205             todoindex++;
206             working.put(ret.getPublishId(), ret);
207         }
208         return (ret);
209     }
210
211     /**
212      * Peek at the next task.
213      */
214     public synchronized DeliveryTask peekNext() {
215         long now = System.currentTimeMillis();
216         long mindate = now - deliveryQueueHelper.getExpirationTimer();
217         if (failed) {
218             if (now > resumetime) {
219                 failed = false;
220             } else {
221                 return (null);
222             }
223         }
224         while (true) {
225             if (todoindex >= todoList.size()) {
226                 todoindex = 0;
227                 todoList = new ArrayList<>();
228                 String[] files = dir.list();
229                 if (files != null) {
230                     Arrays.sort(files);
231                     scanForNextTask(files);
232                 }
233                 retry = new HashMap<>();
234             }
235             return getDeliveryTask(mindate);
236         }
237     }
238
239     /**
240      * Update the destination info for this delivery queue.
241      */
242     public void config(DestInfo destinationInfo) {
243         this.destinationInfo = destinationInfo;
244     }
245
246     /**
247      * Get the dest info.
248      */
249     public DestInfo getDestinationInfo() {
250         return (destinationInfo);
251     }
252
253     /**
254      * Get the config manager.
255      */
256     public DeliveryQueueHelper getConfig() {
257         return (deliveryQueueHelper);
258     }
259
260     /**
261      * Exceptional condition occurred during delivery.
262      */
263     public void reportDeliveryExtra(DeliveryTask task, long sent) {
264         StatusLog.logDelExtra(task.getPublishId(), task.getFeedId(), task.getSubId(), task.getLength(), sent);
265     }
266
267     /**
268      * Message too old to deliver.
269      */
270     void reportExpiry(DeliveryTask task) {
271         StatusLog.logExp(task.getPublishId(), task.getFeedId(), task.getSubId(), task.getURL(), task.getMethod(),
272                 task.getCType(), task.getLength(), "retriesExhausted", task.getAttempts());
273         markExpired(task);
274     }
275
276     /**
277      * Completed a delivery attempt.
278      */
279     public void reportStatus(DeliveryTask task, int status, String xpubid, String location) {
280         if (status < 300) {
281             StatusLog.logDel(task.getPublishId(), task.getFeedId(), task.getSubId(), task.getURL(), task.getMethod(),
282                     task.getCType(), task.getLength(), destinationInfo.getAuthUser(), status, xpubid);
283             if (destinationInfo.isPrivilegedSubscriber()) {
284                 task.setResumeTime(System.currentTimeMillis()
285                                            + deliveryQueueHelper.getWaitForFileProcessFailureTimer());
286                 markFailWithRetry(task);
287             } else {
288                 markSuccess(task);
289             }
290         } else if (status < 400 && deliveryQueueHelper.isFollowRedirects()) {
291             StatusLog.logDel(task.getPublishId(), task.getFeedId(), task.getSubId(), task.getURL(), task.getMethod(),
292                     task.getCType(), task.getLength(), destinationInfo.getAuthUser(), status, location);
293             if (deliveryQueueHelper.handleRedirection(destinationInfo, location, task.getFileId())) {
294                 markRedirect(task);
295             } else {
296                 StatusLog.logExp(task.getPublishId(), task.getFeedId(), task.getSubId(), task.getURL(),
297                         task.getMethod(), task.getCType(), task.getLength(), "notRetryable", task.getAttempts());
298                 markFailNoRetry(task);
299             }
300         } else if (status < 500 && status != 429) {
301             // Status 429 is the standard response for Too Many Requests and indicates
302             // that a file needs to be delivered again at a later time.
303             StatusLog.logDel(task.getPublishId(), task.getFeedId(), task.getSubId(), task.getURL(), task.getMethod(),
304                     task.getCType(), task.getLength(), destinationInfo.getAuthUser(), status, location);
305             StatusLog.logExp(task.getPublishId(), task.getFeedId(), task.getSubId(), task.getURL(), task.getMethod(),
306                     task.getCType(), task.getLength(), "notRetryable", task.getAttempts());
307             markFailNoRetry(task);
308         } else {
309             StatusLog.logDel(task.getPublishId(), task.getFeedId(), task.getSubId(), task.getURL(), task.getMethod(),
310                     task.getCType(), task.getLength(), destinationInfo.getAuthUser(), status, location);
311             markFailWithRetry(task);
312         }
313     }
314
315     /**
316      * Delivery failed by reason of an exception.
317      */
318     public void reportException(DeliveryTask task, Exception exception) {
319         StatusLog.logDel(task.getPublishId(), task.getFeedId(), task.getSubId(), task.getURL(), task.getMethod(),
320                 task.getCType(), task.getLength(), destinationInfo.getAuthUser(), -1, exception.toString());
321         deliveryQueueHelper.handleUnreachable(destinationInfo);
322         markFailWithRetry(task);
323     }
324
325     /**
326      * Get the feed ID for a subscription.
327      *
328      * @param subid The subscription ID
329      * @return The feed ID
330      */
331     public String getFeedId(String subid) {
332         return (deliveryQueueHelper.getFeedId(subid));
333     }
334
335     /**
336      * Get the URL to deliver a message to given the file ID.
337      */
338     public String getDestURL(String fileid) {
339         return (deliveryQueueHelper.getDestURL(destinationInfo, fileid));
340     }
341
342     /**
343      * Deliver files until there's a failure or there are no more.
344      * files to deliver
345      */
346     public void run() {
347         DeliveryTask task;
348         long endtime = System.currentTimeMillis() + deliveryQueueHelper.getFairTimeLimit();
349         int filestogo = deliveryQueueHelper.getFairFileLimit();
350         while ((task = getNext()) != null) {
351             logger.info("Processing file: " + task.getPublishId());
352             task.run();
353             if (--filestogo <= 0 || System.currentTimeMillis() > endtime) {
354                 break;
355             }
356         }
357     }
358
359     /**
360      * Is there no work to do for this queue right now?.
361      */
362     synchronized boolean isSkipSet() {
363         return (peekNext() == null);
364     }
365
366     /**
367      * Reset the retry timer.
368      */
369     public void resetQueue() {
370         resumetime = System.currentTimeMillis();
371     }
372
373     /**
374      * Get task if in queue and mark as success.
375      */
376     public boolean markTaskSuccess(String pubId) {
377         DeliveryTask task = working.get(pubId);
378         if (task != null) {
379             markSuccess(task);
380             return true;
381         }
382         task = retry.get(pubId);
383         if (task != null) {
384             retry.remove(pubId);
385             task.clean();
386             resetQueue();
387             failduration = 0;
388             return true;
389         }
390         return false;
391     }
392
393     private void scanForNextTask(String[] files) {
394         for (String fname : files) {
395             String pubId = getPubId(fname);
396             if (pubId == null) {
397                 continue;
398             }
399             DeliveryTask dt = retry.get(pubId);
400             if (dt == null) {
401                 dt = new DeliveryTask(this, pubId);
402             }
403             todoList.add(dt);
404         }
405     }
406
407     @Nullable
408     private DeliveryTask getDeliveryTask(long mindate) {
409         if (todoindex < todoList.size()) {
410             DeliveryTask dt = todoList.get(todoindex);
411             if (dt.isCleaned()) {
412                 todoindex++;
413             }
414             if (destinationInfo.isPrivilegedSubscriber() && dt.getResumeTime() > System.currentTimeMillis()) {
415                 retry.put(dt.getPublishId(), dt);
416                 todoindex++;
417             }
418             if (dt.getDate() >= mindate) {
419                 return (dt);
420             }
421             todoindex++;
422             reportExpiry(dt);
423         }
424         return null;
425     }
426
427     @Nullable
428     private String getPubId(String fname) {
429         if (!fname.endsWith(".M")) {
430             return null;
431         }
432         String fname2 = fname.substring(0, fname.length() - 2);
433         long pidtime = 0;
434         int dot = fname2.indexOf('.');
435         if (dot < 1) {
436             return null;
437         }
438         try {
439             pidtime = Long.parseLong(fname2.substring(0, dot));
440         } catch (Exception e) {
441             logger.error("Exception", e);
442         }
443         if (pidtime < 1000000000000L) {
444             return null;
445         }
446         if (working.get(fname2) != null) {
447             return null;
448         }
449         return fname2;
450     }
451 }