2 * Copyright © 2019 Bell Canada
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
17 package org.onap.ccsdk.cds.blueprintsprocessor.functions.netconf.executor.core
19 import io.mockk.CapturingSlot
25 import io.mockk.verify
26 import org.apache.sshd.client.SshClient
27 import org.apache.sshd.client.channel.ChannelSubsystem
28 import org.apache.sshd.client.channel.ClientChannel
29 import org.apache.sshd.client.future.DefaultAuthFuture
30 import org.apache.sshd.client.future.DefaultConnectFuture
31 import org.apache.sshd.client.future.DefaultOpenFuture
32 import org.apache.sshd.client.session.ClientSession
33 import org.apache.sshd.common.FactoryManager
34 import org.junit.Before
36 import org.onap.ccsdk.cds.blueprintsprocessor.functions.netconf.executor.api.DeviceInfo
37 import org.onap.ccsdk.cds.blueprintsprocessor.functions.netconf.executor.api.DeviceResponse
38 import org.onap.ccsdk.cds.blueprintsprocessor.functions.netconf.executor.api.NetconfException
39 import org.onap.ccsdk.cds.blueprintsprocessor.functions.netconf.executor.api.NetconfRpcService
40 import org.onap.ccsdk.cds.blueprintsprocessor.functions.netconf.executor.utils.NetconfMessageUtils
41 import org.onap.ccsdk.cds.blueprintsprocessor.functions.netconf.executor.utils.RpcStatus
42 import java.io.ByteArrayInputStream
43 import java.io.ByteArrayOutputStream
44 import java.io.IOException
45 import java.io.InputStream
46 import java.nio.charset.StandardCharsets
47 import java.util.concurrent.CompletableFuture
48 import java.util.concurrent.ExecutionException
49 import java.util.concurrent.TimeoutException
50 import kotlin.test.assertEquals
51 import kotlin.test.assertFailsWith
52 import kotlin.test.assertTrue
54 class NetconfSessionImplTest {
57 val SUCCESSFUL_DEVICE_RESPONSE = DeviceResponse().apply {
58 status = RpcStatus.SUCCESS
63 val FAILED_DEVICE_RESPONSE = DeviceResponse().apply {
64 status = RpcStatus.FAILURE
69 val deviceInfo: DeviceInfo = DeviceInfo().apply {
72 ipAddress = "localhost"
76 private const val someString = "Some string"
79 private lateinit var netconfSession: NetconfSessionImpl
80 private lateinit var netconfCommunicator: NetconfDeviceCommunicator
81 private lateinit var rpcService: NetconfRpcService
82 private lateinit var mockSshClient: SshClient
83 private lateinit var mockClientSession: ClientSession
84 private lateinit var mockClientChannel: ClientChannel
85 private lateinit var mockSubsystem: ChannelSubsystem
87 private val futureMsg = "blahblahblah"
88 private val request = "0"
89 private val sessionId = "0"
90 private val messageId = "asdfasdfadf"
91 private val deviceCapabilities = setOf("capability1", "capability2")
92 private val formattedRequest = NetconfMessageUtils.formatRPCRequest(request, messageId, deviceCapabilities)
93 private lateinit var sampleInputStream: InputStream
94 private lateinit var sampleOutputStream: ByteArrayOutputStream
98 netconfCommunicator = mockk()
100 netconfSession = NetconfSessionImpl(deviceInfo, rpcService)
101 netconfSession.setStreamHandler(netconfCommunicator)
102 mockSshClient = mockk()
103 mockClientSession = mockk()
104 mockClientChannel = mockk()
105 mockSubsystem = mockk()
106 sampleInputStream = ByteArrayInputStream(someString.toByteArray(StandardCharsets.UTF_8))
107 sampleOutputStream = ByteArrayOutputStream()
111 fun `connect calls appropriate methods`() {
112 val session = spyk(netconfSession, recordPrivateCalls = true)
113 every { session["startClient"]() as Unit } just Runs
115 verify { session["startClient"]() }
118 // look for NetconfException being thrown when cannot connect
120 fun `connect throws NetconfException on error`() {
121 val errMsg = "$deviceInfo: Failed to establish SSH session"
122 assertFailsWith(exceptionClass = NetconfException::class, message = errMsg) {
123 val netconfSessionSpy = spyk(netconfSession, recordPrivateCalls = true)
124 every { netconfSessionSpy["startClient"]() as Unit } throws NetconfException(errMsg)
125 netconfSessionSpy.connect()
130 fun `disconnect without force option for rpcService succeeds`() {
131 // rpcService.closeSession succeeds with status not RpcStatus.FAILURE
132 every { rpcService.closeSession(false) } returns SUCCESSFUL_DEVICE_RESPONSE
133 every { mockClientSession.close() } just Runs
134 every { mockSshClient.close() } just Runs
135 every { mockClientChannel.close() } just Runs
136 val netconfSessionSpy = spyk(netconfSession, recordPrivateCalls = true)
137 netconfSessionSpy.setSession(mockClientSession)
138 netconfSessionSpy.setClient(mockSshClient)
139 netconfSessionSpy.setChannel(mockClientChannel)
141 netconfSessionSpy.disconnect()
142 // make sure that rpcService.close session is not called again.
143 verify(exactly = 0) { rpcService.closeSession(true) }
144 verify { mockClientSession.close() }
145 verify { mockSshClient.close() }
146 verify { mockClientChannel.close() }
150 fun `disconnect with force option for rpcService succeeds`() {
151 // rpcService.closeSession succeeds with status not RpcStatus.FAILURE
152 val netconfSessionSpy = spyk(netconfSession, recordPrivateCalls = true)
153 every { rpcService.closeSession(any()) } returns
154 FAILED_DEVICE_RESPONSE andThen SUCCESSFUL_DEVICE_RESPONSE
155 every { mockClientSession.close() } just Runs
156 every { mockSshClient.close() } just Runs
157 every { mockClientChannel.close() } just Runs
158 netconfSessionSpy.setSession(mockClientSession)
159 netconfSessionSpy.setClient(mockSshClient)
160 netconfSessionSpy.setChannel(mockClientChannel)
162 netconfSessionSpy.disconnect()
164 verify(exactly = 2) { rpcService.closeSession(any()) }
165 verify { mockClientSession.close() }
166 verify { mockSshClient.close() }
167 verify { mockClientChannel.close() }
171 fun `disconnect wraps exception from ssh closing error`() {
172 val netconfSessionSpy = spyk(netconfSession, recordPrivateCalls = true)
173 every { netconfSessionSpy["close"]() as Unit } throws IOException("Some IOException occurred!")
174 every { rpcService.closeSession(false) } returns SUCCESSFUL_DEVICE_RESPONSE
175 every { netconfSessionSpy.checkAndReestablish() } just Runs
176 netconfSessionSpy.disconnect()
177 verify { netconfSessionSpy["close"]() }
181 fun `reconnect calls disconnect and connect`() {
182 val netconfSessionSpy = spyk(netconfSession, recordPrivateCalls = true)
183 every { netconfSessionSpy.disconnect() } just Runs
184 every { netconfSessionSpy.connect() } just Runs
185 netconfSessionSpy.reconnect()
186 verify { netconfSessionSpy.disconnect() }
187 verify { netconfSessionSpy.connect() }
191 fun `checkAndReestablish restarts connection and clears replies on sshClient disconnection`() {
192 val netconfSessionSpy = spyk(netconfSession, recordPrivateCalls = true)
193 every { mockSshClient.isClosed } returns true
194 netconfSessionSpy.setClient(mockSshClient)
195 every { netconfSessionSpy["startConnection"]() as Unit } just Runs
197 netconfSessionSpy.checkAndReestablish()
199 verify { netconfSessionSpy.clearReplies() }
200 verify { netconfSessionSpy["startConnection"]() }
204 fun `checkAndReestablish restarts session and clears replies on clientSession closing`() {
205 val netconfSessionSpy = spyk(netconfSession, recordPrivateCalls = true)
206 every { mockClientSession.isClosed } returns true
207 every { mockSshClient.isClosed } returns false
208 every { netconfSessionSpy["startSession"]() as Unit } just Runs
209 netconfSessionSpy.setClient(mockSshClient)
210 netconfSessionSpy.setSession(mockClientSession)
212 netconfSessionSpy.checkAndReestablish()
214 verify { netconfSessionSpy.clearReplies() }
215 verify { netconfSessionSpy["startSession"]() }
219 fun `checkAndReestablish reopens channel and clears replies on channel closing`() {
220 val netconfSessionSpy = spyk(netconfSession, recordPrivateCalls = true)
221 every { mockClientSession.isClosed } returns false
222 every { mockSshClient.isClosed } returns false
223 every { mockClientChannel.isClosed } returns true
224 every { netconfSessionSpy["openChannel"]() as Unit } just Runs
225 netconfSessionSpy.setClient(mockSshClient)
226 netconfSessionSpy.setSession(mockClientSession)
227 netconfSessionSpy.setChannel(mockClientChannel)
229 netconfSessionSpy.checkAndReestablish()
231 verify { netconfSessionSpy.clearReplies() }
232 verify { netconfSessionSpy["openChannel"]() }
236 fun `syncRpc runs normally`() {
237 val netconfSessionSpy = spyk(netconfSession)
238 val futureRet: CompletableFuture<String> = CompletableFuture.completedFuture(futureMsg)
240 // test the case where SSH connection did not need to be re-established.
241 // put an existing item into the replies
242 netconfSessionSpy.getReplies()["somekey"] = CompletableFuture.completedFuture("${futureMsg}2")
243 every { netconfCommunicator.sendMessage(any(), any()) } returns futureRet
244 every { netconfCommunicator.getFutureFromSendMessage(any(), any(), any()) } returns futureRet.get()
245 every { netconfSessionSpy.checkAndReestablish() } just Runs
247 assertEquals(futureMsg, netconfSessionSpy.syncRpc("0", "0"))
248 // make sure the replies didn't change
250 netconfSessionSpy.getReplies().size == 1 &&
251 netconfSessionSpy.getReplies().containsKey("somekey")
253 verify(exactly = 0) { netconfSessionSpy.clearReplies() }
257 fun `syncRpc still succeeds and replies are cleared on client disconnect`() {
258 val netconfSessionSpy = spyk(netconfSession, recordPrivateCalls = true)
259 val futureRet: CompletableFuture<String> = CompletableFuture.completedFuture(futureMsg)
261 // put an item into the replies
262 netconfSessionSpy.getReplies()["somekey"] = CompletableFuture.completedFuture("${futureMsg}2")
264 // tests the case where SSH session needs to be re-established.
265 every { netconfCommunicator.sendMessage(any(), any()) } returns futureRet
266 every { netconfSessionSpy["startClient"]() as Unit } just Runs
267 every { netconfCommunicator.getFutureFromSendMessage(any(), any(), any()) } returns futureRet.get()
268 every { mockSshClient.isClosed } returns true
269 netconfSessionSpy.setClient(mockSshClient)
272 assertEquals(futureMsg, netconfSessionSpy.syncRpc("0", "0"))
273 // make sure the replies got cleared out
274 assertTrue { netconfSessionSpy.getReplies().isEmpty() }
275 verify(exactly = 1) { netconfSessionSpy.clearReplies() }
278 // Test for handling CompletableFuture.get returns InterruptedException inside NetconfDeviceCommunicator
280 fun `syncRpc throws NetconfException if InterruptedException is caught`() {
281 val expectedExceptionMsg = "$deviceInfo: Interrupted while waiting for reply for request: $formattedRequest"
282 assertFailsWith(exceptionClass = NetconfException::class, message = expectedExceptionMsg) {
283 val netconfSessionSpy = spyk(netconfSession, recordPrivateCalls = true)
284 val futureRet: CompletableFuture<String> = CompletableFuture.completedFuture(futureMsg)
285 every { netconfCommunicator.sendMessage(any(), any()) } returns futureRet
286 every { netconfCommunicator.getFutureFromSendMessage(any(), any(), any()) } throws InterruptedException("interrupted")
287 every { netconfSessionSpy.checkAndReestablish() } just Runs
289 netconfSessionSpy.syncRpc("0", "0")
294 fun `syncRpc throws NetconfException if TimeoutException is caught`() {
295 val expectedExceptionMsg =
296 "$deviceInfo: Timed out while waiting for reply for request $formattedRequest after ${deviceInfo.replyTimeout} sec."
297 assertFailsWith(exceptionClass = NetconfException::class, message = expectedExceptionMsg) {
298 val netconfSessionSpy = spyk(netconfSession)
299 val futureRet: CompletableFuture<String> = CompletableFuture.completedFuture(futureMsg)
300 every { netconfCommunicator.sendMessage(any(), any()) } returns futureRet
301 every { netconfCommunicator.getFutureFromSendMessage(any(), any(), any()) } throws TimeoutException("timed out")
302 every { netconfSessionSpy.checkAndReestablish() } just Runs
304 netconfSessionSpy.syncRpc("0", "0")
309 fun `syncRpc throws NetconfException if ExecutionException is caught`() {
310 val expectedExceptionMsg = "$deviceInfo: Closing session $sessionId for request $formattedRequest"
311 assertFailsWith(exceptionClass = NetconfException::class, message = expectedExceptionMsg) {
312 val netconfSessionSpy = spyk(netconfSession, recordPrivateCalls = false)
313 val futureRet: CompletableFuture<String> = CompletableFuture.completedFuture(futureMsg)
314 every { netconfCommunicator.sendMessage(any(), any()) } returns futureRet
315 every { netconfCommunicator.getFutureFromSendMessage(any(), any(), any()) } throws
316 ExecutionException("exec exception", Exception("nested exception"))
317 every { netconfSessionSpy["close"]() as Unit } just Runs
318 every { netconfSessionSpy.checkAndReestablish() } just Runs
319 netconfSessionSpy.setSession(mockClientSession)
321 netconfSessionSpy.syncRpc("0", "0")
326 fun `syncRpc throws NetconfException if caught ExecutionException and failed to close SSH session`() {
327 val expectedExceptionMsg = "$deviceInfo: Closing session $sessionId for request $formattedRequest"
328 assertFailsWith(exceptionClass = NetconfException::class, message = expectedExceptionMsg) {
329 val netconfSessionSpy = spyk(netconfSession)
330 val futureRet: CompletableFuture<String> = CompletableFuture.completedFuture(futureMsg)
331 every { netconfCommunicator.sendMessage(any(), any()) } returns futureRet
332 every { netconfCommunicator.getFutureFromSendMessage(any(), any(), any()) } throws
333 ExecutionException("exec exception", Exception("nested exception"))
334 every { netconfSessionSpy["close"]() as Unit } throws IOException("got an IO exception")
335 every { netconfSessionSpy.checkAndReestablish() } just Runs
337 netconfSessionSpy.syncRpc("0", "0")
338 // make sure replies are cleared...
339 verify(exactly = 1) { netconfSessionSpy.clearReplies() }
340 verify(exactly = 1) { netconfSessionSpy.clearErrorReplies() }
345 fun `asyncRpc runs normally`() {
346 val netconfSessionSpy = spyk(netconfSession)
347 every { netconfSessionSpy.checkAndReestablish() } just Runs
348 val futureRet: CompletableFuture<String> = CompletableFuture.completedFuture(futureMsg)
349 every { netconfCommunicator.sendMessage(any(), any()) } returns futureRet
351 val rpcResultFuture = netconfSessionSpy.asyncRpc("0", "0")
352 every { netconfSessionSpy.checkAndReestablish() } just Runs
353 // make sure the future gets resolved
354 assertTrue { rpcResultFuture.get() == futureMsg }
355 // make sure that clearReplies wasn't called (reestablishConnection check)
356 verify(exactly = 0) { netconfSessionSpy.clearReplies() }
360 fun `asyncRpc wraps exception`() {
361 val netconfSessionSpy = spyk(netconfSession)
362 every { netconfSessionSpy.checkAndReestablish() } just Runs
363 val futureRet: CompletableFuture<String> = CompletableFuture.supplyAsync {
364 throw Exception("blah")
366 every { netconfCommunicator.sendMessage(any(), any()) } returns futureRet
368 val rpcResultFuture = netconfSessionSpy.asyncRpc("0", "0")
369 every { netconfSessionSpy.checkAndReestablish() } just Runs
370 val e = assertFailsWith(exceptionClass = ExecutionException::class, message = futureMsg) {
371 rpcResultFuture.get()
374 assertTrue { cause is NetconfException }
378 fun `connect starts underlying client`() {
379 val propertiesMap = hashMapOf<String, Any>()
380 every { mockSshClient.start() } just Runs
381 every { mockSshClient.properties } returns propertiesMap
382 val netconfSessionSpy = spyk(netconfSession, recordPrivateCalls = true)
383 every { netconfSessionSpy["setupNewSSHClient"]() as Unit } just Runs
384 every { netconfSessionSpy["startSession"]() as Unit } just Runs
385 netconfSessionSpy.setClient(mockSshClient)
386 netconfSessionSpy.connect()
387 verify { mockSshClient.start() }
388 assertTrue { propertiesMap.containsKey(FactoryManager.IDLE_TIMEOUT) }
389 assertTrue { propertiesMap.containsKey(FactoryManager.NIO2_READ_TIMEOUT) }
393 fun `startSession tries to connect to user supplied device`() {
394 every { mockSshClient.start() } just Runs
395 every { mockSshClient.properties } returns hashMapOf<String, Any>()
396 // setup slots to capture values from the invocations
397 val userSlot = CapturingSlot<String>()
398 val ipSlot = CapturingSlot<String>()
399 val portSlot = CapturingSlot<Int>()
400 // create a future that succeeded
401 val succeededFuture = DefaultConnectFuture(Any(), Any())
402 succeededFuture.value = mockClientSession
403 every { mockSshClient.connect(capture(userSlot), capture(ipSlot), capture(portSlot)) } returns succeededFuture
404 val netconfSessionSpy = spyk(netconfSession, recordPrivateCalls = true)
405 every { netconfSessionSpy["authSession"]() as Unit } just Runs
406 every { netconfSessionSpy["setupNewSSHClient"]() as Unit } just Runs
407 netconfSessionSpy.setClient(mockSshClient)
409 netconfSessionSpy.connect()
411 verify { mockSshClient.connect(deviceInfo.username, deviceInfo.ipAddress, deviceInfo.port) }
412 assertEquals(deviceInfo.username, userSlot.captured)
413 assertEquals(deviceInfo.ipAddress, ipSlot.captured)
414 assertEquals(deviceInfo.port, portSlot.captured)
415 verify { netconfSessionSpy["authSession"]() }
419 fun `authSession throws exception if ClientSession is not AUTHED`() {
420 assertFailsWith(exceptionClass = NetconfException::class) {
421 // after client session connects,
422 every { mockSshClient.start() } just Runs
423 every { mockSshClient.properties } returns hashMapOf<String, Any>()
424 val succeededAuthFuture = DefaultAuthFuture(Any(), Any())
425 succeededAuthFuture.value = true // AuthFuture's value is Boolean
426 val passSlot = CapturingSlot<String>()
427 every { mockClientSession.addPasswordIdentity(capture(passSlot)) } just Runs
428 every { mockClientSession.auth() } returns succeededAuthFuture
429 val succeededSessionFuture = DefaultConnectFuture(Any(), Any())
430 succeededSessionFuture.value = mockClientSession
431 every { mockSshClient.connect(deviceInfo.username, deviceInfo.ipAddress, deviceInfo.port) } returns succeededSessionFuture
432 every { mockClientSession.waitFor(any(), any()) } returns
433 setOf(ClientSession.ClientSessionEvent.WAIT_AUTH, ClientSession.ClientSessionEvent.CLOSED)
434 val netconfSessionSpy = spyk(netconfSession, recordPrivateCalls = true)
435 every { netconfSessionSpy["setupNewSSHClient"]() as Unit } just Runs
436 netconfSessionSpy.setClient(mockSshClient)
438 netconfSessionSpy.connect()
442 // common mock initializer for more weird tests.
443 private fun setupOpenChannelMocks() {
444 every { mockSshClient.start() } just Runs
445 every { mockSshClient.properties } returns hashMapOf<String, Any>()
446 val succeededAuthFuture = DefaultAuthFuture(Any(), Any())
447 succeededAuthFuture.value = true // AuthFuture's value is Boolean
448 val passSlot = CapturingSlot<String>()
449 every { mockClientSession.addPasswordIdentity(capture(passSlot)) } just Runs
450 every { mockClientSession.auth() } returns succeededAuthFuture
451 val succeededSessionFuture = DefaultConnectFuture(Any(), Any())
452 succeededSessionFuture.value = mockClientSession
453 every { mockSshClient.connect(deviceInfo.username, deviceInfo.ipAddress, deviceInfo.port) } returns succeededSessionFuture
454 every { mockClientSession.waitFor(any(), any()) } returns
456 ClientSession.ClientSessionEvent.WAIT_AUTH,
457 ClientSession.ClientSessionEvent.CLOSED,
458 ClientSession.ClientSessionEvent.AUTHED
461 every { mockClientSession.createSubsystemChannel(any()) } returns mockSubsystem
462 every { mockClientChannel.invertedOut } returns sampleInputStream
463 every { mockClientChannel.invertedIn } returns sampleOutputStream
467 fun `authSession opensChannel if ClientSession is AUTHED and session can be opened`() {
468 // after client session connects, make sure the client receives authentication
469 setupOpenChannelMocks()
470 val channelFuture = DefaultOpenFuture(Any(), Any())
471 channelFuture.value = true
472 channelFuture.setOpened()
473 val connectFuture = DefaultConnectFuture(Any(), Any())
474 connectFuture.value = mockClientSession
475 connectFuture.session = mockClientSession
476 every { mockSubsystem.open() } returns channelFuture
477 every { mockSshClient.connect(deviceInfo.username, deviceInfo.ipAddress, deviceInfo.port) } returns connectFuture
479 val netconfSessionSpy = spyk(netconfSession, recordPrivateCalls = true)
480 every { netconfSessionSpy["setupNewSSHClient"]() as Unit } just Runs
481 every { netconfSessionSpy["setupHandler"]() as Unit } just Runs
482 netconfSessionSpy.setClient(mockSshClient)
484 netconfSessionSpy.connect()
486 verify { mockSubsystem.open() }
490 fun `authSession throws NetconfException if ClientSession is AUTHED but channelFuture timed out or not open`() {
491 assertFailsWith(exceptionClass = NetconfException::class) {
492 // after client session connects, make sure the client receives authentication
493 setupOpenChannelMocks()
494 val channelFuture = DefaultOpenFuture(Any(), Any())
495 every { mockSubsystem.open() } returns channelFuture
496 val netconfSessionSpy = spyk(netconfSession, recordPrivateCalls = true)
497 every { netconfSessionSpy["setupNewSSHClient"]() as Unit } just Runs
498 every { netconfSessionSpy["setupHandler"]() as Unit } just Runs
499 netconfSessionSpy.setClient(mockSshClient)
501 netconfSessionSpy.connect()
503 verify { mockSubsystem.open() }
508 fun `disconnect closes session, channel, and client`() {
509 every { rpcService.closeSession(false) } returns SUCCESSFUL_DEVICE_RESPONSE
510 every { mockClientSession.close() } just Runs
511 every { mockClientChannel.close() } just Runs
512 every { mockSshClient.close() } just Runs
513 val netconfSessionSpy = spyk(netconfSession, recordPrivateCalls = true)
514 netconfSessionSpy.setChannel(mockClientChannel)
515 netconfSessionSpy.setClient(mockSshClient)
516 netconfSessionSpy.setSession(mockClientSession)
518 netconfSessionSpy.disconnect()
520 verify { mockClientSession.close() }
521 verify { mockClientChannel.close() }
522 verify { mockSshClient.close() }
526 fun `disconnect wraps IOException if channel doesn't close`() { // this test is equivalent to others
527 every { rpcService.closeSession(false) } returns SUCCESSFUL_DEVICE_RESPONSE
528 every { mockClientSession.close() } just Runs
529 every { mockClientChannel.close() } throws IOException("channel doesn't want to close!")
530 val netconfSessionSpy = spyk(netconfSession, recordPrivateCalls = true)
531 netconfSessionSpy.setChannel(mockClientChannel)
532 netconfSessionSpy.setClient(mockSshClient)
533 netconfSessionSpy.setSession(mockClientSession)
535 netconfSessionSpy.disconnect()
537 verify { mockClientSession.close() }