2 * ===============================LICENSE_START======================================
4 * ================================================================================
5 * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
6 * ================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 * ============================LICENSE_END===========================================
21 package org.openecomp.dcae.apod.analytics.dmaap.service.publisher;
23 import org.junit.Test;
24 import org.openecomp.dcae.apod.analytics.dmaap.BaseAnalyticsDMaaPUnitTest;
26 import java.util.List;
28 import static org.junit.Assert.assertTrue;
32 * @author Rajiv Singla . Creation Date: 11/2/2016.
34 public class DMaaPMRPublisherQueueImplTest extends BaseAnalyticsDMaaPUnitTest {
38 public void testAddBatchMessages() throws Exception {
39 DMaaPMRPublisherQueue publisherQueue = new DMaaPMRPublisherQueueImpl(10, 20);
40 // add two messages to batch queue
41 final int batchMessagesSizeAfterFirstInsert = publisherQueue.addBatchMessages(getTwoSampleMessages());
42 assertTrue("Batch Message Queue size must be 2", batchMessagesSizeAfterFirstInsert == 2);
43 // add two more message to batch queue
44 final int batchMessagesSizeAfterSecondInsert = publisherQueue.addBatchMessages(getTwoSampleMessages());
45 assertTrue("Batch Message Queue size must be 4", batchMessagesSizeAfterSecondInsert == 4);
46 // Now get all messages which must drain out batch queue
47 final List<String> messagesToPublish = publisherQueue.getMessageForPublishing();
48 assertTrue("There must be 4 messages to publish", messagesToPublish.size() == 4);
49 assertTrue("Batch Queue must be empty", publisherQueue.getBatchQueueRemainingSize() == 10);
53 @Test(expected = IllegalStateException.class)
54 public void testAddBatchMessagesWhenQueueSizeIsFull() throws Exception {
55 DMaaPMRPublisherQueue publisherQueue = new DMaaPMRPublisherQueueImpl(2, 20);
56 // add two messages to batch queue
57 final int batchMessagesSizeAfterFirstInsert = publisherQueue.addBatchMessages(getTwoSampleMessages());
58 assertTrue("Batch Message Queue size must be 2", batchMessagesSizeAfterFirstInsert == 2);
59 // add 2 more messages should now throw IllegalStateException
60 publisherQueue.addBatchMessages(getTwoSampleMessages());
64 public void testAddRecoverableMessages() throws Exception {
65 DMaaPMRPublisherQueue publisherQueue = new DMaaPMRPublisherQueueImpl(10, 20);
66 // add two messages to batch queue
67 final int batchMessagesSizeAfterFirstInsert = publisherQueue.addBatchMessages(getTwoSampleMessages());
68 assertTrue("Batch Message Queue size must be 2", batchMessagesSizeAfterFirstInsert == 2);
69 // add two recoverable messages
70 final int recoverableMessageSizeAfterFirstInsert =
71 publisherQueue.addRecoverableMessages(getTwoSampleMessages());
72 assertTrue("Recovery Message Queue size must be 2 after first insert",
73 recoverableMessageSizeAfterFirstInsert == 2);
74 // add two more recoverable messages
75 final int recoverableMessageSizeAfterSecondInsert =
76 publisherQueue.addRecoverableMessages(getTwoSampleMessages());
77 assertTrue("Recovery Message Queue size must be 4 after second insert",
78 recoverableMessageSizeAfterSecondInsert == 4);
79 // Now get all messages which must drain out batch queue
80 final List<String> messagesToPublish = publisherQueue.getMessageForPublishing();
81 assertTrue("There must be 6 messages to publish", messagesToPublish.size() == 6);
82 assertTrue("Batch Queue must be empty", publisherQueue.getBatchQueueRemainingSize() == 10);
83 assertTrue("Recovery Queue must be empty", publisherQueue.getRecoveryQueueRemainingSize() == 20);
87 @Test(expected = IllegalStateException.class)
88 public void testAddRecoverableMessagesWhenRecoveryQueueIsFull() throws Exception {
89 DMaaPMRPublisherQueue publisherQueue = new DMaaPMRPublisherQueueImpl(10, 2);
90 // add two messages to batch queue
91 final int batchMessagesSizeAfterFirstInsert = publisherQueue.addBatchMessages(getTwoSampleMessages());
92 assertTrue("Batch Message Queue size must be 2", batchMessagesSizeAfterFirstInsert == 2);
93 // add two recoverable messages
94 final int recoverableMessageSizeAfterFirstInsert =
95 publisherQueue.addRecoverableMessages(getTwoSampleMessages());
96 assertTrue("Recovery Message Queue size must be 2 after first insert",
97 recoverableMessageSizeAfterFirstInsert == 2);
98 // add two more recoverable messages which should throw IllegalStateException
99 publisherQueue.addRecoverableMessages(getTwoSampleMessages());
103 public void testGetMessageForPublishing() throws Exception {
104 DMaaPMRPublisherQueue publisherQueue = new DMaaPMRPublisherQueueImpl(10, 20);
105 // add two messages to batch queue
106 final int batchMessagesSizeAfterFirstInsert = publisherQueue.addBatchMessages(getTwoSampleMessages());
107 assertTrue("Batch Message Queue size must be 2", batchMessagesSizeAfterFirstInsert == 2);
108 // add two recoverable messages
109 final int recoverableMessageSizeAfterFirstInsert =
110 publisherQueue.addRecoverableMessages(getTwoSampleMessages());
111 assertTrue("Recovery Message Queue size must be 2 after first insert",
112 recoverableMessageSizeAfterFirstInsert == 2);
113 // add two more recoverable messages
114 final int recoverableMessageSizeAfterSecondInsert =
115 publisherQueue.addRecoverableMessages(getTwoSampleMessages());
116 assertTrue("Recovery Message Queue size must be 4 after second insert",
117 recoverableMessageSizeAfterSecondInsert == 4);
118 // Now get all messages which must drain out batch queue
119 final List<String> messagesToPublish = publisherQueue.getMessageForPublishing();
120 assertTrue("There must be 6 messages to publish", messagesToPublish.size() == 6);
121 // add two more batch and recovery messages
122 final int batchQueueSize = publisherQueue.addBatchMessages(getTwoSampleMessages());
123 final int recoveryQueueSize = publisherQueue.addRecoverableMessages(getTwoSampleMessages());
124 final int messagePublishCount = publisherQueue.getMessageForPublishing().size();
125 assertTrue("Batch Queue + Recovery Queue message total must batch publish message count",
126 messagePublishCount == (batchQueueSize + recoveryQueueSize));
127 assertTrue("Batch Queue must be empty", publisherQueue.getBatchQueueRemainingSize() == 10);
128 assertTrue("Recovery Queue must be empty", publisherQueue.getRecoveryQueueRemainingSize() == 20);
133 public void testGetBatchQueueRemainingSize() throws Exception {
135 DMaaPMRPublisherQueue publisherQueue = new DMaaPMRPublisherQueueImpl(10, 20);
136 // add two messages to batch queue
137 final int batchMessagesSizeAfterFirstInsert = publisherQueue.addBatchMessages(getTwoSampleMessages());
138 assertTrue("Batch Message Queue size must be 2", batchMessagesSizeAfterFirstInsert == 2);
139 assertTrue("Batch remaining capacity should be reduced by 2",
140 publisherQueue.getBatchQueueRemainingSize() == 8);
142 // add two recoverable messages
143 final int recoverableMessageSizeAfterFirstInsert =
144 publisherQueue.addRecoverableMessages(getTwoSampleMessages());
145 assertTrue("Recovery Message Queue size must be 2 after first insert",
146 recoverableMessageSizeAfterFirstInsert == 2);
148 // recoverable message should not change batch queue capacity
149 assertTrue("Adding recoverable Message must not have any impact on batch queue remaining capacity ",
150 publisherQueue.getBatchQueueRemainingSize() == 8);
151 // Now get all messages which must drain out batch queue
152 final List<String> messagesToPublish = publisherQueue.getMessageForPublishing();
153 assertTrue("There must be exactly 4 messages to publish", messagesToPublish.size() == 4);
155 // Batch queue remaining capacity should now match original batch size
156 assertTrue("Batch Queue remaining capacity must match original batch queue size", publisherQueue
157 .getBatchQueueRemainingSize() == 10);
161 public void testGetRecoveryQueueRemainingSize() throws Exception {
162 DMaaPMRPublisherQueue publisherQueue = new DMaaPMRPublisherQueueImpl(10, 20);
164 // add two recoverable messages
165 final int recoverableMessageSizeAfterFirstInsert =
166 publisherQueue.addRecoverableMessages(getTwoSampleMessages());
167 assertTrue("Recovery Message Queue size must be 2 after first insert",
168 recoverableMessageSizeAfterFirstInsert == 2);
169 assertTrue("Recovery Queue remaining capacity should be reduced by 2",
170 publisherQueue.getRecoveryQueueRemainingSize() == 18);
172 // add two messages to batch queue
173 final int batchMessagesSizeAfterFirstInsert = publisherQueue.addBatchMessages(getTwoSampleMessages());
174 assertTrue("Batch Message Queue size must be 2", batchMessagesSizeAfterFirstInsert == 2);
176 // batch message should not change recoverable queue capacity
177 assertTrue("Adding batch queue Message must not have any impact on recovery queue remaining capacity ",
178 publisherQueue.getRecoveryQueueRemainingSize() == 18);
180 // Now get all messages which must drain out recovery queue
181 final List<String> messagesToPublish = publisherQueue.getMessageForPublishing();
182 assertTrue("There must be exactly 4 messages to publish", messagesToPublish.size() == 4);
184 // Recoverable queue remaining capacity should now match original recovery queue size
185 assertTrue("Recoverable Queue remaining capacity must match original batch queue size", publisherQueue
186 .getRecoveryQueueRemainingSize() == 20);