2 * Copyright © 2019 Bell Canada
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
17 package org.onap.ccsdk.cds.blueprintsprocessor.functions.netconf.executor.core
19 import io.mockk.CapturingSlot
25 import io.mockk.verify
26 import org.apache.sshd.client.SshClient
27 import org.apache.sshd.client.channel.ChannelSubsystem
28 import org.apache.sshd.client.channel.ClientChannel
29 import org.apache.sshd.client.future.DefaultAuthFuture
30 import org.apache.sshd.client.future.DefaultConnectFuture
31 import org.apache.sshd.client.future.DefaultOpenFuture
32 import org.apache.sshd.client.session.ClientSession
33 import org.apache.sshd.common.FactoryManager
34 import org.junit.Before
36 import org.onap.ccsdk.cds.blueprintsprocessor.functions.netconf.executor.api.DeviceInfo
37 import org.onap.ccsdk.cds.blueprintsprocessor.functions.netconf.executor.api.DeviceResponse
38 import org.onap.ccsdk.cds.blueprintsprocessor.functions.netconf.executor.api.NetconfException
39 import org.onap.ccsdk.cds.blueprintsprocessor.functions.netconf.executor.api.NetconfRpcService
40 import org.onap.ccsdk.cds.blueprintsprocessor.functions.netconf.executor.utils.NetconfMessageUtils
41 import org.onap.ccsdk.cds.blueprintsprocessor.functions.netconf.executor.utils.RpcStatus
42 import java.io.ByteArrayInputStream
43 import java.io.ByteArrayOutputStream
44 import java.io.IOException
45 import java.io.InputStream
46 import java.nio.charset.StandardCharsets
47 import java.util.concurrent.CompletableFuture
48 import java.util.concurrent.ExecutionException
49 import java.util.concurrent.TimeoutException
50 import kotlin.test.assertEquals
51 import kotlin.test.assertFailsWith
52 import kotlin.test.assertTrue
54 class NetconfSessionImplTest {
56 val SUCCESSFUL_DEVICE_RESPONSE = DeviceResponse().apply {
57 status = RpcStatus.SUCCESS
62 val FAILED_DEVICE_RESPONSE = DeviceResponse().apply {
63 status = RpcStatus.FAILURE
68 val deviceInfo: DeviceInfo = DeviceInfo().apply {
71 ipAddress = "localhost"
75 private const val someString = "Some string"
78 private lateinit var netconfSession: NetconfSessionImpl
79 private lateinit var netconfCommunicator: NetconfDeviceCommunicator
80 private lateinit var rpcService: NetconfRpcService
81 private lateinit var mockSshClient: SshClient
82 private lateinit var mockClientSession: ClientSession
83 private lateinit var mockClientChannel: ClientChannel
84 private lateinit var mockSubsystem: ChannelSubsystem
86 private val futureMsg = "blahblahblah"
87 private val request = "0"
88 private val sessionId = "0"
89 private val messageId = "asdfasdfadf"
90 private val deviceCapabilities = setOf("capability1", "capability2")
91 private val formattedRequest = NetconfMessageUtils.formatRPCRequest(request, messageId, deviceCapabilities)
92 private lateinit var sampleInputStream: InputStream
93 private lateinit var sampleOutputStream: ByteArrayOutputStream
97 netconfCommunicator = mockk()
99 netconfSession = NetconfSessionImpl(deviceInfo, rpcService)
100 netconfSession.setStreamHandler(netconfCommunicator)
101 mockSshClient = mockk()
102 mockClientSession = mockk()
103 mockClientChannel = mockk()
104 mockSubsystem = mockk()
105 sampleInputStream = ByteArrayInputStream(someString.toByteArray(StandardCharsets.UTF_8))
106 sampleOutputStream = ByteArrayOutputStream()
110 fun `connect calls appropriate methods`() {
111 val session = spyk(netconfSession, recordPrivateCalls = true)
112 every { session["startClient"]() as Unit } just Runs
114 verify { session["startClient"]() }
117 // look for NetconfException being thrown when cannot connect
119 fun `connect throws NetconfException on error`() {
120 val errMsg = "$deviceInfo: Failed to establish SSH session"
121 assertFailsWith(exceptionClass = NetconfException::class, message = errMsg) {
122 val netconfSessionSpy = spyk(netconfSession, recordPrivateCalls = true)
123 every { netconfSessionSpy["startClient"]() as Unit } throws NetconfException(errMsg)
124 netconfSessionSpy.connect()
129 fun `disconnect without force option for rpcService succeeds`() {
130 // rpcService.closeSession succeeds with status not RpcStatus.FAILURE
131 every { rpcService.closeSession(false) } returns SUCCESSFUL_DEVICE_RESPONSE
132 every { mockClientSession.close() } just Runs
133 every { mockSshClient.close() } just Runs
134 every { mockClientChannel.close() } just Runs
135 val netconfSessionSpy = spyk(netconfSession, recordPrivateCalls = true)
136 netconfSessionSpy.setSession(mockClientSession)
137 netconfSessionSpy.setClient(mockSshClient)
138 netconfSessionSpy.setChannel(mockClientChannel)
140 netconfSessionSpy.disconnect()
141 // make sure that rpcService.close session is not called again.
142 verify(exactly = 0) { rpcService.closeSession(true) }
143 verify { mockClientSession.close() }
144 verify { mockSshClient.close() }
145 verify { mockClientChannel.close() }
149 fun `disconnect with force option for rpcService succeeds`() {
150 // rpcService.closeSession succeeds with status not RpcStatus.FAILURE
151 val netconfSessionSpy = spyk(netconfSession, recordPrivateCalls = true)
152 every { rpcService.closeSession(any()) } returns
153 FAILED_DEVICE_RESPONSE andThen SUCCESSFUL_DEVICE_RESPONSE
154 every { mockClientSession.close() } just Runs
155 every { mockSshClient.close() } just Runs
156 every { mockClientChannel.close() } just Runs
157 netconfSessionSpy.setSession(mockClientSession)
158 netconfSessionSpy.setClient(mockSshClient)
159 netconfSessionSpy.setChannel(mockClientChannel)
161 netconfSessionSpy.disconnect()
163 verify(exactly = 2) { rpcService.closeSession(any()) }
164 verify { mockClientSession.close() }
165 verify { mockSshClient.close() }
166 verify { mockClientChannel.close() }
170 fun `disconnect wraps exception from ssh closing error`() {
171 val netconfSessionSpy = spyk(netconfSession, recordPrivateCalls = true)
172 every { netconfSessionSpy["close"]() as Unit } throws IOException("Some IOException occurred!")
173 every { rpcService.closeSession(false) } returns SUCCESSFUL_DEVICE_RESPONSE
174 every { netconfSessionSpy.checkAndReestablish() } just Runs
175 netconfSessionSpy.disconnect()
176 verify { netconfSessionSpy["close"]() }
180 fun `reconnect calls disconnect and connect`() {
181 val netconfSessionSpy = spyk(netconfSession, recordPrivateCalls = true)
182 every { netconfSessionSpy.disconnect() } just Runs
183 every { netconfSessionSpy.connect() } just Runs
184 netconfSessionSpy.reconnect()
185 verify { netconfSessionSpy.disconnect() }
186 verify { netconfSessionSpy.connect() }
190 fun `checkAndReestablish restarts connection and clears replies on sshClient disconnection`() {
191 val netconfSessionSpy = spyk(netconfSession, recordPrivateCalls = true)
192 every { mockSshClient.isClosed } returns true
193 netconfSessionSpy.setClient(mockSshClient)
194 every { netconfSessionSpy["startConnection"]() as Unit } just Runs
196 netconfSessionSpy.checkAndReestablish()
198 verify { netconfSessionSpy.clearReplies() }
199 verify { netconfSessionSpy["startConnection"]() }
203 fun `checkAndReestablish restarts session and clears replies on clientSession closing`() {
204 val netconfSessionSpy = spyk(netconfSession, recordPrivateCalls = true)
205 every { mockClientSession.isClosed } returns true
206 every { mockSshClient.isClosed } returns false
207 every { netconfSessionSpy["startSession"]() as Unit } just Runs
208 netconfSessionSpy.setClient(mockSshClient)
209 netconfSessionSpy.setSession(mockClientSession)
211 netconfSessionSpy.checkAndReestablish()
213 verify { netconfSessionSpy.clearReplies() }
214 verify { netconfSessionSpy["startSession"]() }
218 fun `checkAndReestablish reopens channel and clears replies on channel closing`() {
219 val netconfSessionSpy = spyk(netconfSession, recordPrivateCalls = true)
220 every { mockClientSession.isClosed } returns false
221 every { mockSshClient.isClosed } returns false
222 every { mockClientChannel.isClosed } returns true
223 every { netconfSessionSpy["openChannel"]() as Unit } just Runs
224 netconfSessionSpy.setClient(mockSshClient)
225 netconfSessionSpy.setSession(mockClientSession)
226 netconfSessionSpy.setChannel(mockClientChannel)
228 netconfSessionSpy.checkAndReestablish()
230 verify { netconfSessionSpy.clearReplies() }
231 verify { netconfSessionSpy["openChannel"]() }
235 fun `syncRpc runs normally`() {
236 val netconfSessionSpy = spyk(netconfSession)
237 val futureRet: CompletableFuture<String> = CompletableFuture.completedFuture(futureMsg)
239 // test the case where SSH connection did not need to be re-established.
240 // put an existing item into the replies
241 netconfSessionSpy.getReplies()["somekey"] = CompletableFuture.completedFuture("${futureMsg}2")
242 every { netconfCommunicator.sendMessage(any(), any()) } returns futureRet
243 every { netconfCommunicator.getFutureFromSendMessage(any(), any(), any()) } returns futureRet.get()
244 every { netconfSessionSpy.checkAndReestablish() } just Runs
246 assertEquals(futureMsg, netconfSessionSpy.syncRpc("0", "0"))
247 // make sure the replies didn't change
249 netconfSessionSpy.getReplies().size == 1 &&
250 netconfSessionSpy.getReplies().containsKey("somekey")
252 verify(exactly = 0) { netconfSessionSpy.clearReplies() }
256 fun `syncRpc still succeeds and replies are cleared on client disconnect`() {
257 val netconfSessionSpy = spyk(netconfSession, recordPrivateCalls = true)
258 val futureRet: CompletableFuture<String> = CompletableFuture.completedFuture(futureMsg)
260 // put an item into the replies
261 netconfSessionSpy.getReplies()["somekey"] = CompletableFuture.completedFuture("${futureMsg}2")
263 // tests the case where SSH session needs to be re-established.
264 every { netconfCommunicator.sendMessage(any(), any()) } returns futureRet
265 every { netconfSessionSpy["startClient"]() as Unit } just Runs
266 every { netconfCommunicator.getFutureFromSendMessage(any(), any(), any()) } returns futureRet.get()
267 every { mockSshClient.isClosed } returns true
268 netconfSessionSpy.setClient(mockSshClient)
271 assertEquals(futureMsg, netconfSessionSpy.syncRpc("0", "0"))
272 // make sure the replies got cleared out
273 assertTrue { netconfSessionSpy.getReplies().isEmpty() }
274 verify(exactly = 1) { netconfSessionSpy.clearReplies() }
277 // Test for handling CompletableFuture.get returns InterruptedException inside NetconfDeviceCommunicator
279 fun `syncRpc throws NetconfException if InterruptedException is caught`() {
280 val expectedExceptionMsg = "$deviceInfo: Interrupted while waiting for reply for request: $formattedRequest"
281 assertFailsWith(exceptionClass = NetconfException::class, message = expectedExceptionMsg) {
282 val netconfSessionSpy = spyk(netconfSession, recordPrivateCalls = true)
283 val futureRet: CompletableFuture<String> = CompletableFuture.completedFuture(futureMsg)
284 every { netconfCommunicator.sendMessage(any(), any()) } returns futureRet
285 every { netconfCommunicator.getFutureFromSendMessage(any(), any(), any()) } throws InterruptedException("interrupted")
286 every { netconfSessionSpy.checkAndReestablish() } just Runs
288 netconfSessionSpy.syncRpc("0", "0")
293 fun `syncRpc throws NetconfException if TimeoutException is caught`() {
294 val expectedExceptionMsg =
295 "$deviceInfo: Timed out while waiting for reply for request $formattedRequest after ${deviceInfo.replyTimeout} sec."
296 assertFailsWith(exceptionClass = NetconfException::class, message = expectedExceptionMsg) {
297 val netconfSessionSpy = spyk(netconfSession)
298 val futureRet: CompletableFuture<String> = CompletableFuture.completedFuture(futureMsg)
299 every { netconfCommunicator.sendMessage(any(), any()) } returns futureRet
300 every { netconfCommunicator.getFutureFromSendMessage(any(), any(), any()) } throws TimeoutException("timed out")
301 every { netconfSessionSpy.checkAndReestablish() } just Runs
303 netconfSessionSpy.syncRpc("0", "0")
308 fun `syncRpc throws NetconfException if ExecutionException is caught`() {
309 val expectedExceptionMsg = "$deviceInfo: Closing session $sessionId for request $formattedRequest"
310 assertFailsWith(exceptionClass = NetconfException::class, message = expectedExceptionMsg) {
311 val netconfSessionSpy = spyk(netconfSession, recordPrivateCalls = false)
312 val futureRet: CompletableFuture<String> = CompletableFuture.completedFuture(futureMsg)
313 every { netconfCommunicator.sendMessage(any(), any()) } returns futureRet
314 every { netconfCommunicator.getFutureFromSendMessage(any(), any(), any()) } throws
315 ExecutionException("exec exception", Exception("nested exception"))
316 every { netconfSessionSpy["close"]() as Unit } just Runs
317 every { netconfSessionSpy.checkAndReestablish() } just Runs
318 netconfSessionSpy.setSession(mockClientSession)
320 netconfSessionSpy.syncRpc("0", "0")
325 fun `syncRpc throws NetconfException if caught ExecutionException and failed to close SSH session`() {
326 val expectedExceptionMsg = "$deviceInfo: Closing session $sessionId for request $formattedRequest"
327 assertFailsWith(exceptionClass = NetconfException::class, message = expectedExceptionMsg) {
328 val netconfSessionSpy = spyk(netconfSession)
329 val futureRet: CompletableFuture<String> = CompletableFuture.completedFuture(futureMsg)
330 every { netconfCommunicator.sendMessage(any(), any()) } returns futureRet
331 every { netconfCommunicator.getFutureFromSendMessage(any(), any(), any()) } throws
332 ExecutionException("exec exception", Exception("nested exception"))
333 every { netconfSessionSpy["close"]() as Unit } throws IOException("got an IO exception")
334 every { netconfSessionSpy.checkAndReestablish() } just Runs
336 netconfSessionSpy.syncRpc("0", "0")
337 // make sure replies are cleared...
338 verify(exactly = 1) { netconfSessionSpy.clearReplies() }
339 verify(exactly = 1) { netconfSessionSpy.clearErrorReplies() }
344 fun `asyncRpc runs normally`() {
345 val netconfSessionSpy = spyk(netconfSession)
346 every { netconfSessionSpy.checkAndReestablish() } just Runs
347 val futureRet: CompletableFuture<String> = CompletableFuture.completedFuture(futureMsg)
348 every { netconfCommunicator.sendMessage(any(), any()) } returns futureRet
350 val rpcResultFuture = netconfSessionSpy.asyncRpc("0", "0")
351 every { netconfSessionSpy.checkAndReestablish() } just Runs
352 // make sure the future gets resolved
353 assertTrue { rpcResultFuture.get() == futureMsg }
354 // make sure that clearReplies wasn't called (reestablishConnection check)
355 verify(exactly = 0) { netconfSessionSpy.clearReplies() }
359 fun `asyncRpc wraps exception`() {
360 val netconfSessionSpy = spyk(netconfSession)
361 every { netconfSessionSpy.checkAndReestablish() } just Runs
362 val futureRet: CompletableFuture<String> = CompletableFuture.supplyAsync {
363 throw Exception("blah")
365 every { netconfCommunicator.sendMessage(any(), any()) } returns futureRet
367 val rpcResultFuture = netconfSessionSpy.asyncRpc("0", "0")
368 every { netconfSessionSpy.checkAndReestablish() } just Runs
369 val e = assertFailsWith(exceptionClass = ExecutionException::class, message = futureMsg) {
370 rpcResultFuture.get()
373 assertTrue { cause is NetconfException }
377 fun `connect starts underlying client`() {
378 val propertiesMap = hashMapOf<String, Any>()
379 every { mockSshClient.start() } just Runs
380 every { mockSshClient.properties } returns propertiesMap
381 val netconfSessionSpy = spyk(netconfSession, recordPrivateCalls = true)
382 every { netconfSessionSpy["setupNewSSHClient"]() as Unit } just Runs
383 every { netconfSessionSpy["startSession"]() as Unit } just Runs
384 netconfSessionSpy.setClient(mockSshClient)
385 netconfSessionSpy.connect()
386 verify { mockSshClient.start() }
387 assertTrue { propertiesMap.containsKey(FactoryManager.IDLE_TIMEOUT) }
388 assertTrue { propertiesMap.containsKey(FactoryManager.NIO2_READ_TIMEOUT) }
392 fun `startSession tries to connect to user supplied device`() {
393 every { mockSshClient.start() } just Runs
394 every { mockSshClient.properties } returns hashMapOf<String, Any>()
395 // setup slots to capture values from the invocations
396 val userSlot = CapturingSlot<String>()
397 val ipSlot = CapturingSlot<String>()
398 val portSlot = CapturingSlot<Int>()
399 // create a future that succeeded
400 val succeededFuture = DefaultConnectFuture(Any(), Any())
401 succeededFuture.value = mockClientSession
402 every { mockSshClient.connect(capture(userSlot), capture(ipSlot), capture(portSlot)) } returns succeededFuture
403 val netconfSessionSpy = spyk(netconfSession, recordPrivateCalls = true)
404 every { netconfSessionSpy["authSession"]() as Unit } just Runs
405 every { netconfSessionSpy["setupNewSSHClient"]() as Unit } just Runs
406 netconfSessionSpy.setClient(mockSshClient)
408 netconfSessionSpy.connect()
410 verify { mockSshClient.connect(deviceInfo.username, deviceInfo.ipAddress, deviceInfo.port) }
411 assertEquals(deviceInfo.username, userSlot.captured)
412 assertEquals(deviceInfo.ipAddress, ipSlot.captured)
413 assertEquals(deviceInfo.port, portSlot.captured)
414 verify { netconfSessionSpy["authSession"]() }
418 fun `authSession throws exception if ClientSession is not AUTHED`() {
419 assertFailsWith(exceptionClass = NetconfException::class) {
420 // after client session connects,
421 every { mockSshClient.start() } just Runs
422 every { mockSshClient.properties } returns hashMapOf<String, Any>()
423 val succeededAuthFuture = DefaultAuthFuture(Any(), Any())
424 succeededAuthFuture.value = true // AuthFuture's value is Boolean
425 val passSlot = CapturingSlot<String>()
426 every { mockClientSession.addPasswordIdentity(capture(passSlot)) } just Runs
427 every { mockClientSession.auth() } returns succeededAuthFuture
428 val succeededSessionFuture = DefaultConnectFuture(Any(), Any())
429 succeededSessionFuture.value = mockClientSession
430 every { mockSshClient.connect(deviceInfo.username, deviceInfo.ipAddress, deviceInfo.port) } returns succeededSessionFuture
431 every { mockClientSession.waitFor(any(), any()) } returns
432 setOf(ClientSession.ClientSessionEvent.WAIT_AUTH, ClientSession.ClientSessionEvent.CLOSED)
433 val netconfSessionSpy = spyk(netconfSession, recordPrivateCalls = true)
434 every { netconfSessionSpy["setupNewSSHClient"]() as Unit } just Runs
435 netconfSessionSpy.setClient(mockSshClient)
437 netconfSessionSpy.connect()
441 // common mock initializer for more weird tests.
442 private fun setupOpenChannelMocks() {
443 every { mockSshClient.start() } just Runs
444 every { mockSshClient.properties } returns hashMapOf<String, Any>()
445 val succeededAuthFuture = DefaultAuthFuture(Any(), Any())
446 succeededAuthFuture.value = true // AuthFuture's value is Boolean
447 val passSlot = CapturingSlot<String>()
448 every { mockClientSession.addPasswordIdentity(capture(passSlot)) } just Runs
449 every { mockClientSession.auth() } returns succeededAuthFuture
450 val succeededSessionFuture = DefaultConnectFuture(Any(), Any())
451 succeededSessionFuture.value = mockClientSession
452 every { mockSshClient.connect(deviceInfo.username, deviceInfo.ipAddress, deviceInfo.port) } returns succeededSessionFuture
453 every { mockClientSession.waitFor(any(), any()) } returns
455 ClientSession.ClientSessionEvent.WAIT_AUTH,
456 ClientSession.ClientSessionEvent.CLOSED,
457 ClientSession.ClientSessionEvent.AUTHED
460 every { mockClientSession.createSubsystemChannel(any()) } returns mockSubsystem
461 every { mockClientChannel.invertedOut } returns sampleInputStream
462 every { mockClientChannel.invertedIn } returns sampleOutputStream
466 fun `authSession opensChannel if ClientSession is AUTHED and session can be opened`() {
467 // after client session connects, make sure the client receives authentication
468 setupOpenChannelMocks()
469 val channelFuture = DefaultOpenFuture(Any(), Any())
470 channelFuture.value = true
471 channelFuture.setOpened()
472 val connectFuture = DefaultConnectFuture(Any(), Any())
473 connectFuture.value = mockClientSession
474 connectFuture.session = mockClientSession
475 every { mockSubsystem.open() } returns channelFuture
476 every { mockSshClient.connect(deviceInfo.username, deviceInfo.ipAddress, deviceInfo.port) } returns connectFuture
478 val netconfSessionSpy = spyk(netconfSession, recordPrivateCalls = true)
479 every { netconfSessionSpy["setupNewSSHClient"]() as Unit } just Runs
480 every { netconfSessionSpy["setupHandler"]() as Unit } just Runs
481 netconfSessionSpy.setClient(mockSshClient)
483 netconfSessionSpy.connect()
485 verify { mockSubsystem.open() }
489 fun `authSession throws NetconfException if ClientSession is AUTHED but channelFuture timed out or not open`() {
490 assertFailsWith(exceptionClass = NetconfException::class) {
491 // after client session connects, make sure the client receives authentication
492 setupOpenChannelMocks()
493 val channelFuture = DefaultOpenFuture(Any(), Any())
494 every { mockSubsystem.open() } returns channelFuture
495 val netconfSessionSpy = spyk(netconfSession, recordPrivateCalls = true)
496 every { netconfSessionSpy["setupNewSSHClient"]() as Unit } just Runs
497 every { netconfSessionSpy["setupHandler"]() as Unit } just Runs
498 netconfSessionSpy.setClient(mockSshClient)
500 netconfSessionSpy.connect()
502 verify { mockSubsystem.open() }
507 fun `disconnect closes session, channel, and client`() {
508 every { rpcService.closeSession(false) } returns SUCCESSFUL_DEVICE_RESPONSE
509 every { mockClientSession.close() } just Runs
510 every { mockClientChannel.close() } just Runs
511 every { mockSshClient.close() } just Runs
512 val netconfSessionSpy = spyk(netconfSession, recordPrivateCalls = true)
513 netconfSessionSpy.setChannel(mockClientChannel)
514 netconfSessionSpy.setClient(mockSshClient)
515 netconfSessionSpy.setSession(mockClientSession)
517 netconfSessionSpy.disconnect()
519 verify { mockClientSession.close() }
520 verify { mockClientChannel.close() }
521 verify { mockSshClient.close() }
525 fun `disconnect wraps IOException if channel doesn't close`() { // this test is equivalent to others
526 every { rpcService.closeSession(false) } returns SUCCESSFUL_DEVICE_RESPONSE
527 every { mockClientSession.close() } just Runs
528 every { mockClientChannel.close() } throws IOException("channel doesn't want to close!")
529 val netconfSessionSpy = spyk(netconfSession, recordPrivateCalls = true)
530 netconfSessionSpy.setChannel(mockClientChannel)
531 netconfSessionSpy.setClient(mockSshClient)
532 netconfSessionSpy.setSession(mockClientSession)
534 netconfSessionSpy.disconnect()
536 verify { mockClientSession.close() }