Release 1.9.2 DCAEGEN2 VESCollector container
[dcaegen2/collectors/ves.git] / src / test / java / org / onap / dcae / common / publishing / DMaaPPublishersCacheTest.java
1 /*-
2  * ============LICENSE_START=======================================================
3  * org.onap.dcaegen2.collectors.ves
4  * ================================================================================
5  * Copyright (C) 2018 Nokia. All rights reserved.
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=========================================================
19  */
20 package org.onap.dcae.common.publishing;
21
22 import static io.vavr.API.List;
23 import static io.vavr.API.Map;
24 import static org.junit.Assert.assertSame;
25 import static org.junit.Assert.assertTrue;
26 import static org.mockito.Mockito.mock;
27 import static org.mockito.Mockito.verify;
28 import static org.mockito.Mockito.verifyZeroInteractions;
29 import static org.mockito.Mockito.when;
30
31 import com.att.nsa.cambria.client.CambriaBatchingPublisher;
32 import io.vavr.collection.Map;
33 import io.vavr.control.Option;
34 import java.io.IOException;
35 import java.util.concurrent.TimeUnit;
36 import org.junit.Before;
37 import org.junit.Test;
38 import org.onap.dcae.common.publishing.DMaaPPublishersCache.CambriaPublishersCacheLoader;
39 import org.onap.dcae.common.publishing.DMaaPPublishersCache.OnPublisherRemovalListener;
40
41
42 public class DMaaPPublishersCacheTest {
43
44     private String streamId1;
45     private Map<String, PublisherConfig> dMaaPConfigs;
46
47     @Before
48     public void setUp() {
49         streamId1 = "sampleStream1";
50         dMaaPConfigs = Map("sampleStream1", new PublisherConfig(List("destination1"), "topic1"));
51     }
52
53     @Test
54     public void shouldReturnTheSameCachedInstanceOnConsecutiveRetrievals() {
55         // given
56         DMaaPPublishersCache dMaaPPublishersCache = new DMaaPPublishersCache(dMaaPConfigs);
57
58         // when
59         Option<CambriaBatchingPublisher> firstPublisher = dMaaPPublishersCache.getPublisher(streamId1);
60         Option<CambriaBatchingPublisher> secondPublisher = dMaaPPublishersCache.getPublisher(streamId1);
61
62         // then
63         assertSame("should return same instance", firstPublisher.get(), secondPublisher.get());
64     }
65
66     @Test
67     public void shouldCloseCambriaPublisherOnCacheInvalidate() throws IOException, InterruptedException {
68         // given
69         CambriaBatchingPublisher cambriaPublisherMock1 = mock(CambriaBatchingPublisher.class);
70         CambriaPublishersCacheLoader cacheLoaderMock = mock(CambriaPublishersCacheLoader.class);
71         DMaaPPublishersCache dMaaPPublishersCache = new DMaaPPublishersCache(cacheLoaderMock,
72                                                                              new OnPublisherRemovalListener(),
73                                                                              dMaaPConfigs);
74         when(cacheLoaderMock.load(streamId1)).thenReturn(cambriaPublisherMock1);
75
76         // when
77         dMaaPPublishersCache.getPublisher(streamId1);
78         dMaaPPublishersCache.closePublisherFor(streamId1);
79
80         // then
81         verify(cambriaPublisherMock1).close(20, TimeUnit.SECONDS);
82
83     }
84
85     @Test
86     public void shouldReturnNoneIfThereIsNoDMaaPConfigurationForGivenStreamID() {
87         // given
88         DMaaPPublishersCache dMaaPPublishersCache = new DMaaPPublishersCache(dMaaPConfigs);
89
90         // then
91         assertTrue("should not exist", dMaaPPublishersCache.getPublisher("non-existing").isEmpty());
92     }
93
94
95     @Test
96     public void shouldCloseOnlyChangedPublishers() throws IOException, InterruptedException {
97         // given
98         CambriaBatchingPublisher cambriaPublisherMock1 = mock(CambriaBatchingPublisher.class);
99         CambriaBatchingPublisher cambriaPublisherMock2 = mock(CambriaBatchingPublisher.class);
100         CambriaPublishersCacheLoader cacheLoaderMock = mock(CambriaPublishersCacheLoader.class);
101         String firstDomain = "domain1";
102         String secondDomain = "domain2";
103         Map<String, PublisherConfig> oldConfig = Map(firstDomain,
104                                                      new PublisherConfig(List("destination1"), "topic1"),
105                                                      secondDomain,
106                                                      new PublisherConfig(List("destination2"), "topic2",
107                                                                          "user", "pass"));
108         Map<String, PublisherConfig> newConfig = Map(firstDomain, new PublisherConfig(List("destination1"), "topic1"),
109                                                      secondDomain, new PublisherConfig(List("destination2"), "topic2"));
110         DMaaPPublishersCache dMaaPPublishersCache = new DMaaPPublishersCache(cacheLoaderMock,
111                                                                              new OnPublisherRemovalListener(),
112                                                                              oldConfig);
113         when(cacheLoaderMock.load(firstDomain)).thenReturn(cambriaPublisherMock1);
114         when(cacheLoaderMock.load(secondDomain)).thenReturn(cambriaPublisherMock2);
115
116         dMaaPPublishersCache.getPublisher(firstDomain);
117         dMaaPPublishersCache.getPublisher(secondDomain);
118
119         // when
120         dMaaPPublishersCache.reconfigure(newConfig);
121
122         // then
123         verify(cambriaPublisherMock2).close(20, TimeUnit.SECONDS);
124         verifyZeroInteractions(cambriaPublisherMock1);
125     }
126 }