c1484d4b203ac9a762fcab83b1b0712b3e493fac
[integration.git] /
1 /*-
2  * ============LICENSE_START=======================================================
3  * Simulator
4  * ================================================================================
5  * Copyright (C) 2019 Nokia. All rights reserved.
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  * 
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  * 
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=========================================================
19  */
20
21 package org.onap.netconfsimulator.websocket;
22
23 import static org.assertj.core.api.Assertions.assertThat;
24 import static org.mockito.ArgumentMatchers.any;
25 import static org.mockito.ArgumentMatchers.eq;
26 import static org.mockito.Mockito.mock;
27 import static org.mockito.Mockito.spy;
28 import static org.mockito.Mockito.verify;
29 import static org.mockito.Mockito.when;
30 import static org.mockito.MockitoAnnotations.initMocks;
31
32 import java.util.Map;
33 import java.util.Optional;
34 import javax.websocket.CloseReason;
35 import javax.websocket.EndpointConfig;
36 import javax.websocket.RemoteEndpoint;
37 import javax.websocket.Session;
38 import org.apache.kafka.common.Metric;
39 import org.apache.kafka.common.MetricName;
40 import org.junit.jupiter.api.BeforeEach;
41 import org.junit.jupiter.api.Test;
42 import org.mockito.Mock;
43 import org.onap.netconfsimulator.kafka.listener.KafkaListenerEntry;
44 import org.onap.netconfsimulator.kafka.listener.KafkaListenerHandler;
45 import org.onap.netconfsimulator.websocket.message.NetconfMessageListener;
46 import org.springframework.kafka.core.ConsumerFactory;
47 import org.springframework.kafka.listener.AbstractMessageListenerContainer;
48
49 import org.springframework.kafka.listener.ContainerProperties;
50 import org.springframework.kafka.listener.GenericMessageListener;
51
52 class NetconfEndpointTest {
53
54
55     @Mock
56     private KafkaListenerHandler kafkaListenerHandler;
57
58     @Mock
59     private Session session;
60
61     @Mock
62     private EndpointConfig endpointConfig;
63
64     @Mock
65     private RemoteEndpoint.Basic remoteEndpoint;
66
67
68     @BeforeEach
69     void setUp() {
70         initMocks(this);
71     }
72
73
74     @Test
75     void shouldCreateKafkaListenerWhenClientInitializeConnection() {
76         NetconfEndpoint netconfEndpoint = new NetconfEndpoint(kafkaListenerHandler);
77         AbstractMessageListenerContainer abstractMessageListenerContainer = getListenerContainer();
78         when(session.getBasicRemote()).thenReturn(remoteEndpoint);
79         KafkaListenerEntry kafkaListenerEntry = new KafkaListenerEntry("sampleGroupId",
80             abstractMessageListenerContainer);
81         when(kafkaListenerHandler.createKafkaListener(any(NetconfMessageListener.class), eq("config")))
82             .thenReturn(kafkaListenerEntry);
83
84         netconfEndpoint.onOpen(session, endpointConfig);
85
86         assertThat(netconfEndpoint.getEntry().get().getClientId()).isEqualTo("sampleGroupId");
87         assertThat(netconfEndpoint.getEntry().get().getListenerContainer()).isEqualTo(abstractMessageListenerContainer);
88
89         verify(abstractMessageListenerContainer).start();
90     }
91
92
93     @Test
94     void shouldCloseListenerWhenClientDisconnects() {
95         NetconfEndpoint netconfEndpoint = new NetconfEndpoint(kafkaListenerHandler);
96         AbstractMessageListenerContainer abstractMessageListenerContainer = getListenerContainer();
97         netconfEndpoint.setEntry( Optional.of(new KafkaListenerEntry("sampleGroupId", abstractMessageListenerContainer)) );
98
99         netconfEndpoint.onClose(session, mock(CloseReason.class));
100
101         verify(abstractMessageListenerContainer).stop();
102     }
103
104     class TestAbstractMessageListenerContainer extends AbstractMessageListenerContainer {
105
106
107         TestAbstractMessageListenerContainer(ContainerProperties containerProperties) {
108             super(mock(ConsumerFactory.class),containerProperties);
109         }
110
111         @Override
112         protected void doStart() {
113
114         }
115
116         @Override
117         protected void doStop(Runnable callback) {
118
119         }
120
121         @Override
122         public Map<String, Map<MetricName, ? extends Metric>> metrics() {
123             return null;
124         }
125     }
126
127     private AbstractMessageListenerContainer getListenerContainer() {
128         ContainerProperties containerProperties = new ContainerProperties("config");
129         containerProperties.setGroupId("sample");
130         containerProperties.setMessageListener(mock(GenericMessageListener.class));
131         TestAbstractMessageListenerContainer testAbstractMessageListenerContainer = new TestAbstractMessageListenerContainer(
132             containerProperties);
133         return spy(testAbstractMessageListenerContainer);
134     }
135 }