e42b6b0dd9d947aeed5c44eba40904250aad913d
[dcaegen2/analytics/tca.git] / dcae-analytics-dmaap / src / main / java / org / openecomp / dcae / apod / analytics / dmaap / service / publisher / DMaaPMRPublisherQueueImpl.java
1 /*\r
2  * ===============================LICENSE_START======================================\r
3  *  dcae-analytics\r
4  * ================================================================================\r
5  *    Copyright © 2017 AT&T Intellectual Property. All rights reserved.\r
6  * ================================================================================\r
7  *  Licensed under the Apache License, Version 2.0 (the "License");\r
8  *  you may not use this file except in compliance with the License.\r
9  *   You may obtain a copy of the License at\r
10  *\r
11  *          http://www.apache.org/licenses/LICENSE-2.0\r
12  *\r
13  *  Unless required by applicable law or agreed to in writing, software\r
14  *  distributed under the License is distributed on an "AS IS" BASIS,\r
15  *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\r
16  *  See the License for the specific language governing permissions and\r
17  *  limitations under the License.\r
18  *  ============================LICENSE_END===========================================\r
19  */\r
20 \r
21 package org.openecomp.dcae.apod.analytics.dmaap.service.publisher;\r
22 \r
23 import com.google.inject.Inject;\r
24 import com.google.inject.assistedinject.Assisted;\r
25 import org.slf4j.Logger;\r
26 import org.slf4j.LoggerFactory;\r
27 \r
28 import java.util.LinkedList;\r
29 import java.util.List;\r
30 import java.util.concurrent.LinkedBlockingDeque;\r
31 \r
32 import static com.google.common.collect.Iterables.concat;\r
33 import static com.google.common.collect.Lists.newLinkedList;\r
34 import static java.util.Collections.unmodifiableList;\r
35 \r
36 /**\r
37  * <p>\r
38  *     An implementation of {@link DMaaPMRPublisherQueue} which uses {@link java.util.concurrent.BlockingDeque}\r
39  *     for batch and recovery queues\r
40  * </p>\r
41  *\r
42  *\r
43  * @author Rajiv Singla . Creation Date: 11/1/2016.\r
44  */\r
45 public class DMaaPMRPublisherQueueImpl implements DMaaPMRPublisherQueue {\r
46 \r
47     private static final Logger LOG = LoggerFactory.getLogger(DMaaPMRPublisherQueueImpl.class);\r
48 \r
49     private final LinkedBlockingDeque<String> batchQueue;\r
50     private final LinkedBlockingDeque<String> recoveryQueue;\r
51 \r
52     @Inject\r
53     public DMaaPMRPublisherQueueImpl(@Assisted("batchQueueSize") int batchQueueSize,\r
54                                      @Assisted("recoveryQueueSize") int recoveryQueueSize) {\r
55         batchQueue = new LinkedBlockingDeque<>(batchQueueSize);\r
56         recoveryQueue = new LinkedBlockingDeque<>(recoveryQueueSize);\r
57         LOG.debug("Creating Instance of DMaaP Publisher Queue. BatchQueueSize: {}, RecoveryQueueSize: {}",\r
58                 batchQueueSize, recoveryQueueSize);\r
59     }\r
60 \r
61     @Override\r
62     public synchronized int addBatchMessages(List<String> batchMessages) {\r
63 \r
64         // checks if batchMessages size does not exceed batch queue capacity\r
65         if (batchMessages.size() > batchQueue.remainingCapacity()) {\r
66             throw new IllegalStateException("Not enough capacity to add batchMessages  in batch queue");\r
67         }\r
68 \r
69         // Add batchMessages to batch queue\r
70         for (String message : batchMessages) {\r
71             batchQueue.add(message);\r
72         }\r
73 \r
74         // returns current elements size in batch queue\r
75         return batchQueue.size();\r
76     }\r
77 \r
78     @Override\r
79     public synchronized int addRecoverableMessages(List<String> recoverableMessages) {\r
80 \r
81         // checks if messages size does not exceed recovery queue size\r
82         if (recoverableMessages.size() > recoveryQueue.remainingCapacity()) {\r
83             throw new IllegalStateException("Not enough capacity to add messages in recovery queue");\r
84         }\r
85 \r
86         // add messages to recovery queue\r
87         for (String recoverableMessage : recoverableMessages) {\r
88             recoveryQueue.add(recoverableMessage);\r
89         }\r
90 \r
91         // returns current size of recovery queue\r
92         return recoveryQueue.size();\r
93     }\r
94 \r
95     @Override\r
96     public synchronized List<String> getMessageForPublishing() {\r
97 \r
98         final List<String> recoveryMessageList = new LinkedList<>();\r
99         final List<String> batchMessagesList = new LinkedList<>();\r
100 \r
101         // get messages from recovery queue if present\r
102         if (!recoveryQueue.isEmpty()) {\r
103             final int recoveryQueueSize = recoveryQueue.drainTo(recoveryMessageList);\r
104             LOG.debug("Drained Recovery Queue elements for flushing: {}", recoveryQueueSize);\r
105         }\r
106 \r
107         // get messages from batch queue if present\r
108         if (!batchQueue.isEmpty()) {\r
109             final int batchQueueSize = batchQueue.drainTo(batchMessagesList);\r
110             LOG.debug("Drained Batch Queue elements for flushing: {}", batchQueueSize);\r
111         }\r
112 \r
113         // concat recovery and batch queue elements\r
114         return unmodifiableList(newLinkedList(concat(recoveryMessageList, batchMessagesList)));\r
115     }\r
116 \r
117     @Override\r
118     public synchronized int getBatchQueueRemainingSize() {\r
119         return batchQueue.remainingCapacity();\r
120     }\r
121 \r
122     @Override\r
123     public synchronized int getRecoveryQueueRemainingSize() {\r
124         return recoveryQueue.remainingCapacity();\r
125     }\r
126 }\r