2 * Copyright © 2019 Bell Canada
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
17 package org.onap.ccsdk.cds.blueprintsprocessor.functions.netconf.executor.core
20 import org.apache.sshd.client.SshClient
21 import org.apache.sshd.client.channel.ChannelSubsystem
22 import org.apache.sshd.client.channel.ClientChannel
23 import org.apache.sshd.client.future.DefaultAuthFuture
24 import org.apache.sshd.client.future.DefaultConnectFuture
25 import org.apache.sshd.client.future.DefaultOpenFuture
26 import org.apache.sshd.client.session.ClientSession
27 import org.apache.sshd.common.FactoryManager
28 import org.junit.Before
30 import org.onap.ccsdk.cds.blueprintsprocessor.functions.netconf.executor.api.DeviceInfo
31 import org.onap.ccsdk.cds.blueprintsprocessor.functions.netconf.executor.api.DeviceResponse
32 import org.onap.ccsdk.cds.blueprintsprocessor.functions.netconf.executor.api.NetconfException
33 import org.onap.ccsdk.cds.blueprintsprocessor.functions.netconf.executor.api.NetconfRpcService
34 import org.onap.ccsdk.cds.blueprintsprocessor.functions.netconf.executor.utils.NetconfMessageUtils
35 import org.onap.ccsdk.cds.blueprintsprocessor.functions.netconf.executor.utils.RpcStatus
36 import java.io.ByteArrayInputStream
37 import java.io.ByteArrayOutputStream
38 import java.io.IOException
39 import java.io.InputStream
40 import java.nio.charset.*
41 import java.util.concurrent.*
42 import kotlin.test.assertEquals
43 import kotlin.test.assertFailsWith
44 import kotlin.test.assertTrue
46 class NetconfSessionImplTest {
48 val SUCCESSFUL_DEVICE_RESPONSE = DeviceResponse().apply {
49 status = RpcStatus.SUCCESS
54 val FAILED_DEVICE_RESPONSE = DeviceResponse().apply {
55 status = RpcStatus.FAILURE
60 val deviceInfo: DeviceInfo = DeviceInfo().apply {
63 ipAddress = "localhost"
67 private const val someString = "Some string"
70 private lateinit var netconfSession: NetconfSessionImpl
71 private lateinit var netconfCommunicator: NetconfDeviceCommunicator
72 private lateinit var rpcService: NetconfRpcService
73 private lateinit var mockSshClient: SshClient
74 private lateinit var mockClientSession: ClientSession
75 private lateinit var mockClientChannel: ClientChannel
76 private lateinit var mockSubsystem: ChannelSubsystem
78 private val futureMsg = "blahblahblah"
79 private val request = "0"
80 private val sessionId = "0"
81 private val messageId = "asdfasdfadf"
82 private val deviceCapabilities = setOf("capability1", "capability2")
83 private val formattedRequest = NetconfMessageUtils.formatRPCRequest(request, messageId, deviceCapabilities)
84 private lateinit var sampleInputStream: InputStream
85 private lateinit var sampleOutputStream: ByteArrayOutputStream
89 netconfCommunicator = mockk()
91 netconfSession = NetconfSessionImpl(deviceInfo, rpcService)
92 netconfSession.setStreamHandler(netconfCommunicator)
93 mockSshClient = mockk()
94 mockClientSession = mockk()
95 mockClientChannel = mockk()
96 mockSubsystem = mockk()
97 sampleInputStream = ByteArrayInputStream(someString.toByteArray(StandardCharsets.UTF_8))
98 sampleOutputStream = ByteArrayOutputStream()
102 fun `connect calls appropriate methods`() {
103 val session = spyk(netconfSession, recordPrivateCalls = true)
104 every { session["startClient"]() as Unit } just Runs
106 verify { session["startClient"]() }
109 //look for NetconfException being thrown when cannot connect
111 fun `connect throws NetconfException on error`() {
112 val errMsg = "$deviceInfo: Failed to establish SSH session"
113 assertFailsWith(exceptionClass = NetconfException::class, message = errMsg) {
114 val netconfSessionSpy = spyk(netconfSession, recordPrivateCalls = true)
115 every { netconfSessionSpy["startClient"]() as Unit } throws NetconfException(errMsg)
116 netconfSessionSpy.connect()
121 fun `disconnect without force option for rpcService succeeds`() {
122 //rpcService.closeSession succeeds with status not RpcStatus.FAILURE
123 every { rpcService.closeSession(false) } returns SUCCESSFUL_DEVICE_RESPONSE
124 every { mockClientSession.close() } just Runs
125 every { mockSshClient.close() } just Runs
126 every { mockClientChannel.close() } just Runs
127 val netconfSessionSpy = spyk(netconfSession, recordPrivateCalls = true)
128 netconfSessionSpy.setSession(mockClientSession)
129 netconfSessionSpy.setClient(mockSshClient)
130 netconfSessionSpy.setChannel(mockClientChannel)
132 netconfSessionSpy.disconnect()
133 //make sure that rpcService.close session is not called again.
134 verify(exactly = 0) { rpcService.closeSession(true) }
135 verify { mockClientSession.close() }
136 verify { mockSshClient.close() }
137 verify { mockClientChannel.close() }
141 fun `disconnect with force option for rpcService succeeds`() {
142 //rpcService.closeSession succeeds with status not RpcStatus.FAILURE
143 val netconfSessionSpy = spyk(netconfSession, recordPrivateCalls = true)
144 every { rpcService.closeSession(any()) } returns
145 FAILED_DEVICE_RESPONSE andThen SUCCESSFUL_DEVICE_RESPONSE
146 every { mockClientSession.close() } just Runs
147 every { mockSshClient.close() } just Runs
148 every { mockClientChannel.close() } just Runs
149 netconfSessionSpy.setSession(mockClientSession)
150 netconfSessionSpy.setClient(mockSshClient)
151 netconfSessionSpy.setChannel(mockClientChannel)
153 netconfSessionSpy.disconnect()
155 verify(exactly = 2) { rpcService.closeSession(any()) }
156 verify { mockClientSession.close() }
157 verify { mockSshClient.close() }
158 verify { mockClientChannel.close() }
163 fun `disconnect wraps exception from ssh closing error`() {
164 val netconfSessionSpy = spyk(netconfSession, recordPrivateCalls = true)
165 every { netconfSessionSpy["close"]() as Unit } throws IOException("Some IOException occurred!")
166 every { rpcService.closeSession(false) } returns SUCCESSFUL_DEVICE_RESPONSE
167 every { netconfSessionSpy.checkAndReestablish() } just Runs
168 netconfSessionSpy.disconnect()
169 verify { netconfSessionSpy["close"]() }
173 fun `reconnect calls disconnect and connect`() {
174 val netconfSessionSpy = spyk(netconfSession, recordPrivateCalls = true)
175 every { netconfSessionSpy.disconnect() } just Runs
176 every { netconfSessionSpy.connect() } just Runs
177 netconfSessionSpy.reconnect()
178 verify { netconfSessionSpy.disconnect() }
179 verify { netconfSessionSpy.connect() }
183 fun `checkAndReestablish restarts connection and clears replies on sshClient disconnection`() {
184 val netconfSessionSpy = spyk(netconfSession, recordPrivateCalls = true)
185 every { mockSshClient.isClosed } returns true
186 netconfSessionSpy.setClient(mockSshClient)
187 every { netconfSessionSpy["startConnection"]() as Unit } just Runs
189 netconfSessionSpy.checkAndReestablish()
191 verify { netconfSessionSpy.clearReplies() }
192 verify { netconfSessionSpy["startConnection"]() }
196 fun `checkAndReestablish restarts session and clears replies on clientSession closing`() {
197 val netconfSessionSpy = spyk(netconfSession, recordPrivateCalls = true)
198 every { mockClientSession.isClosed } returns true
199 every { mockSshClient.isClosed } returns false
200 every { netconfSessionSpy["startSession"]() as Unit } just Runs
201 netconfSessionSpy.setClient(mockSshClient)
202 netconfSessionSpy.setSession(mockClientSession)
204 netconfSessionSpy.checkAndReestablish()
206 verify { netconfSessionSpy.clearReplies() }
207 verify { netconfSessionSpy["startSession"]() }
211 fun `checkAndReestablish reopens channel and clears replies on channel closing`() {
212 val netconfSessionSpy = spyk(netconfSession, recordPrivateCalls = true)
213 every { mockClientSession.isClosed } returns false
214 every { mockSshClient.isClosed } returns false
215 every { mockClientChannel.isClosed } returns true
216 every { netconfSessionSpy["openChannel"]() as Unit } just Runs
217 netconfSessionSpy.setClient(mockSshClient)
218 netconfSessionSpy.setSession(mockClientSession)
219 netconfSessionSpy.setChannel(mockClientChannel)
221 netconfSessionSpy.checkAndReestablish()
223 verify { netconfSessionSpy.clearReplies() }
224 verify { netconfSessionSpy["openChannel"]() }
229 fun `syncRpc runs normally`() {
230 val netconfSessionSpy = spyk(netconfSession)
231 val futureRet: CompletableFuture<String> = CompletableFuture.completedFuture(futureMsg)
233 //test the case where SSH connection did not need to be re-established.
234 //put an existing item into the replies
235 netconfSessionSpy.getReplies()["somekey"] = CompletableFuture.completedFuture("${futureMsg}2")
236 every { netconfCommunicator.sendMessage(any(), any()) } returns futureRet
237 every { netconfCommunicator.getFutureFromSendMessage(any(), any(), any()) } returns futureRet.get()
238 every { netconfSessionSpy.checkAndReestablish() } just Runs
240 assertEquals(futureMsg, netconfSessionSpy.syncRpc("0", "0"))
241 //make sure the replies didn't change
243 netconfSessionSpy.getReplies().size == 1 &&
244 netconfSessionSpy.getReplies().containsKey("somekey")
246 verify(exactly = 0) { netconfSessionSpy.clearReplies() }
251 fun `syncRpc still succeeds and replies are cleared on client disconnect`() {
252 val netconfSessionSpy = spyk(netconfSession, recordPrivateCalls = true)
253 val futureRet: CompletableFuture<String> = CompletableFuture.completedFuture(futureMsg)
255 //put an item into the replies
256 netconfSessionSpy.getReplies()["somekey"] = CompletableFuture.completedFuture("${futureMsg}2")
258 //tests the case where SSH session needs to be re-established.
259 every { netconfCommunicator.sendMessage(any(), any()) } returns futureRet
260 every { netconfSessionSpy["startClient"]() as Unit } just Runs
261 every { netconfCommunicator.getFutureFromSendMessage(any(), any(), any()) } returns futureRet.get()
262 every { mockSshClient.isClosed } returns true
263 netconfSessionSpy.setClient(mockSshClient)
266 assertEquals(futureMsg, netconfSessionSpy.syncRpc("0", "0"))
267 //make sure the replies got cleared out
268 assertTrue { netconfSessionSpy.getReplies().isEmpty() }
269 verify(exactly = 1) { netconfSessionSpy.clearReplies() }
272 //Test for handling CompletableFuture.get returns InterruptedException inside NetconfDeviceCommunicator
274 fun `syncRpc throws NetconfException if InterruptedException is caught`() {
275 val expectedExceptionMsg = "$deviceInfo: Interrupted while waiting for reply for request: $formattedRequest"
276 assertFailsWith(exceptionClass = NetconfException::class, message = expectedExceptionMsg) {
277 val netconfSessionSpy = spyk(netconfSession, recordPrivateCalls = true)
278 val futureRet: CompletableFuture<String> = CompletableFuture.completedFuture(futureMsg)
279 every { netconfCommunicator.sendMessage(any(), any()) } returns futureRet
280 every { netconfCommunicator.getFutureFromSendMessage(any(), any(), any()) } throws InterruptedException("interrupted")
281 every { netconfSessionSpy.checkAndReestablish() } just Runs
283 netconfSessionSpy.syncRpc("0", "0")
288 fun `syncRpc throws NetconfException if TimeoutException is caught`() {
289 val expectedExceptionMsg = "$deviceInfo: Timed out while waiting for reply for request $formattedRequest after ${deviceInfo.replyTimeout} sec."
290 assertFailsWith(exceptionClass = NetconfException::class, message = expectedExceptionMsg) {
291 val netconfSessionSpy = spyk(netconfSession)
292 val futureRet: CompletableFuture<String> = CompletableFuture.completedFuture(futureMsg)
293 every { netconfCommunicator.sendMessage(any(), any()) } returns futureRet
294 every { netconfCommunicator.getFutureFromSendMessage(any(), any(), any()) } throws TimeoutException("timed out")
295 every { netconfSessionSpy.checkAndReestablish() } just Runs
297 netconfSessionSpy.syncRpc("0", "0")
302 fun `syncRpc throws NetconfException if ExecutionException is caught`() {
303 val expectedExceptionMsg = "$deviceInfo: Closing session $sessionId for request $formattedRequest"
304 assertFailsWith(exceptionClass = NetconfException::class, message = expectedExceptionMsg) {
305 val netconfSessionSpy = spyk(netconfSession, recordPrivateCalls = false)
306 val futureRet: CompletableFuture<String> = CompletableFuture.completedFuture(futureMsg)
307 every { netconfCommunicator.sendMessage(any(), any()) } returns futureRet
308 every { netconfCommunicator.getFutureFromSendMessage(any(), any(), any()) } throws
309 ExecutionException("exec exception", Exception("nested exception"))
310 every { netconfSessionSpy["close"]() as Unit } just Runs
311 every { netconfSessionSpy.checkAndReestablish() } just Runs
312 netconfSessionSpy.setSession(mockClientSession)
314 netconfSessionSpy.syncRpc("0", "0")
319 fun `syncRpc throws NetconfException if caught ExecutionException and failed to close SSH session`() {
320 val expectedExceptionMsg = "$deviceInfo: Closing session $sessionId for request $formattedRequest"
321 assertFailsWith(exceptionClass = NetconfException::class, message = expectedExceptionMsg) {
322 val netconfSessionSpy = spyk(netconfSession)
323 val futureRet: CompletableFuture<String> = CompletableFuture.completedFuture(futureMsg)
324 every { netconfCommunicator.sendMessage(any(), any()) } returns futureRet
325 every { netconfCommunicator.getFutureFromSendMessage(any(), any(), any()) } throws
326 ExecutionException("exec exception", Exception("nested exception"))
327 every { netconfSessionSpy["close"]() as Unit } throws IOException("got an IO exception")
328 every { netconfSessionSpy.checkAndReestablish() } just Runs
330 netconfSessionSpy.syncRpc("0", "0")
331 //make sure replies are cleared...
332 verify(exactly = 1) { netconfSessionSpy.clearReplies() }
333 verify(exactly = 1) { netconfSessionSpy.clearErrorReplies() }
338 fun `asyncRpc runs normally`() {
339 val netconfSessionSpy = spyk(netconfSession)
340 every { netconfSessionSpy.checkAndReestablish() } just Runs
341 val futureRet: CompletableFuture<String> = CompletableFuture.completedFuture(futureMsg)
342 every { netconfCommunicator.sendMessage(any(), any()) } returns futureRet
344 val rpcResultFuture = netconfSessionSpy.asyncRpc("0", "0")
345 every { netconfSessionSpy.checkAndReestablish() } just Runs
346 //make sure the future gets resolved
347 assertTrue { rpcResultFuture.get() == futureMsg }
348 //make sure that clearReplies wasn't called (reestablishConnection check)
349 verify(exactly = 0) { netconfSessionSpy.clearReplies() }
353 fun `asyncRpc wraps exception`() {
354 val netconfSessionSpy = spyk(netconfSession)
355 every { netconfSessionSpy.checkAndReestablish() } just Runs
356 val futureRet: CompletableFuture<String> = CompletableFuture.supplyAsync {
357 throw Exception("blah")
359 every { netconfCommunicator.sendMessage(any(), any()) } returns futureRet
361 val rpcResultFuture = netconfSessionSpy.asyncRpc("0", "0")
362 every { netconfSessionSpy.checkAndReestablish() } just Runs
363 val e = assertFailsWith(exceptionClass = ExecutionException::class, message = futureMsg) {
364 rpcResultFuture.get()
367 assertTrue { cause is NetconfException }
371 fun `connect starts underlying client`() {
372 val propertiesMap = hashMapOf<String, Any>()
373 every { mockSshClient.start() } just Runs
374 every { mockSshClient.properties } returns propertiesMap
375 val netconfSessionSpy = spyk(netconfSession, recordPrivateCalls = true)
376 every { netconfSessionSpy["setupNewSSHClient"]() as Unit } just Runs
377 every { netconfSessionSpy["startSession"]() as Unit } just Runs
378 netconfSessionSpy.setClient(mockSshClient)
379 netconfSessionSpy.connect()
380 verify { mockSshClient.start() }
381 assertTrue { propertiesMap.containsKey(FactoryManager.IDLE_TIMEOUT) }
382 assertTrue { propertiesMap.containsKey(FactoryManager.NIO2_READ_TIMEOUT) }
386 fun `startSession tries to connect to user supplied device`() {
387 every { mockSshClient.start() } just Runs
388 every { mockSshClient.properties } returns hashMapOf<String, Any>()
389 //setup slots to capture values from the invocations
390 val userSlot = CapturingSlot<String>()
391 val ipSlot = CapturingSlot<String>()
392 val portSlot = CapturingSlot<Int>()
393 //create a future that succeeded
394 val succeededFuture = DefaultConnectFuture(Any(), Any())
395 succeededFuture.value = mockClientSession
396 every { mockSshClient.connect(capture(userSlot), capture(ipSlot), capture(portSlot)) } returns succeededFuture
397 val netconfSessionSpy = spyk(netconfSession, recordPrivateCalls = true)
398 every { netconfSessionSpy["authSession"]() as Unit } just Runs
399 every { netconfSessionSpy["setupNewSSHClient"]() as Unit } just Runs
400 netconfSessionSpy.setClient(mockSshClient)
402 netconfSessionSpy.connect()
404 verify { mockSshClient.connect(deviceInfo.username, deviceInfo.ipAddress, deviceInfo.port) }
405 assertEquals(deviceInfo.username, userSlot.captured)
406 assertEquals(deviceInfo.ipAddress, ipSlot.captured)
407 assertEquals(deviceInfo.port, portSlot.captured)
408 verify { netconfSessionSpy["authSession"]() }
412 fun `authSession throws exception if ClientSession is not AUTHED`() {
413 assertFailsWith(exceptionClass = NetconfException::class) {
414 //after client session connects,
415 every { mockSshClient.start() } just Runs
416 every { mockSshClient.properties } returns hashMapOf<String, Any>()
417 val succeededAuthFuture = DefaultAuthFuture(Any(), Any())
418 succeededAuthFuture.value = true //AuthFuture's value is Boolean
419 val passSlot = CapturingSlot<String>()
420 every { mockClientSession.addPasswordIdentity(capture(passSlot)) } just Runs
421 every { mockClientSession.auth() } returns succeededAuthFuture
422 val succeededSessionFuture = DefaultConnectFuture(Any(), Any())
423 succeededSessionFuture.value = mockClientSession
424 every { mockSshClient.connect(deviceInfo.username, deviceInfo.ipAddress, deviceInfo.port) } returns succeededSessionFuture
425 every { mockClientSession.waitFor(any(), any()) } returns
426 setOf(ClientSession.ClientSessionEvent.WAIT_AUTH, ClientSession.ClientSessionEvent.CLOSED)
427 val netconfSessionSpy = spyk(netconfSession, recordPrivateCalls = true)
428 every { netconfSessionSpy["setupNewSSHClient"]() as Unit } just Runs
429 netconfSessionSpy.setClient(mockSshClient)
431 netconfSessionSpy.connect()
435 //common mock initializer for more weird tests.
436 private fun setupOpenChannelMocks(): Unit {
437 every { mockSshClient.start() } just Runs
438 every { mockSshClient.properties } returns hashMapOf<String, Any>()
439 val succeededAuthFuture = DefaultAuthFuture(Any(), Any())
440 succeededAuthFuture.value = true //AuthFuture's value is Boolean
441 val passSlot = CapturingSlot<String>()
442 every { mockClientSession.addPasswordIdentity(capture(passSlot)) } just Runs
443 every { mockClientSession.auth() } returns succeededAuthFuture
444 val succeededSessionFuture = DefaultConnectFuture(Any(), Any())
445 succeededSessionFuture.value = mockClientSession
446 every { mockSshClient.connect(deviceInfo.username, deviceInfo.ipAddress, deviceInfo.port) } returns succeededSessionFuture
447 every { mockClientSession.waitFor(any(), any()) } returns
448 setOf(ClientSession.ClientSessionEvent.WAIT_AUTH,
449 ClientSession.ClientSessionEvent.CLOSED,
450 ClientSession.ClientSessionEvent.AUTHED)
452 every { mockClientSession.createSubsystemChannel(any()) } returns mockSubsystem
453 every { mockClientChannel.invertedOut } returns sampleInputStream
454 every { mockClientChannel.invertedIn } returns sampleOutputStream
458 fun `authSession opensChannel if ClientSession is AUTHED and session can be opened`() {
459 //after client session connects, make sure the client receives authentication
460 setupOpenChannelMocks()
461 val channelFuture = DefaultOpenFuture(Any(), Any())
462 channelFuture.value = true
463 channelFuture.setOpened()
464 val connectFuture = DefaultConnectFuture(Any(), Any())
465 connectFuture.value = mockClientSession
466 connectFuture.session = mockClientSession
467 every { mockSubsystem.open() } returns channelFuture
468 every { mockSshClient.connect(deviceInfo.username, deviceInfo.ipAddress, deviceInfo.port) } returns connectFuture
470 val netconfSessionSpy = spyk(netconfSession, recordPrivateCalls = true)
471 every { netconfSessionSpy["setupNewSSHClient"]() as Unit } just Runs
472 every { netconfSessionSpy["setupHandler"]() as Unit } just Runs
473 netconfSessionSpy.setClient(mockSshClient)
475 netconfSessionSpy.connect()
477 verify { mockSubsystem.open() }
482 fun `authSession throws NetconfException if ClientSession is AUTHED but channelFuture timed out or not open`() {
483 assertFailsWith(exceptionClass = NetconfException::class) {
484 //after client session connects, make sure the client receives authentication
485 setupOpenChannelMocks()
486 val channelFuture = DefaultOpenFuture(Any(), Any())
487 every { mockSubsystem.open() } returns channelFuture
488 val netconfSessionSpy = spyk(netconfSession, recordPrivateCalls = true)
489 every { netconfSessionSpy["setupNewSSHClient"]() as Unit } just Runs
490 every { netconfSessionSpy["setupHandler"]() as Unit } just Runs
491 netconfSessionSpy.setClient(mockSshClient)
493 netconfSessionSpy.connect()
495 verify { mockSubsystem.open() }
501 fun `disconnect closes session, channel, and client`() {
502 every { rpcService.closeSession(false) } returns SUCCESSFUL_DEVICE_RESPONSE
503 every { mockClientSession.close() } just Runs
504 every { mockClientChannel.close() } just Runs
505 every { mockSshClient.close() } just Runs
506 val netconfSessionSpy = spyk(netconfSession, recordPrivateCalls = true)
507 netconfSessionSpy.setChannel(mockClientChannel)
508 netconfSessionSpy.setClient(mockSshClient)
509 netconfSessionSpy.setSession(mockClientSession)
511 netconfSessionSpy.disconnect()
513 verify { mockClientSession.close() }
514 verify { mockClientChannel.close() }
515 verify { mockSshClient.close() }
519 fun `disconnect wraps IOException if channel doesn't close`() { //this test is equivalent to others
520 every { rpcService.closeSession(false) } returns SUCCESSFUL_DEVICE_RESPONSE
521 every { mockClientSession.close() } just Runs
522 every { mockClientChannel.close() } throws IOException("channel doesn't want to close!")
523 val netconfSessionSpy = spyk(netconfSession, recordPrivateCalls = true)
524 netconfSessionSpy.setChannel(mockClientChannel)
525 netconfSessionSpy.setClient(mockSshClient)
526 netconfSessionSpy.setSession(mockClientSession)
528 netconfSessionSpy.disconnect()
530 verify { mockClientSession.close() }