2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright (C) 2019 Nokia. All rights reserved.
6 * ================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 * ============LICENSE_END=========================================================
21 package org.onap.netconfsimulator.websocket;
23 import static org.assertj.core.api.Assertions.assertThat;
24 import static org.mockito.ArgumentMatchers.any;
25 import static org.mockito.ArgumentMatchers.eq;
26 import static org.mockito.Mockito.mock;
27 import static org.mockito.Mockito.spy;
28 import static org.mockito.Mockito.verify;
29 import static org.mockito.Mockito.when;
30 import static org.mockito.MockitoAnnotations.initMocks;
33 import java.util.Optional;
34 import javax.websocket.CloseReason;
35 import javax.websocket.EndpointConfig;
36 import javax.websocket.RemoteEndpoint;
37 import javax.websocket.Session;
38 import org.apache.kafka.common.Metric;
39 import org.apache.kafka.common.MetricName;
40 import org.junit.jupiter.api.BeforeEach;
41 import org.junit.jupiter.api.Test;
42 import org.mockito.Mock;
43 import org.onap.netconfsimulator.kafka.listener.KafkaListenerEntry;
44 import org.onap.netconfsimulator.kafka.listener.KafkaListenerHandler;
45 import org.onap.netconfsimulator.websocket.message.NetconfMessageListener;
46 import org.springframework.kafka.core.ConsumerFactory;
47 import org.springframework.kafka.listener.AbstractMessageListenerContainer;
49 import org.springframework.kafka.listener.ContainerProperties;
50 import org.springframework.kafka.listener.GenericMessageListener;
52 class NetconfEndpointTest {
// Handler passed to the NetconfEndpoint constructor; its createKafkaListener call is stubbed in the tests below.
56 private KafkaListenerHandler kafkaListenerHandler;
// Websocket session handed to onOpen/onClose; getBasicRemote() is stubbed to return remoteEndpoint.
59 private Session session;
// Endpoint configuration handed to onOpen.
62 private EndpointConfig endpointConfig;
// Remote endpoint returned from the stubbed session.getBasicRemote().
65 private RemoteEndpoint.Basic remoteEndpoint;
75 void shouldCreateKafkaListenerWhenClientInitializeConnection() {
76 NetconfEndpoint netconfEndpoint = new NetconfEndpoint(kafkaListenerHandler);
77 AbstractMessageListenerContainer abstractMessageListenerContainer = getListenerContainer();
78 when(session.getBasicRemote()).thenReturn(remoteEndpoint);
79 KafkaListenerEntry kafkaListenerEntry = new KafkaListenerEntry("sampleGroupId",
80 abstractMessageListenerContainer);
81 when(kafkaListenerHandler.createKafkaListener(any(NetconfMessageListener.class), eq("config")))
82 .thenReturn(kafkaListenerEntry);
84 netconfEndpoint.onOpen(session, endpointConfig);
86 assertThat(netconfEndpoint.getEntry().get().getClientId()).isEqualTo("sampleGroupId");
87 assertThat(netconfEndpoint.getEntry().get().getListenerContainer()).isEqualTo(abstractMessageListenerContainer);
89 verify(abstractMessageListenerContainer).start();
94 void shouldCloseListenerWhenClientDisconnects() {
95 NetconfEndpoint netconfEndpoint = new NetconfEndpoint(kafkaListenerHandler);
96 AbstractMessageListenerContainer abstractMessageListenerContainer = getListenerContainer();
97 netconfEndpoint.setEntry( Optional.of(new KafkaListenerEntry("sampleGroupId", abstractMessageListenerContainer)) );
99 netconfEndpoint.onClose(session, mock(CloseReason.class));
101 verify(abstractMessageListenerContainer).stop();
// Concrete AbstractMessageListenerContainer subclass used by these tests; getListenerContainer() wraps an instance in a spy.
104 class TestAbstractMessageListenerContainer extends AbstractMessageListenerContainer {
// Passes a mocked ConsumerFactory together with the supplied properties to the superclass constructor.
107 TestAbstractMessageListenerContainer(ContainerProperties containerProperties) {
108 super(mock(ConsumerFactory.class),containerProperties);
// NOTE(review): the bodies of the three methods below are not visible in this view — confirm their behavior before relying on it.
112 protected void doStart() {
117 protected void doStop(Runnable callback) {
122 public Map<String, Map<MetricName, ? extends Metric>> metrics() {
127 private AbstractMessageListenerContainer getListenerContainer() {
128 ContainerProperties containerProperties = new ContainerProperties("config");
129 containerProperties.setGroupId("sample");
130 containerProperties.setMessageListener(mock(GenericMessageListener.class));
131 TestAbstractMessageListenerContainer testAbstractMessageListenerContainer = new TestAbstractMessageListenerContainer(
132 containerProperties);
133 return spy(testAbstractMessageListenerContainer);