2  * ============LICENSE_START=======================================================
 
   3  * Copyright (C) 2019-2020 AT&T Intellectual Property. All rights reserved.
 
   4  * ================================================================================
 
   5  * Licensed under the Apache License, Version 2.0 (the "License");
 
   6  * you may not use this file except in compliance with the License.
 
   7  * You may obtain a copy of the License at
 
   9  *      http://www.apache.org/licenses/LICENSE-2.0
 
  11  * Unless required by applicable law or agreed to in writing, software
 
  12  * distributed under the License is distributed on an "AS IS" BASIS,
 
  13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 
  14  * See the License for the specific language governing permissions and
 
  15  * limitations under the License.
 
  16  * ============LICENSE_END=========================================================
 
  19 package org.onap.policy.models.sim.dmaap.provider;
 
  21 import static org.assertj.core.api.Assertions.assertThatCode;
 
  22 import static org.junit.Assert.assertEquals;
 
  23 import static org.junit.Assert.assertNotNull;
 
  24 import static org.junit.Assert.assertNull;
 
  25 import static org.junit.Assert.assertSame;
 
  26 import static org.mockito.ArgumentMatchers.any;
 
  27 import static org.mockito.ArgumentMatchers.anyInt;
 
  28 import static org.mockito.ArgumentMatchers.anyLong;
 
  29 import static org.mockito.ArgumentMatchers.eq;
 
  30 import static org.mockito.Mockito.spy;
 
  31 import static org.mockito.Mockito.times;
 
  32 import static org.mockito.Mockito.verify;
 
  33 import static org.mockito.Mockito.when;
 
  35 import java.util.Arrays;
 
  36 import java.util.Collections;
 
  37 import java.util.List;
 
  38 import java.util.concurrent.BlockingQueue;
 
  39 import java.util.concurrent.LinkedBlockingQueue;
 
  40 import java.util.concurrent.ScheduledExecutorService;
 
  41 import java.util.concurrent.TimeUnit;
 
  42 import javax.ws.rs.core.Response;
 
  43 import javax.ws.rs.core.Response.Status;
 
  44 import org.junit.After;
 
  45 import org.junit.Before;
 
  46 import org.junit.Test;
 
  47 import org.mockito.ArgumentCaptor;
 
  48 import org.mockito.Captor;
 
  49 import org.mockito.Mock;
 
  50 import org.mockito.MockitoAnnotations;
 
  51 import org.onap.policy.common.utils.coder.CoderException;
 
  52 import org.onap.policy.common.utils.coder.StandardCoder;
 
  53 import org.onap.policy.common.utils.coder.StandardCoderObject;
 
  54 import org.onap.policy.models.sim.dmaap.parameters.DmaapSimParameterGroup;
 
  56 public class DmaapSimProviderTest {
 
  57     private static final String EXPECTED_EXCEPTION = "expected exception";
 
  58     private static final long SWEEP_SEC = 10L;
 
  59     private static final String TOPIC1 = "topic-A";
 
  60     private static final String TOPIC2 = "topic-B";
 
  61     private static final String CONSUMER1 = "consumer-X";
 
  62     private static final String CONSUMER_ID1 = "id1";
 
  64     private MyProvider prov;
 
  67     private DmaapSimParameterGroup params;
 
  70     private ScheduledExecutorService timer;
 
  73     private TopicData data1;
 
  76     private TopicData data2;
 
  79     private ArgumentCaptor<List<Object>> listCaptor;
 
  82     private ArgumentCaptor<List<Object>> listCaptor2;
 
  89         MockitoAnnotations.initMocks(this);
 
  91         when(params.getTopicSweepSec()).thenReturn(SWEEP_SEC);
 
  93         prov = new MyProvider(params);
 
  97      * Shuts down the provider, if it's running.
 
 100     public void tearDown() {
 
 101         if (prov.isAlive()) {
 
 107      * Verifies that the constructor adds all of the expected actions to the service
 
 111     public void testDmaapSimProvider() {
 
 113         verify(timer).scheduleWithFixedDelay(any(), eq(SWEEP_SEC), eq(SWEEP_SEC), eq(TimeUnit.SECONDS));
 
 116         verify(timer).shutdown();
 
 120     public void testProcessDmaapMessagePut_List() throws CoderException {
 
 121         prov = spy(new MyProvider(params));
 
 123         when(data1.write(any())).thenReturn(2);
 
 125         // force topics to exist
 
 126         prov.processDmaapMessageGet(TOPIC1, CONSUMER1, CONSUMER_ID1, 1, 0);
 
 127         prov.processDmaapMessageGet(TOPIC2, CONSUMER1, CONSUMER_ID1, 1, 0);
 
 129         List<Object> lst = Arrays.asList("hello", "world");
 
 130         Response resp = prov.processDmaapMessagePut(TOPIC1, lst);
 
 131         assertEquals(Status.OK.getStatusCode(), resp.getStatus());
 
 132         StandardCoderObject sco = new StandardCoder().decode(resp.getEntity().toString(), StandardCoderObject.class);
 
 133         assertEquals("2", sco.getString("count"));
 
 135         List<Object> lst2 = Arrays.asList("helloB", "worldB");
 
 136         prov.processDmaapMessagePut(TOPIC1, lst2);
 
 137         prov.processDmaapMessagePut(TOPIC2, lst2);
 
 139         // should only invoke this once for each topic
 
 140         verify(prov).makeTopicData(TOPIC1);
 
 141         verify(prov).makeTopicData(TOPIC2);
 
 143         // should process all writes
 
 144         verify(data1).write(lst);
 
 145         verify(data1).write(lst2);
 
 147         verify(data2).write(lst2);
 
 151     public void testProcessDmaapMessagePut_Single() throws CoderException {
 
 152         prov = spy(new MyProvider(params));
 
 154         // force topics to exist
 
 155         prov.processDmaapMessageGet(TOPIC1, CONSUMER1, CONSUMER_ID1, 1, 0);
 
 156         prov.processDmaapMessageGet(TOPIC2, CONSUMER1, CONSUMER_ID1, 1, 0);
 
 158         final String value1 = "abc";
 
 159         Response resp = prov.processDmaapMessagePut(TOPIC1, value1);
 
 160         assertEquals(Status.OK.getStatusCode(), resp.getStatus());
 
 162         // ensure that the response can be decoded
 
 163         new StandardCoder().decode(resp.getEntity().toString(), StandardCoderObject.class);
 
 165         final String value2 = "def";
 
 166         prov.processDmaapMessagePut(TOPIC1, value2);
 
 167         prov.processDmaapMessagePut(TOPIC2, value2);
 
 169         // should only invoke this once for each topic
 
 170         verify(prov).makeTopicData(TOPIC1);
 
 171         verify(prov).makeTopicData(TOPIC2);
 
 173         // should process all writes as singleton lists
 
 174         verify(data1, times(2)).write(listCaptor.capture());
 
 175         assertEquals(Collections.singletonList(value1), listCaptor.getAllValues().get(0));
 
 176         assertEquals(Collections.singletonList(value2), listCaptor.getAllValues().get(1));
 
 178         verify(data2).write(listCaptor2.capture());
 
 179         assertEquals(Collections.singletonList(value2), listCaptor2.getAllValues().get(0));
 
 183     public void testProcessDmaapMessageGet() throws InterruptedException {
 
 184         List<String> msgs = Arrays.asList("400", "500");
 
 185         when(data1.read(any(), anyInt(), anyLong())).thenReturn(msgs);
 
 187         Response resp = prov.processDmaapMessageGet(TOPIC1, CONSUMER1, CONSUMER_ID1, 4, 400L);
 
 188         assertEquals(Status.OK.getStatusCode(), resp.getStatus());
 
 189         assertEquals(msgs.toString(), resp.getEntity().toString());
 
 193     public void testProcessDmaapMessageGet_Timeout() throws InterruptedException {
 
 194         when(data1.read(any(), anyInt(), anyLong())).thenReturn(Collections.emptyList());
 
 196         Response resp = prov.processDmaapMessageGet(TOPIC1, CONSUMER1, CONSUMER_ID1, 3, 300L);
 
 197         assertEquals(Status.OK.getStatusCode(), resp.getStatus());
 
 198         assertEquals("[]", resp.getEntity().toString());
 
 202     public void testProcessDmaapMessageGet_Ex() throws InterruptedException {
 
 203         BlockingQueue<Response> respQueue = new LinkedBlockingQueue<>();
 
 205         // put in a background thread so it doesn't interrupt the tester thread
 
 208                 when(data1.read(any(), anyInt(), anyLong())).thenThrow(new InterruptedException(EXPECTED_EXCEPTION));
 
 209                 respQueue.offer(prov.processDmaapMessageGet(TOPIC1, CONSUMER1, CONSUMER_ID1, 3, 300L));
 
 210             } catch (InterruptedException e) {
 
 211                 Thread.currentThread().interrupt();
 
 215         Response resp = respQueue.poll(3, TimeUnit.SECONDS);
 
 218         assertEquals(Status.GONE.getStatusCode(), resp.getStatus());
 
 219         assertEquals("[]", resp.getEntity().toString());
 
 223     public void testSweepTopicTaskRun() {
 
 225         prov.processDmaapMessageGet(TOPIC1, CONSUMER1, CONSUMER_ID1, 0, 0);
 
 226         prov.processDmaapMessageGet(TOPIC2, CONSUMER1, CONSUMER_ID1, 0, 0);
 
 228         ArgumentCaptor<Runnable> captor = ArgumentCaptor.forClass(Runnable.class);
 
 229         verify(timer).scheduleWithFixedDelay(captor.capture(), anyLong(), anyLong(), any(TimeUnit.class));
 
 231         captor.getValue().run();
 
 232         verify(data1).removeIdleConsumers();
 
 233         verify(data2).removeIdleConsumers();
 
 236         captor.getValue().run();
 
 237         verify(data1, times(2)).removeIdleConsumers();
 
 238         verify(data2, times(2)).removeIdleConsumers();
 
 242     public void testMakeTimerPool() {
 
 243         // use a real provider so we can test the real makeTimer() method
 
 244         DmaapSimProvider prov2 = new DmaapSimProvider(params);
 
 246         assertThatCode(() -> prov2.stop()).doesNotThrowAnyException();
 
 250     public void testMakeTopicData() {
 
 251         // use a real provider so we can test the real makeTopicData() method
 
 252         DmaapSimProvider prov2 = new DmaapSimProvider(params);
 
 253         assertThatCode(() -> prov2.processDmaapMessageGet(TOPIC1, CONSUMER1, CONSUMER_ID1, 0, 0))
 
 254                         .doesNotThrowAnyException();
 
 258     public void testGetInstance_testSetInstance() {
 
 259         DmaapSimProvider.setInstance(prov);
 
 260         assertSame(prov, DmaapSimProvider.getInstance());
 
 262         DmaapSimProvider.setInstance(null);
 
 263         assertNull(DmaapSimProvider.getInstance());
 
 267     public class MyProvider extends DmaapSimProvider {
 
 269         public MyProvider(DmaapSimParameterGroup params) {
 
 274         protected ScheduledExecutorService makeTimerPool() {
 
 279         protected TopicData makeTopicData(String topicName) {
 
 286                     throw new IllegalArgumentException("unknown topic name: " + topicName);