2 * ===============================LICENSE_START======================================
\r
4 * ================================================================================
\r
5 * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
\r
6 * ================================================================================
\r
7 * Licensed under the Apache License, Version 2.0 (the "License");
\r
8 * you may not use this file except in compliance with the License.
\r
9 * You may obtain a copy of the License at
\r
11 * http://www.apache.org/licenses/LICENSE-2.0
\r
13 * Unless required by applicable law or agreed to in writing, software
\r
14 * distributed under the License is distributed on an "AS IS" BASIS,
\r
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
\r
16 * See the License for the specific language governing permissions and
\r
17 * limitations under the License.
\r
18 * ============================LICENSE_END===========================================
\r
21 package org.openecomp.dcae.apod.analytics.dmaap.service.publisher;
\r
23 import org.junit.Test;
\r
24 import org.openecomp.dcae.apod.analytics.dmaap.BaseAnalyticsDMaaPUnitTest;
\r
26 import java.util.List;
\r
28 import static org.junit.Assert.assertTrue;
\r
32 * @author Rajiv Singla . Creation Date: 11/2/2016.
\r
34 public class DMaaPMRPublisherQueueImplTest extends BaseAnalyticsDMaaPUnitTest {
\r
38 public void testAddBatchMessages() throws Exception {
\r
39 DMaaPMRPublisherQueue publisherQueue = new DMaaPMRPublisherQueueImpl(10, 20);
\r
40 // add two messages to batch queue
\r
41 final int batchMessagesSizeAfterFirstInsert = publisherQueue.addBatchMessages(getTwoSampleMessages());
\r
42 assertTrue("Batch Message Queue size must be 2", batchMessagesSizeAfterFirstInsert == 2);
\r
43 // add two more message to batch queue
\r
44 final int batchMessagesSizeAfterSecondInsert = publisherQueue.addBatchMessages(getTwoSampleMessages());
\r
45 assertTrue("Batch Message Queue size must be 4", batchMessagesSizeAfterSecondInsert == 4);
\r
46 // Now get all messages which must drain out batch queue
\r
47 final List<String> messagesToPublish = publisherQueue.getMessageForPublishing();
\r
48 assertTrue("There must be 4 messages to publish", messagesToPublish.size() == 4);
\r
49 assertTrue("Batch Queue must be empty", publisherQueue.getBatchQueueRemainingSize() == 10);
\r
53 @Test(expected = IllegalStateException.class)
\r
54 public void testAddBatchMessagesWhenQueueSizeIsFull() throws Exception {
\r
55 DMaaPMRPublisherQueue publisherQueue = new DMaaPMRPublisherQueueImpl(2, 20);
\r
56 // add two messages to batch queue
\r
57 final int batchMessagesSizeAfterFirstInsert = publisherQueue.addBatchMessages(getTwoSampleMessages());
\r
58 assertTrue("Batch Message Queue size must be 2", batchMessagesSizeAfterFirstInsert == 2);
\r
59 // add 2 more messages should now throw IllegalStateException
\r
60 publisherQueue.addBatchMessages(getTwoSampleMessages());
\r
64 public void testAddRecoverableMessages() throws Exception {
\r
65 DMaaPMRPublisherQueue publisherQueue = new DMaaPMRPublisherQueueImpl(10, 20);
\r
66 // add two messages to batch queue
\r
67 final int batchMessagesSizeAfterFirstInsert = publisherQueue.addBatchMessages(getTwoSampleMessages());
\r
68 assertTrue("Batch Message Queue size must be 2", batchMessagesSizeAfterFirstInsert == 2);
\r
69 // add two recoverable messages
\r
70 final int recoverableMessageSizeAfterFirstInsert =
\r
71 publisherQueue.addRecoverableMessages(getTwoSampleMessages());
\r
72 assertTrue("Recovery Message Queue size must be 2 after first insert",
\r
73 recoverableMessageSizeAfterFirstInsert == 2);
\r
74 // add two more recoverable messages
\r
75 final int recoverableMessageSizeAfterSecondInsert =
\r
76 publisherQueue.addRecoverableMessages(getTwoSampleMessages());
\r
77 assertTrue("Recovery Message Queue size must be 4 after second insert",
\r
78 recoverableMessageSizeAfterSecondInsert == 4);
\r
79 // Now get all messages which must drain out batch queue
\r
80 final List<String> messagesToPublish = publisherQueue.getMessageForPublishing();
\r
81 assertTrue("There must be 6 messages to publish", messagesToPublish.size() == 6);
\r
82 assertTrue("Batch Queue must be empty", publisherQueue.getBatchQueueRemainingSize() == 10);
\r
83 assertTrue("Recovery Queue must be empty", publisherQueue.getRecoveryQueueRemainingSize() == 20);
\r
87 @Test(expected = IllegalStateException.class)
\r
88 public void testAddRecoverableMessagesWhenRecoveryQueueIsFull() throws Exception {
\r
89 DMaaPMRPublisherQueue publisherQueue = new DMaaPMRPublisherQueueImpl(10, 2);
\r
90 // add two messages to batch queue
\r
91 final int batchMessagesSizeAfterFirstInsert = publisherQueue.addBatchMessages(getTwoSampleMessages());
\r
92 assertTrue("Batch Message Queue size must be 2", batchMessagesSizeAfterFirstInsert == 2);
\r
93 // add two recoverable messages
\r
94 final int recoverableMessageSizeAfterFirstInsert =
\r
95 publisherQueue.addRecoverableMessages(getTwoSampleMessages());
\r
96 assertTrue("Recovery Message Queue size must be 2 after first insert",
\r
97 recoverableMessageSizeAfterFirstInsert == 2);
\r
98 // add two more recoverable messages which should throw IllegalStateException
\r
99 publisherQueue.addRecoverableMessages(getTwoSampleMessages());
\r
103 public void testGetMessageForPublishing() throws Exception {
\r
104 DMaaPMRPublisherQueue publisherQueue = new DMaaPMRPublisherQueueImpl(10, 20);
\r
105 // add two messages to batch queue
\r
106 final int batchMessagesSizeAfterFirstInsert = publisherQueue.addBatchMessages(getTwoSampleMessages());
\r
107 assertTrue("Batch Message Queue size must be 2", batchMessagesSizeAfterFirstInsert == 2);
\r
108 // add two recoverable messages
\r
109 final int recoverableMessageSizeAfterFirstInsert =
\r
110 publisherQueue.addRecoverableMessages(getTwoSampleMessages());
\r
111 assertTrue("Recovery Message Queue size must be 2 after first insert",
\r
112 recoverableMessageSizeAfterFirstInsert == 2);
\r
113 // add two more recoverable messages
\r
114 final int recoverableMessageSizeAfterSecondInsert =
\r
115 publisherQueue.addRecoverableMessages(getTwoSampleMessages());
\r
116 assertTrue("Recovery Message Queue size must be 4 after second insert",
\r
117 recoverableMessageSizeAfterSecondInsert == 4);
\r
118 // Now get all messages which must drain out batch queue
\r
119 final List<String> messagesToPublish = publisherQueue.getMessageForPublishing();
\r
120 assertTrue("There must be 6 messages to publish", messagesToPublish.size() == 6);
\r
121 // add two more batch and recovery messages
\r
122 final int batchQueueSize = publisherQueue.addBatchMessages(getTwoSampleMessages());
\r
123 final int recoveryQueueSize = publisherQueue.addRecoverableMessages(getTwoSampleMessages());
\r
124 final int messagePublishCount = publisherQueue.getMessageForPublishing().size();
\r
125 assertTrue("Batch Queue + Recovery Queue message total must batch publish message count",
\r
126 messagePublishCount == (batchQueueSize + recoveryQueueSize));
\r
127 assertTrue("Batch Queue must be empty", publisherQueue.getBatchQueueRemainingSize() == 10);
\r
128 assertTrue("Recovery Queue must be empty", publisherQueue.getRecoveryQueueRemainingSize() == 20);
\r
133 public void testGetBatchQueueRemainingSize() throws Exception {
\r
135 DMaaPMRPublisherQueue publisherQueue = new DMaaPMRPublisherQueueImpl(10, 20);
\r
136 // add two messages to batch queue
\r
137 final int batchMessagesSizeAfterFirstInsert = publisherQueue.addBatchMessages(getTwoSampleMessages());
\r
138 assertTrue("Batch Message Queue size must be 2", batchMessagesSizeAfterFirstInsert == 2);
\r
139 assertTrue("Batch remaining capacity should be reduced by 2",
\r
140 publisherQueue.getBatchQueueRemainingSize() == 8);
\r
142 // add two recoverable messages
\r
143 final int recoverableMessageSizeAfterFirstInsert =
\r
144 publisherQueue.addRecoverableMessages(getTwoSampleMessages());
\r
145 assertTrue("Recovery Message Queue size must be 2 after first insert",
\r
146 recoverableMessageSizeAfterFirstInsert == 2);
\r
148 // recoverable message should not change batch queue capacity
\r
149 assertTrue("Adding recoverable Message must not have any impact on batch queue remaining capacity ",
\r
150 publisherQueue.getBatchQueueRemainingSize() == 8);
\r
151 // Now get all messages which must drain out batch queue
\r
152 final List<String> messagesToPublish = publisherQueue.getMessageForPublishing();
\r
153 assertTrue("There must be exactly 4 messages to publish", messagesToPublish.size() == 4);
\r
155 // Batch queue remaining capacity should now match original batch size
\r
156 assertTrue("Batch Queue remaining capacity must match original batch queue size", publisherQueue
\r
157 .getBatchQueueRemainingSize() == 10);
\r
161 public void testGetRecoveryQueueRemainingSize() throws Exception {
\r
162 DMaaPMRPublisherQueue publisherQueue = new DMaaPMRPublisherQueueImpl(10, 20);
\r
164 // add two recoverable messages
\r
165 final int recoverableMessageSizeAfterFirstInsert =
\r
166 publisherQueue.addRecoverableMessages(getTwoSampleMessages());
\r
167 assertTrue("Recovery Message Queue size must be 2 after first insert",
\r
168 recoverableMessageSizeAfterFirstInsert == 2);
\r
169 assertTrue("Recovery Queue remaining capacity should be reduced by 2",
\r
170 publisherQueue.getRecoveryQueueRemainingSize() == 18);
\r
172 // add two messages to batch queue
\r
173 final int batchMessagesSizeAfterFirstInsert = publisherQueue.addBatchMessages(getTwoSampleMessages());
\r
174 assertTrue("Batch Message Queue size must be 2", batchMessagesSizeAfterFirstInsert == 2);
\r
176 // batch message should not change recoverable queue capacity
\r
177 assertTrue("Adding batch queue Message must not have any impact on recovery queue remaining capacity ",
\r
178 publisherQueue.getRecoveryQueueRemainingSize() == 18);
\r
180 // Now get all messages which must drain out recovery queue
\r
181 final List<String> messagesToPublish = publisherQueue.getMessageForPublishing();
\r
182 assertTrue("There must be exactly 4 messages to publish", messagesToPublish.size() == 4);
\r
184 // Recoverable queue remaining capacity should now match original recovery queue size
\r
185 assertTrue("Recoverable Queue remaining capacity must match original batch queue size", publisherQueue
\r
186 .getRecoveryQueueRemainingSize() == 20);
\r