2  * ===============================LICENSE_START======================================
 
   4  * ================================================================================
 
   5  *    Copyright © 2017 AT&T Intellectual Property. All rights reserved.
 
   6  * ================================================================================
 
   7  *  Licensed under the Apache License, Version 2.0 (the "License");
 
   8  *  you may not use this file except in compliance with the License.
 
   9  *   You may obtain a copy of the License at
 
  11  *          http://www.apache.org/licenses/LICENSE-2.0
 
  13  *  Unless required by applicable law or agreed to in writing, software
 
  14  *  distributed under the License is distributed on an "AS IS" BASIS,
 
  15  *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 
  16  *  See the License for the specific language governing permissions and
 
  17  *  limitations under the License.
 
  18  *  ============================LICENSE_END===========================================
 
  21 package org.onap.universalvesadapter.dmaap.MRPublisher;
 
  23 import com.google.inject.Inject;
 
  24 import com.google.inject.assistedinject.Assisted;
 
  25 import org.slf4j.Logger;
 
  26 import org.slf4j.LoggerFactory;
 
  28 import java.util.LinkedList;
 
  29 import java.util.List;
 
  30 import java.util.concurrent.LinkedBlockingDeque;
 
  32 import static com.google.common.collect.Iterables.concat;
 
  33 import static com.google.common.collect.Lists.newLinkedList;
 
  34 import static java.util.Collections.unmodifiableList;
 
  38  *     An implementation of {@link DMaaPMRPublisherQueue} which uses {@link java.util.concurrent.BlockingDeque}
 
  39  *     for batch and recovery queues
 
  43  * @author Rajiv Singla . Creation Date: 11/1/2016.
 
  45 public class DMaaPMRPublisherQueueImpl implements DMaaPMRPublisherQueue {
 
  47     private static final Logger LOG = LoggerFactory.getLogger(DMaaPMRPublisherQueueImpl.class);
 
  49     private final LinkedBlockingDeque<String> batchQueue;
 
  50     private final LinkedBlockingDeque<String> recoveryQueue;
 
  53     public DMaaPMRPublisherQueueImpl(@Assisted("batchQueueSize") int batchQueueSize,
 
  54                                      @Assisted("recoveryQueueSize") int recoveryQueueSize) {
 
  55         batchQueue = new LinkedBlockingDeque<>(batchQueueSize);
 
  56         recoveryQueue = new LinkedBlockingDeque<>(recoveryQueueSize);
 
  57         LOG.debug("Creating Instance of DMaaP Publisher Queue. BatchQueueSize: {}, RecoveryQueueSize: {}",
 
  58                 batchQueueSize, recoveryQueueSize);
 
  62     public synchronized int addBatchMessages(List<String> batchMessages) {
 
  64         // checks if batchMessages size does not exceed batch queue capacity
 
  65         if (batchMessages.size() > batchQueue.remainingCapacity()) {
 
  66             throw new IllegalStateException("Not enough capacity to add batchMessages  in batch queue");
 
  69         // Add batchMessages to batch queue
 
  70         for (String message : batchMessages) {
 
  71             batchQueue.add(message);
 
  74         // returns current elements size in batch queue
 
  75         return batchQueue.size();
 
  79     public synchronized int addRecoverableMessages(List<String> recoverableMessages) {
 
  81         // checks if messages size does not exceed recovery queue size
 
  82         if (recoverableMessages.size() > recoveryQueue.remainingCapacity()) {
 
  83             throw new IllegalStateException("Not enough capacity to add messages in recovery queue");
 
  86         // add messages to recovery queue
 
  87         for (String recoverableMessage : recoverableMessages) {
 
  88             recoveryQueue.add(recoverableMessage);
 
  91         // returns current size of recovery queue
 
  92         return recoveryQueue.size();
 
  96     public synchronized List<String> getMessageForPublishing() {
 
  98         final List<String> recoveryMessageList = new LinkedList<>();
 
  99         final List<String> batchMessagesList = new LinkedList<>();
 
 101         // get messages from recovery queue if present
 
 102         if (!recoveryQueue.isEmpty()) {
 
 103             final int recoveryQueueSize = recoveryQueue.drainTo(recoveryMessageList);
 
 104             LOG.debug("Drained Recovery Queue elements for flushing: {}", recoveryQueueSize);
 
 107         // get messages from batch queue if present
 
 108         if (!batchQueue.isEmpty()) {
 
 109             final int batchQueueSize = batchQueue.drainTo(batchMessagesList);
 
 110             LOG.debug("Drained Batch Queue elements for flushing: {}", batchQueueSize);
 
 113         // concat recovery and batch queue elements
 
 114         return unmodifiableList(newLinkedList(concat(recoveryMessageList, batchMessagesList)));
 
 118     public synchronized int getBatchQueueRemainingSize() {
 
 119         return batchQueue.remainingCapacity();
 
 123     public synchronized int getRecoveryQueueRemainingSize() {
 
 124         return recoveryQueue.remainingCapacity();