7d83765a366ccaabb8cc753bfe2a225e7c534613
[policy/drools-applications.git] /
1 /*-
2  * ============LICENSE_START=======================================================
3  * ONAP
4  * ================================================================================
5  * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved.
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=========================================================
19  */
20
21 package org.onap.policy.controlloop.tdjam;
22
23 import java.util.LinkedList;
24 import lombok.Getter;
25 import org.slf4j.Logger;
26 import org.slf4j.LoggerFactory;
27
28 /**
29  * This class provides a way to handle synchronization, with minimal blocking. Requests
30  * are queued until {@link #start()} is invoked.
31  */
32 public class SerialWorkQueue {
33     private static Logger logger = LoggerFactory.getLogger(SerialWorkQueue.class);
34
35     // current work list
36     private LinkedList<Runnable> workQueue;
37
38     @Getter
39     private boolean running = false;
40
41     /**
42      * Constructor - no initial Runnable.
43      */
44     public SerialWorkQueue() {
45         workQueue = new LinkedList<>();
46     }
47
48     /**
49      * Constructor - initial 'Runnable' is specified.
50      *
51      * @param runnable an initial 'Runnnable' to run
52      */
53     public SerialWorkQueue(Runnable runnable) {
54         workQueue = new LinkedList<>();
55         workQueue.add(runnable);
56     }
57
58     /**
59      * Starts the queue. If the current thread is the first to start it, then the current
60      * thread will process any requests in the queue before returning.
61      */
62     public void start() {
63         Runnable item;
64
65         synchronized (this) {
66             if (running) {
67                 // already running
68                 return;
69             }
70
71             running = true;
72             item = workQueue.peekFirst();
73         }
74
75         if (item != null) {
76             processQueue(item);
77         }
78     }
79
80     /**
81      * Called to add a 'Runnable' to the work queue. If the queue was empty, the current
82      * thread is used to process the queue.
83      *
84      * @param work the Runnable to be queued, and eventually run
85      */
86     public void queueAndRun(Runnable work) {
87         synchronized (this) {
88             workQueue.add(work);
89             if (!running || workQueue.size() > 1) {
90                 // there was already work in the queue, so presumably there is
91                 // already an associated thread running
92                 return;
93             }
94             // if we reach this point, the queue was empty when this method was
95             // called, so this thread will process the queue
96         }
97
98         processQueue(work);
99     }
100
101     /**
102      * Internal method to process the work queue until it is empty. Note that entries
103      * could be added by this thread or another one while we are working.
104      *
105      * @param firstItem the first item in the queue
106      */
107     private void processQueue(Runnable firstItem) {
108         Runnable next = firstItem;
109         while (next != null) {
110             try {
111                 next.run();
112             } catch (Exception e) {
113                 logger.error("SerialWorkQueue.processQueue exception", e);
114             }
115
116             synchronized (this) {
117                 // remove the job we just ran
118                 workQueue.removeFirst();
119                 next = workQueue.peekFirst();
120             }
121         }
122     }
123 }