2 * ============LICENSE_START=======================================================
3 * org.onap.dcaegen2.collectors.ves
4 * ================================================================================
5 * Copyright (C) 2018 Nokia. All rights reserved.
6 * ================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 * ============LICENSE_END=========================================================
20 package org.onap.dcae.common.publishing;
22 import static io.vavr.API.List;
23 import static io.vavr.API.Map;
24 import static org.junit.Assert.assertSame;
25 import static org.junit.Assert.assertTrue;
26 import static org.mockito.Mockito.mock;
27 import static org.mockito.Mockito.verify;
28 import static org.mockito.Mockito.verifyZeroInteractions;
29 import static org.mockito.Mockito.when;
31 import com.att.nsa.cambria.client.CambriaBatchingPublisher;
32 import io.vavr.collection.Map;
33 import io.vavr.control.Option;
34 import java.io.IOException;
35 import java.util.concurrent.TimeUnit;
36 import org.junit.Before;
37 import org.junit.Test;
38 import org.onap.dcae.common.publishing.DMaaPPublishersCache.CambriaPublishersCacheLoader;
39 import org.onap.dcae.common.publishing.DMaaPPublishersCache.OnPublisherRemovalListener;
/**
 * Unit tests for {@link DMaaPPublishersCache}. The tests in this class cover
 * reuse of a cached publisher across lookups, closing a publisher on request,
 * lookups for a stream id that has no configuration, and reconfiguration.
 */
42 public class DMaaPPublishersCacheTest {
// Stream id used as the lookup key by the tests below.
44 private String streamId1;
// vavr Map of stream id -> PublisherConfig; passed to the DMaaPPublishersCache constructor in the tests.
45 private Map<String, PublisherConfig> dMaaPConfigs;
// Fixture values: a single entry keyed "sampleStream1" (the same value as streamId1),
// whose PublisherConfig is built from List("destination1") and "topic1".
49 streamId1 = "sampleStream1";
50 dMaaPConfigs = Map("sampleStream1", new PublisherConfig(List("destination1"), "topic1"));
// Checks that two getPublisher calls with the same stream id hand back the
// very same CambriaBatchingPublisher object (reference identity, via assertSame).
54 public void shouldReturnTheSameCachedInstanceOnConsecutiveRetrievals() {
// Cache under test, built directly from the fixture configuration.
56 DMaaPPublishersCache dMaaPPublishersCache = new DMaaPPublishersCache(dMaaPConfigs);
// Look the same stream up twice.
59 Option<CambriaBatchingPublisher> firstPublisher = dMaaPPublishersCache.getPublisher(streamId1);
60 Option<CambriaBatchingPublisher> secondPublisher = dMaaPPublishersCache.getPublisher(streamId1);
// Both Options are unwrapped with get(); the contained instances must be identical.
63 assertSame("should return same instance", firstPublisher.get(), secondPublisher.get());
// Checks that closePublisherFor(streamId1) results in close(20, TimeUnit.SECONDS)
// being called on the publisher that the cache loader supplied for that stream.
// The cache is built around a mocked CambriaPublishersCacheLoader, stubbed to
// return the mocked publisher for streamId1, and a real OnPublisherRemovalListener.
67 public void shouldCloseCambriaPublisherOnCacheInvalidate() throws IOException, InterruptedException {
// Mockito stand-ins for the publisher and for the loader that produces it.
69 CambriaBatchingPublisher cambriaPublisherMock1 = mock(CambriaBatchingPublisher.class);
70 CambriaPublishersCacheLoader cacheLoaderMock = mock(CambriaPublishersCacheLoader.class);
71 DMaaPPublishersCache dMaaPPublishersCache = new DMaaPPublishersCache(cacheLoaderMock,
72 new OnPublisherRemovalListener(),
74 when(cacheLoaderMock.load(streamId1)).thenReturn(cambriaPublisherMock1);
// Request the publisher first, then ask the cache to close it.
77 dMaaPPublishersCache.getPublisher(streamId1);
78 dMaaPPublishersCache.closePublisherFor(streamId1);
// The mocked publisher must have been closed with exactly these arguments.
81 verify(cambriaPublisherMock1).close(20, TimeUnit.SECONDS);
// Checks that getPublisher returns an empty Option for a stream id
// ("non-existing") that is not a key of the configuration given to the cache.
86 public void shouldReturnNoneIfThereIsNoDMaaPConfigurationForGivenStreamID() {
// Cache under test, built directly from the fixture configuration.
88 DMaaPPublishersCache dMaaPPublishersCache = new DMaaPPublishersCache(dMaaPConfigs);
// isEmpty() on the returned Option must be true for the unknown id.
91 assertTrue("should not exist", dMaaPPublishersCache.getPublisher("non-existing").isEmpty());
// Checks that reconfigure(newConfig) closes only the publisher whose
// configuration differs: close(20, TimeUnit.SECONDS) is verified on the second
// mocked publisher, and the first mocked publisher must see no interactions at all.
// The "domain1" entry is built from the same arguments in oldConfig and newConfig.
// The other PublisherConfig in oldConfig appears to take additional constructor
// arguments after "topic2", whereas newConfig's "domain2" entry stops at the topic
// - presumably that difference is what marks "domain2" as changed.
96 public void shouldCloseOnlyChangedPublishers() throws IOException, InterruptedException {
// One mocked publisher per domain, plus a mocked loader that will hand them out.
98 CambriaBatchingPublisher cambriaPublisherMock1 = mock(CambriaBatchingPublisher.class);
99 CambriaBatchingPublisher cambriaPublisherMock2 = mock(CambriaBatchingPublisher.class);
100 CambriaPublishersCacheLoader cacheLoaderMock = mock(CambriaPublishersCacheLoader.class);
101 String firstDomain = "domain1";
102 String secondDomain = "domain2";
103 Map<String, PublisherConfig> oldConfig = Map(firstDomain,
104 new PublisherConfig(List("destination1"), "topic1"),
106 new PublisherConfig(List("destination2"), "topic2",
108 Map<String, PublisherConfig> newConfig = Map(firstDomain, new PublisherConfig(List("destination1"), "topic1"),
109 secondDomain, new PublisherConfig(List("destination2"), "topic2"));
110 DMaaPPublishersCache dMaaPPublishersCache = new DMaaPPublishersCache(cacheLoaderMock,
111 new OnPublisherRemovalListener(),
113 when(cacheLoaderMock.load(firstDomain)).thenReturn(cambriaPublisherMock1);
114 when(cacheLoaderMock.load(secondDomain)).thenReturn(cambriaPublisherMock2);
// Request both publishers before reconfiguring.
116 dMaaPPublishersCache.getPublisher(firstDomain);
117 dMaaPPublishersCache.getPublisher(secondDomain);
// Apply the new configuration.
120 dMaaPPublishersCache.reconfigure(newConfig);
// Only the second publisher may have been closed; the first must be untouched.
123 verify(cambriaPublisherMock2).close(20, TimeUnit.SECONDS);
124 verifyZeroInteractions(cambriaPublisherMock1);