2  * ============LICENSE_START=======================================================
 
   4  * ================================================================================
 
   5  * Copyright (C) 2018-2021 AT&T Intellectual Property. All rights reserved.
 
   6  * ================================================================================
 
   7  * Licensed under the Apache License, Version 2.0 (the "License");
 
   8  * you may not use this file except in compliance with the License.
 
   9  * You may obtain a copy of the License at
 
  11  *      http://www.apache.org/licenses/LICENSE-2.0
 
  13  * Unless required by applicable law or agreed to in writing, software
 
  14  * distributed under the License is distributed on an "AS IS" BASIS,
 
  15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 
  16  * See the License for the specific language governing permissions and
 
  17  * limitations under the License.
 
  18  * ============LICENSE_END=========================================================
 
  21 package org.onap.policy.common.endpoints.event.comm.bus.internal;
 
  23 import static org.assertj.core.api.Assertions.assertThat;
 
  24 import static org.assertj.core.api.Assertions.assertThatCode;
 
  25 import static org.junit.Assert.assertEquals;
 
  26 import static org.junit.Assert.assertFalse;
 
  27 import static org.junit.Assert.assertNotNull;
 
  28 import static org.junit.Assert.fail;
 
  29 import static org.mockito.Mockito.mock;
 
  30 import static org.mockito.Mockito.when;
 
  32 import com.att.nsa.cambria.client.CambriaConsumer;
 
  33 import java.io.IOException;
 
  34 import java.util.Arrays;
 
  35 import java.util.Collections;
 
  36 import java.util.List;
 
  37 import java.util.Properties;
 
  38 import java.util.concurrent.CountDownLatch;
 
  39 import org.apache.commons.collections4.IteratorUtils;
 
  40 import org.apache.kafka.clients.consumer.ConsumerConfig;
 
  41 import org.apache.kafka.clients.consumer.KafkaConsumer;
 
  42 import org.junit.Before;
 
  43 import org.junit.Test;
 
  44 import org.onap.dmaap.mr.client.impl.MRConsumerImpl;
 
  45 import org.onap.dmaap.mr.client.response.MRConsumerResponse;
 
  46 import org.onap.policy.common.endpoints.event.comm.bus.TopicTestBase;
 
  47 import org.onap.policy.common.endpoints.event.comm.bus.internal.BusConsumer.CambriaConsumerWrapper;
 
  48 import org.onap.policy.common.endpoints.event.comm.bus.internal.BusConsumer.DmaapAafConsumerWrapper;
 
  49 import org.onap.policy.common.endpoints.event.comm.bus.internal.BusConsumer.DmaapConsumerWrapper;
 
  50 import org.onap.policy.common.endpoints.event.comm.bus.internal.BusConsumer.DmaapDmeConsumerWrapper;
 
  51 import org.onap.policy.common.endpoints.event.comm.bus.internal.BusConsumer.FetchingBusConsumer;
 
  52 import org.onap.policy.common.endpoints.event.comm.bus.internal.BusConsumer.KafkaConsumerWrapper;
 
  53 import org.onap.policy.common.endpoints.properties.PolicyEndPointProperties;
 
  54 import org.powermock.reflect.Whitebox;
 
  56 public class BusConsumerTest extends TopicTestBase {
 
  58     private static final int SHORT_TIMEOUT_MILLIS = 10;
 
  59     private static final int LONG_TIMEOUT_MILLIS = 3000;
 
  68     public void testFetchingBusConsumer() throws InterruptedException {
 
  69         // should not be negative
 
  70         var cons = new FetchingBusConsumerImpl(makeBuilder().fetchTimeout(-1).build());
 
  71         assertThat(cons.getSleepTime()).isEqualTo(PolicyEndPointProperties.DEFAULT_TIMEOUT_MS_FETCH);
 
  74         cons = new FetchingBusConsumerImpl(makeBuilder().fetchTimeout(0).build());
 
  75         assertThat(cons.getSleepTime()).isEqualTo(PolicyEndPointProperties.DEFAULT_TIMEOUT_MS_FETCH);
 
  77         // should not be too large
 
  78         cons = new FetchingBusConsumerImpl(
 
  79                         makeBuilder().fetchTimeout(PolicyEndPointProperties.DEFAULT_TIMEOUT_MS_FETCH + 100).build());
 
  80         assertThat(cons.getSleepTime()).isEqualTo(PolicyEndPointProperties.DEFAULT_TIMEOUT_MS_FETCH);
 
  82         // should be what was specified
 
  83         cons = new FetchingBusConsumerImpl(makeBuilder().fetchTimeout(100).build());
 
  84         assertThat(cons.getSleepTime()).isEqualTo(100);
 
  88     public void testFetchingBusConsumerSleepAfterFetchFailure() throws InterruptedException {
 
  90         var cons = new FetchingBusConsumerImpl(makeBuilder().fetchTimeout(SHORT_TIMEOUT_MILLIS).build()) {
 
  92             private CountDownLatch started = new CountDownLatch(1);
 
  95             protected void sleepAfterFetchFailure() {
 
  97                 super.sleepAfterFetchFailure();
 
 102         long tstart = System.currentTimeMillis();
 
 103         cons.sleepAfterFetchFailure();
 
 104         assertThat(System.currentTimeMillis() - tstart).isGreaterThanOrEqualTo(SHORT_TIMEOUT_MILLIS);
 
 106         // close while sleeping - sleep should halt prematurely
 
 107         cons.fetchTimeout = LONG_TIMEOUT_MILLIS;
 
 108         cons.started = new CountDownLatch(1);
 
 109         Thread thread = new Thread(cons::sleepAfterFetchFailure);
 
 110         tstart = System.currentTimeMillis();
 
 112         cons.started.await();
 
 115         assertThat(System.currentTimeMillis() - tstart).isLessThan(LONG_TIMEOUT_MILLIS);
 
 117         // interrupt while sleeping - sleep should halt prematurely
 
 118         cons.fetchTimeout = LONG_TIMEOUT_MILLIS;
 
 119         cons.started = new CountDownLatch(1);
 
 120         thread = new Thread(cons::sleepAfterFetchFailure);
 
 121         tstart = System.currentTimeMillis();
 
 123         cons.started.await();
 
 126         assertThat(System.currentTimeMillis() - tstart).isLessThan(LONG_TIMEOUT_MILLIS);
 
 130     public void testCambriaConsumerWrapper() {
 
 131         // verify that different wrappers can be built
 
 132         new CambriaConsumerWrapper(makeBuilder().build());
 
 133         new CambriaConsumerWrapper(makeBuilder().useHttps(false).build());
 
 134         new CambriaConsumerWrapper(makeBuilder().useHttps(true).build());
 
 135         new CambriaConsumerWrapper(makeBuilder().useHttps(true).allowSelfSignedCerts(false).build());
 
 136         new CambriaConsumerWrapper(makeBuilder().useHttps(true).allowSelfSignedCerts(true).build());
 
 137         new CambriaConsumerWrapper(makeBuilder().apiKey(null).build());
 
 138         new CambriaConsumerWrapper(makeBuilder().apiSecret(null).build());
 
 139         new CambriaConsumerWrapper(makeBuilder().apiKey(null).apiSecret(null).build());
 
 140         new CambriaConsumerWrapper(makeBuilder().userName(null).build());
 
 141         new CambriaConsumerWrapper(makeBuilder().password(null).build());
 
 143         assertThatCode(() -> new CambriaConsumerWrapper(makeBuilder().userName(null).password(null).build()))
 
 144                         .doesNotThrowAnyException();
 
 148     public void testCambriaConsumerWrapperFetch() throws Exception {
 
 149         CambriaConsumer inner = mock(CambriaConsumer.class);
 
 150         List<String> lst = Arrays.asList(MY_MESSAGE, MY_MESSAGE2);
 
 151         when(inner.fetch()).thenReturn(lst);
 
 153         CambriaConsumerWrapper cons = new CambriaConsumerWrapper(builder.build());
 
 154         Whitebox.setInternalState(cons, "consumer", inner);
 
 156         assertEquals(lst, IteratorUtils.toList(cons.fetch().iterator()));
 
 158         // arrange to throw exception next time fetch is called
 
 159         IOException ex = new IOException(EXPECTED);
 
 160         when(inner.fetch()).thenThrow(ex);
 
 162         cons.fetchTimeout = 10;
 
 166             fail("missing exception");
 
 168         } catch (IOException e) {
 
 174     public void testCambriaConsumerWrapperClose() {
 
 175         CambriaConsumerWrapper cons = new CambriaConsumerWrapper(builder.build());
 
 176         assertThatCode(() -> cons.close()).doesNotThrowAnyException();
 
 180     public void testCambriaConsumerWrapperToString() {
 
 181         assertNotNull(new CambriaConsumerWrapper(makeBuilder().build()).toString());
 
 185     public void testDmaapConsumerWrapper() throws Exception {
 
 186         // verify that different wrappers can be built
 
 187         assertThatCode(() -> new DmaapAafConsumerWrapper(makeBuilder().build())).doesNotThrowAnyException();
 
 190     @Test(expected = IllegalArgumentException.class)
 
 191     public void testDmaapConsumerWrapper_InvalidTopic() throws Exception {
 
 192         new DmaapAafConsumerWrapper(makeBuilder().topic(null).build());
 
 196     public void testDmaapConsumerWrapperFetch() throws Exception {
 
 197         DmaapAafConsumerWrapper dmaap = new DmaapAafConsumerWrapper(makeBuilder().build());
 
 198         MRConsumerImpl cons = mock(MRConsumerImpl.class);
 
 200         dmaap.fetchTimeout = 5;
 
 201         dmaap.consumer = cons;
 
 204         when(cons.fetchWithReturnConsumerResponse()).thenReturn(null);
 
 205         assertFalse(dmaap.fetch().iterator().hasNext());
 
 207         // with messages, 200
 
 208         List<String> lst = Arrays.asList(MY_MESSAGE, MY_MESSAGE2);
 
 209         MRConsumerResponse resp = new MRConsumerResponse();
 
 210         resp.setResponseCode("200");
 
 211         resp.setActualMessages(lst);
 
 212         when(cons.fetchWithReturnConsumerResponse()).thenReturn(resp);
 
 214         assertEquals(lst, IteratorUtils.toList(dmaap.fetch().iterator()));
 
 217         resp.setActualMessages(null);
 
 218         when(cons.fetchWithReturnConsumerResponse()).thenReturn(resp);
 
 220         assertFalse(dmaap.fetch().iterator().hasNext());
 
 222         // with messages, NOT 200
 
 223         resp.setResponseCode("400");
 
 224         resp.setActualMessages(lst);
 
 225         when(cons.fetchWithReturnConsumerResponse()).thenReturn(resp);
 
 227         assertEquals(lst, IteratorUtils.toList(dmaap.fetch().iterator()));
 
 231     public void testDmaapConsumerWrapperClose() throws Exception {
 
 232         assertThatCode(() -> new DmaapAafConsumerWrapper(makeBuilder().build()).close()).doesNotThrowAnyException();
 
 236     public void testDmaapConsumerWrapperToString() throws Exception {
 
 237         assertNotNull(new DmaapConsumerWrapper(makeBuilder().build()) {}.toString());
 
 241     public void testDmaapAafConsumerWrapper() throws Exception {
 
 242         // verify that different wrappers can be built
 
 243         new DmaapAafConsumerWrapper(makeBuilder().useHttps(true).build());
 
 244         assertThatCode(() -> new DmaapAafConsumerWrapper(makeBuilder().useHttps(false).build()))
 
 245                         .doesNotThrowAnyException();
 
 248     @Test(expected = IllegalArgumentException.class)
 
 249     public void testDmaapAafConsumerWrapper_InvalidServers() throws Exception {
 
 251          * Unfortunately, the MR code intercepts this and throws an exception before the
 
 252          * wrapper gets a chance to check it, thus this test does not improve the coverage
 
 253          * for the constructor.
 
 255         new DmaapAafConsumerWrapper(makeBuilder().servers(Collections.emptyList()).build());
 
 259     public void testDmaapAafConsumerWrapperToString() throws Exception {
 
 260         assertNotNull(new DmaapAafConsumerWrapper(makeBuilder().build()).toString());
 
 264     public void testDmaapDmeConsumerWrapper() throws Exception {
 
 265         // verify that different wrappers can be built
 
 266         new DmaapDmeConsumerWrapper(makeBuilder().build());
 
 267         new DmaapDmeConsumerWrapper(makeBuilder().useHttps(true).build());
 
 268         new DmaapDmeConsumerWrapper(makeBuilder().useHttps(false).build());
 
 269         new DmaapDmeConsumerWrapper(makeBuilder().additionalProps(null).build());
 
 271         addProps.put(ROUTE_PROP, MY_ROUTE);
 
 272         new DmaapDmeConsumerWrapper(makeBuilder().build());
 
 273         assertThatCode(() -> new DmaapDmeConsumerWrapper(makeBuilder().partner(null).build()))
 
 274                         .doesNotThrowAnyException();
 
 277     @Test(expected = IllegalArgumentException.class)
 
 278     public void testDmaapDmeConsumerWrapper_InvalidEnvironment() throws Exception {
 
 279         new DmaapDmeConsumerWrapper(makeBuilder().environment(null).build());
 
 282     @Test(expected = IllegalArgumentException.class)
 
 283     public void testDmaapDmeConsumerWrapper_InvalidAft() throws Exception {
 
 284         new DmaapDmeConsumerWrapper(makeBuilder().aftEnvironment(null).build());
 
 287     @Test(expected = IllegalArgumentException.class)
 
 288     public void testDmaapDmeConsumerWrapper_InvalidLat() throws Exception {
 
 289         new DmaapDmeConsumerWrapper(makeBuilder().latitude(null).build());
 
 292     @Test(expected = IllegalArgumentException.class)
 
 293     public void testDmaapDmeConsumerWrapper_InvalidLong() throws Exception {
 
 294         new DmaapDmeConsumerWrapper(makeBuilder().longitude(null).build());
 
 297     @Test(expected = IllegalArgumentException.class)
 
 298     public void testDmaapDmeConsumerWrapper_InvalidPartner() throws Exception {
 
 299         new DmaapDmeConsumerWrapper(makeBuilder().partner(null).build());
 
 303     public void testKafkaConsumerWrapper() throws Exception {
 
 304         // verify that different wrappers can be built
 
 305         assertThatCode(() -> new KafkaConsumerWrapper(makeKafkaBuilder().build())).doesNotThrowAnyException();
 
 308     @Test(expected = IllegalArgumentException.class)
 
 309     public void testKafkaConsumerWrapper_InvalidTopic() throws Exception {
 
 310         new KafkaConsumerWrapper(makeBuilder().topic(null).build());
 
 313     @Test(expected = java.lang.IllegalStateException.class)
 
 314     public void testKafkaConsumerWrapperFetch() throws Exception {
 
 316         //Setup Properties for consumer
 
 317         Properties kafkaProps = new Properties();
 
 318         kafkaProps.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
 
 319         kafkaProps.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "test");
 
 320         kafkaProps.setProperty("enable.auto.commit", "true");
 
 321         kafkaProps.setProperty("auto.commit.interval.ms", "1000");
 
 322         kafkaProps.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
 
 323             "org.apache.kafka.common.serialization.StringDeserializer");
 
 324         kafkaProps.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
 
 325             "org.apache.kafka.common.serialization.StringDeserializer");
 
 326         kafkaProps.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
 
 327         kafkaProps.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
 
 329         KafkaConsumerWrapper kafka = new KafkaConsumerWrapper(makeKafkaBuilder().build());
 
 330         KafkaConsumer<String, String> consumer = new KafkaConsumer<>(kafkaProps);
 
 331         kafka.consumer = consumer;
 
 333         assertFalse(kafka.fetch().iterator().hasNext());
 
 338     public void testKafkaConsumerWrapperClose() throws Exception {
 
 339         assertThatCode(() -> new KafkaConsumerWrapper(makeKafkaBuilder().build()).close()).doesNotThrowAnyException();
 
 343     public void testKafkaConsumerWrapperToString() throws Exception {
 
 344         assertNotNull(new KafkaConsumerWrapper(makeKafkaBuilder().build()) {}.toString());
 
 347     private static class FetchingBusConsumerImpl extends FetchingBusConsumer {
 
 349         protected FetchingBusConsumerImpl(BusTopicParams busTopicParams) {
 
 350             super(busTopicParams);
 
 354         public Iterable<String> fetch() throws IOException {