/*
 * Copyright © 2019 Bell Canada
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
17 package org.onap.ccsdk.cds.blueprintsprocessor.functions.netconf.executor.core
19 import io.mockk.CapturingSlot
25 import io.mockk.verify
26 import org.apache.sshd.client.SshClient
27 import org.apache.sshd.client.channel.ChannelSubsystem
28 import org.apache.sshd.client.channel.ClientChannel
29 import org.apache.sshd.client.future.DefaultAuthFuture
30 import org.apache.sshd.client.future.DefaultConnectFuture
31 import org.apache.sshd.client.future.DefaultOpenFuture
32 import org.apache.sshd.client.session.ClientSession
33 import org.apache.sshd.common.FactoryManager
34 import org.junit.Before
35 import org.junit.Ignore
37 import org.onap.ccsdk.cds.blueprintsprocessor.functions.netconf.executor.api.DeviceInfo
38 import org.onap.ccsdk.cds.blueprintsprocessor.functions.netconf.executor.api.DeviceResponse
39 import org.onap.ccsdk.cds.blueprintsprocessor.functions.netconf.executor.api.NetconfException
40 import org.onap.ccsdk.cds.blueprintsprocessor.functions.netconf.executor.api.NetconfRpcService
41 import org.onap.ccsdk.cds.blueprintsprocessor.functions.netconf.executor.utils.NetconfMessageUtils
42 import org.onap.ccsdk.cds.blueprintsprocessor.functions.netconf.executor.utils.RpcStatus
43 import java.io.ByteArrayInputStream
44 import java.io.ByteArrayOutputStream
45 import java.io.IOException
46 import java.io.InputStream
47 import java.nio.charset.StandardCharsets
48 import java.util.concurrent.CompletableFuture
49 import java.util.concurrent.ExecutionException
50 import java.util.concurrent.TimeoutException
51 import kotlin.test.assertEquals
52 import kotlin.test.assertFailsWith
53 import kotlin.test.assertTrue
55 class NetconfSessionImplTest {
/** Canned device reply whose status reports a successful RPC. */
val SUCCESSFUL_DEVICE_RESPONSE: DeviceResponse = DeviceResponse().also { it.status = RpcStatus.SUCCESS }

/** Canned device reply whose status reports a failed RPC. */
val FAILED_DEVICE_RESPONSE: DeviceResponse = DeviceResponse().also { it.status = RpcStatus.FAILURE }

/** Device descriptor handed to the session under test. */
val deviceInfo: DeviceInfo = DeviceInfo().also { it.ipAddress = "localhost" }

// Text used as the content of the sample input stream.
// NOTE(review): `const val` is only legal at top level or inside an object/companion; the enclosing
// declaration is not visible in this excerpt — confirm these fixtures sit in a companion object.
private const val someString = "Some string"
// Late-initialised session under test and the doubles it collaborates with.
private lateinit var netconfSession: NetconfSessionImpl
private lateinit var netconfCommunicator: NetconfDeviceCommunicator
private lateinit var rpcService: NetconfRpcService
private lateinit var mockSshClient: SshClient
private lateinit var mockClientSession: ClientSession
private lateinit var mockClientChannel: ClientChannel
private lateinit var mockSubsystem: ChannelSubsystem

// Late-initialised streams returned by the mocked channel.
private lateinit var sampleInputStream: InputStream
private lateinit var sampleOutputStream: ByteArrayOutputStream

// Fixed inputs shared by the RPC tests.
private val futureMsg: String = "blahblahblah"
private val request: String = "0"
private val sessionId: String = "0"
private val messageId: String = "asdfasdfadf"
private val deviceCapabilities: Set<String> = setOf("capability1", "capability2")

// The request as rendered by the production formatter; used to build expected exception messages.
private val formattedRequest = NetconfMessageUtils.formatRPCRequest(request, messageId, deviceCapabilities)
// Fresh doubles for every collaborator the session touches.
netconfCommunicator = mockk()
// rpcService is passed to the session constructor below, so it must be assigned first:
// reading a lateinit property before assignment throws UninitializedPropertyAccessException.
rpcService = mockk()
mockSshClient = mockk()
mockClientSession = mockk()
mockClientChannel = mockk()
mockSubsystem = mockk()

// Session under test, wired to the mocked stream handler.
netconfSession = NetconfSessionImpl(deviceInfo, rpcService)
netconfSession.setStreamHandler(netconfCommunicator)

// Stream ends handed out by the mocked channel.
sampleInputStream = ByteArrayInputStream(someString.toByteArray(StandardCharsets.UTF_8))
sampleOutputStream = ByteArrayOutputStream()
/**
 * Expects `connect()` to reach the private `startClient` step.
 *
 * `startClient` is stubbed on a spy so that no real SSH client is started.
 */
fun `connect calls appropriate methods`() {
    val sessionSpy = spyk(netconfSession, recordPrivateCalls = true)
    every { sessionSpy["startClient"]() as Unit } just Runs

    // Act: without this call nothing triggers startClient and the verification below cannot pass.
    sessionSpy.connect()

    verify { sessionSpy["startClient"]() }
}
// Expect a NetconfException when the connection cannot be established.
/**
 * `connect()` must surface a [NetconfException] when the private `startClient` step fails.
 */
fun `connect throws NetconfException on error`() {
    val failureText = "$deviceInfo: Failed to establish SSH session"
    // Note: `message` is the text reported if the assertion fails; it is not compared
    // against the thrown exception's message.
    assertFailsWith<NetconfException>(message = failureText) {
        val spiedSession = spyk(netconfSession, recordPrivateCalls = true)
        every { spiedSession["startClient"]() as Unit } throws NetconfException(failureText)
        spiedSession.connect()
    }
}
/**
 * Graceful disconnect: when `closeSession(false)` answers with a non-failure status, no forced
 * `closeSession(true)` is attempted and the SSH session, client and channel are all closed.
 */
fun `disconnect without force option for rpcService succeeds`() {
    // The non-forced close request is answered with a status other than RpcStatus.FAILURE.
    every { rpcService.closeSession(false) } returns SUCCESSFUL_DEVICE_RESPONSE
    every { mockClientSession.close() } just Runs
    every { mockSshClient.close() } just Runs
    every { mockClientChannel.close() } just Runs
    val spiedSession = spyk(netconfSession, recordPrivateCalls = true)
    spiedSession.also {
        it.setSession(mockClientSession)
        it.setClient(mockSshClient)
        it.setChannel(mockClientChannel)
    }

    spiedSession.disconnect()

    // The forced variant must never have been requested.
    verify(exactly = 0) { rpcService.closeSession(true) }
    verify {
        mockClientSession.close()
        mockSshClient.close()
        mockClientChannel.close()
    }
}
/**
 * Forced disconnect: the first `closeSession` attempt reports FAILURE, the second one succeeds.
 * Exactly two attempts are expected, followed by closing the SSH session, client and channel.
 */
fun `disconnect with force option for rpcService succeeds`() {
    val spiedSession = spyk(netconfSession, recordPrivateCalls = true)
    // First answer is a failure, every later answer is a success.
    every { rpcService.closeSession(any()) } returns
        FAILED_DEVICE_RESPONSE andThen SUCCESSFUL_DEVICE_RESPONSE
    every { mockClientSession.close() } just Runs
    every { mockSshClient.close() } just Runs
    every { mockClientChannel.close() } just Runs
    spiedSession.also {
        it.setSession(mockClientSession)
        it.setClient(mockSshClient)
        it.setChannel(mockClientChannel)
    }

    spiedSession.disconnect()

    verify(exactly = 2) { rpcService.closeSession(any()) }
    verify {
        mockClientSession.close()
        mockSshClient.close()
        mockClientChannel.close()
    }
}
/**
 * `disconnect()` must complete normally even though the private `close` step throws an
 * [IOException]; the test only checks that `close` was attempted.
 */
fun `disconnect wraps exception from ssh closing error`() {
    val spiedSession = spyk(netconfSession, recordPrivateCalls = true)
    every { rpcService.closeSession(false) } returns SUCCESSFUL_DEVICE_RESPONSE
    every { spiedSession.checkAndReestablish() } just Runs
    every { spiedSession["close"]() as Unit } throws IOException("Some IOException occurred!")

    spiedSession.disconnect()

    verify { spiedSession["close"]() }
}
/**
 * `reconnect()` is expected to call both `disconnect()` and `connect()`; both are stubbed
 * so that only the delegation is exercised.
 */
fun `reconnect calls disconnect and connect`() {
    val spiedSession = spyk(netconfSession, recordPrivateCalls = true)
    every { spiedSession.disconnect() } just Runs
    every { spiedSession.connect() } just Runs

    spiedSession.reconnect()

    verify {
        spiedSession.disconnect()
        spiedSession.connect()
    }
}
/**
 * With a closed SSH client, `checkAndReestablish()` is expected to clear the pending replies
 * and run the private `startConnection` step (stubbed here).
 */
fun `checkAndReestablish restarts connection and clears replies on sshClient disconnection`() {
    every { mockSshClient.isClosed } returns true
    val spiedSession = spyk(netconfSession, recordPrivateCalls = true)
    every { spiedSession["startConnection"]() as Unit } just Runs
    spiedSession.setClient(mockSshClient)

    spiedSession.checkAndReestablish()

    verify { spiedSession.clearReplies() }
    verify { spiedSession["startConnection"]() }
}
/**
 * With an open SSH client but a closed client session, `checkAndReestablish()` is expected to
 * clear the pending replies and run the private `startSession` step (stubbed here).
 */
fun `checkAndReestablish restarts session and clears replies on clientSession closing`() {
    every { mockSshClient.isClosed } returns false
    every { mockClientSession.isClosed } returns true
    val spiedSession = spyk(netconfSession, recordPrivateCalls = true)
    every { spiedSession["startSession"]() as Unit } just Runs
    spiedSession.also {
        it.setClient(mockSshClient)
        it.setSession(mockClientSession)
    }

    spiedSession.checkAndReestablish()

    verify { spiedSession.clearReplies() }
    verify { spiedSession["startSession"]() }
}
/**
 * With client and session open but the channel closed, `checkAndReestablish()` is expected to
 * clear the pending replies and run the private `openChannel` step (stubbed here).
 */
fun `checkAndReestablish reopens channel and clears replies on channel closing`() {
    every { mockSshClient.isClosed } returns false
    every { mockClientSession.isClosed } returns false
    every { mockClientChannel.isClosed } returns true
    val spiedSession = spyk(netconfSession, recordPrivateCalls = true)
    every { spiedSession["openChannel"]() as Unit } just Runs
    spiedSession.also {
        it.setClient(mockSshClient)
        it.setSession(mockClientSession)
        it.setChannel(mockClientChannel)
    }

    spiedSession.checkAndReestablish()

    verify { spiedSession.clearReplies() }
    verify { spiedSession["openChannel"]() }
}
/**
 * Happy path for `syncRpc`: the communicator's reply is returned unchanged and, because no
 * re-establishment takes place, the existing replies are left alone.
 */
fun `syncRpc runs normally`() {
    val spiedSession = spyk(netconfSession)
    val completedReply: CompletableFuture<String> = CompletableFuture.completedFuture(futureMsg)

    // Case under test: the SSH connection does not need to be re-established.
    // Seed the replies with one pre-existing entry.
    spiedSession.getReplies()["somekey"] = CompletableFuture.completedFuture("${futureMsg}2")
    every { netconfCommunicator.sendMessage(any(), any()) } returns completedReply
    every { netconfCommunicator.getFutureFromSendMessage(any(), any(), any()) } returns completedReply.get()
    every { spiedSession.checkAndReestablish() } just Runs

    assertEquals(futureMsg, spiedSession.syncRpc("0", "0"))

    // The replies must be untouched. The comparison has to be wrapped in an assertion,
    // otherwise its Boolean result is computed and silently discarded.
    assertTrue {
        spiedSession.getReplies().size == 1 &&
            spiedSession.getReplies().containsKey("somekey")
    }
    verify(exactly = 0) { spiedSession.clearReplies() }
}
/**
 * `syncRpc` with a closed SSH client: the call still returns the communicator's reply, and the
 * previously stored replies are cleared exactly once along the way.
 */
fun `syncRpc still succeeds and replies are cleared on client disconnect`() {
    val spiedSession = spyk(netconfSession, recordPrivateCalls = true)
    val completedReply: CompletableFuture<String> = CompletableFuture.completedFuture(futureMsg)

    // Seed the replies with one entry that should disappear.
    spiedSession.getReplies()["somekey"] = CompletableFuture.completedFuture("${futureMsg}2")

    // Case under test: the SSH client reports itself closed, so the session has to be re-established.
    every { netconfCommunicator.sendMessage(any(), any()) } returns completedReply
    every { netconfCommunicator.getFutureFromSendMessage(any(), any(), any()) } returns completedReply.get()
    every { spiedSession["startClient"]() as Unit } just Runs
    every { mockSshClient.isClosed } returns true
    spiedSession.setClient(mockSshClient)

    val reply = spiedSession.syncRpc("0", "0")

    assertEquals(futureMsg, reply)
    // The replies must have been cleared out.
    assertTrue(spiedSession.getReplies().isEmpty())
    verify(exactly = 1) { spiedSession.clearReplies() }
}
// Covers the case where CompletableFuture.get throws InterruptedException inside NetconfDeviceCommunicator.
/**
 * An [InterruptedException] raised while waiting for the reply must reach the caller of
 * `syncRpc` as a [NetconfException].
 */
fun `syncRpc throws NetconfException if InterruptedException is caught`() {
    val expectedExceptionMsg = "$deviceInfo: Interrupted while waiting for reply for request: $formattedRequest"
    assertFailsWith<NetconfException>(message = expectedExceptionMsg) {
        val spiedSession = spyk(netconfSession, recordPrivateCalls = true)
        val completedReply: CompletableFuture<String> = CompletableFuture.completedFuture(futureMsg)
        every { netconfCommunicator.sendMessage(any(), any()) } returns completedReply
        every {
            netconfCommunicator.getFutureFromSendMessage(any(), any(), any())
        } throws InterruptedException("interrupted")
        every { spiedSession.checkAndReestablish() } just Runs

        spiedSession.syncRpc("0", "0")
    }
}
/**
 * A [TimeoutException] raised while waiting for the reply must reach the caller of `syncRpc`
 * as a [NetconfException].
 */
fun `syncRpc throws NetconfException if TimeoutException is caught`() {
    val expectedExceptionMsg = "$deviceInfo: Timed out while waiting for reply for request $formattedRequest after ${deviceInfo.replyTimeout} sec."
    assertFailsWith<NetconfException>(message = expectedExceptionMsg) {
        val spiedSession = spyk(netconfSession)
        val completedReply: CompletableFuture<String> = CompletableFuture.completedFuture(futureMsg)
        every { netconfCommunicator.sendMessage(any(), any()) } returns completedReply
        every {
            netconfCommunicator.getFutureFromSendMessage(any(), any(), any())
        } throws TimeoutException("timed out")
        every { spiedSession.checkAndReestablish() } just Runs

        spiedSession.syncRpc("0", "0")
    }
}
/**
 * An [ExecutionException] raised while waiting for the reply must reach the caller of `syncRpc`
 * as a [NetconfException]; the private `close` step is stubbed to succeed.
 */
fun `syncRpc throws NetconfException if ExecutionException is caught`() {
    val expectedExceptionMsg = "$deviceInfo: Closing session $sessionId for request $formattedRequest"
    assertFailsWith<NetconfException>(message = expectedExceptionMsg) {
        val spiedSession = spyk(netconfSession, recordPrivateCalls = false)
        val completedReply: CompletableFuture<String> = CompletableFuture.completedFuture(futureMsg)
        val failure = ExecutionException("exec exception", Exception("nested exception"))
        every { netconfCommunicator.sendMessage(any(), any()) } returns completedReply
        every { netconfCommunicator.getFutureFromSendMessage(any(), any(), any()) } throws failure
        every { spiedSession["close"]() as Unit } just Runs
        every { spiedSession.checkAndReestablish() } just Runs
        spiedSession.setSession(mockClientSession)

        spiedSession.syncRpc("0", "0")
    }
}
/**
 * An [ExecutionException] from the communicator combined with an [IOException] from the private
 * `close` step must still reach the caller as a [NetconfException], and both reply collections
 * must have been cleared exactly once.
 */
fun `syncRpc throws NetconfException if caught ExecutionException and failed to close SSH session`() {
    val expectedExceptionMsg = "$deviceInfo: Closing session $sessionId for request $formattedRequest"
    val spiedSession = spyk(netconfSession)
    val completedReply: CompletableFuture<String> = CompletableFuture.completedFuture(futureMsg)
    every { netconfCommunicator.sendMessage(any(), any()) } returns completedReply
    every { netconfCommunicator.getFutureFromSendMessage(any(), any(), any()) } throws
        ExecutionException("exec exception", Exception("nested exception"))
    every { spiedSession["close"]() as Unit } throws IOException("got an IO exception")
    every { spiedSession.checkAndReestablish() } just Runs

    assertFailsWith(exceptionClass = NetconfException::class, message = expectedExceptionMsg) {
        spiedSession.syncRpc("0", "0")
    }

    // These checks must run after the failing call has been asserted: placed behind the throwing
    // statement inside the assertFailsWith lambda they would be unreachable and never executed.
    verify(exactly = 1) { spiedSession.clearReplies() }
    verify(exactly = 1) { spiedSession.clearErrorReplies() }
}
/**
 * Happy path for `asyncRpc`: the returned future resolves to the communicator's reply and no
 * re-establishment (observable through `clearReplies`) takes place.
 */
fun `asyncRpc runs normally`() {
    val spiedSession = spyk(netconfSession)
    val completedReply: CompletableFuture<String> = CompletableFuture.completedFuture(futureMsg)
    // Stubbed once, before the call; repeating the same stub after the call has no effect on it.
    every { spiedSession.checkAndReestablish() } just Runs
    every { netconfCommunicator.sendMessage(any(), any()) } returns completedReply

    val rpcResultFuture = spiedSession.asyncRpc("0", "0")

    // The future must resolve to the reply; assertEquals reports the actual value on mismatch.
    assertEquals(futureMsg, rpcResultFuture.get())
    // clearReplies is only expected when the connection had to be re-established.
    verify(exactly = 0) { spiedSession.clearReplies() }
}
364 //TODO: get 't' inside asyncRpc to be a Throwable
/**
 * Intended to show that a failure of the future returned by the communicator surfaces from
 * `asyncRpc` as a [NetconfException].
 *
 * NOTE(review): the future returned by `asyncRpc` is never awaited here, so an exceptional
 * completion is not observed by `assertFailsWith` — confirm the intended assertion.
 */
fun `asyncRpc wraps exception`() {
    assertFailsWith<NetconfException>(message = futureMsg) {
        val spiedSession = spyk(netconfSession)
        val failingReply: CompletableFuture<String> = CompletableFuture.supplyAsync {
            throw Exception("blah")
        }
        // Whichever completion happens first wins; this one is attempted from the test thread.
        failingReply.completeExceptionally(IOException("something is wrong"))
        every { netconfCommunicator.sendMessage(any(), any()) } returns failingReply

        spiedSession.asyncRpc("0", "0")
    }
}
/**
 * `connect()` must start the SSH client and put the idle and NIO2 read timeouts into the
 * client's property map. Client creation and session start-up are stubbed out.
 */
fun `connect starts underlying client`() {
    val clientProperties = hashMapOf<String, Any>()
    every { mockSshClient.properties } returns clientProperties
    every { mockSshClient.start() } just Runs
    val spiedSession = spyk(netconfSession, recordPrivateCalls = true)
    every { spiedSession["setupNewSSHClient"]() as Unit } just Runs
    every { spiedSession["startSession"]() as Unit } just Runs
    spiedSession.setClient(mockSshClient)

    spiedSession.connect()

    verify { mockSshClient.start() }
    assertTrue(clientProperties.containsKey(FactoryManager.IDLE_TIMEOUT))
    assertTrue(clientProperties.containsKey(FactoryManager.NIO2_READ_TIMEOUT))
}
/**
 * `connect()` must ask the SSH client to connect with the username, address and port taken from
 * [deviceInfo], and then run the private `authSession` step (stubbed here).
 */
fun `startSession tries to connect to user supplied device`() {
    every { mockSshClient.start() } just Runs
    every { mockSshClient.properties } returns hashMapOf<String, Any>()

    // Slots recording the arguments handed to SshClient.connect.
    val capturedUser = CapturingSlot<String>()
    val capturedAddress = CapturingSlot<String>()
    val capturedPort = CapturingSlot<Int>()

    // A connect future that already carries the mocked session as its value.
    val connected = DefaultConnectFuture(Any(), Any())
    connected.value = mockClientSession
    every {
        mockSshClient.connect(capture(capturedUser), capture(capturedAddress), capture(capturedPort))
    } returns connected

    val spiedSession = spyk(netconfSession, recordPrivateCalls = true)
    every { spiedSession["authSession"]() as Unit } just Runs
    every { spiedSession["setupNewSSHClient"]() as Unit } just Runs
    spiedSession.setClient(mockSshClient)

    spiedSession.connect()

    verify { mockSshClient.connect(deviceInfo.username, deviceInfo.ipAddress, deviceInfo.port) }
    assertEquals(deviceInfo.username, capturedUser.captured)
    assertEquals(deviceInfo.ipAddress, capturedAddress.captured)
    assertEquals(deviceInfo.port, capturedPort.captured)
    verify { spiedSession["authSession"]() }
}
/**
 * `connect()` must fail with a [NetconfException] when the events reported by the client
 * session's `waitFor` do not include AUTHED.
 */
fun `authSession throws exception if ClientSession is not AUTHED`() {
    assertFailsWith<NetconfException> {
        every { mockSshClient.start() } just Runs
        every { mockSshClient.properties } returns hashMapOf<String, Any>()

        // Authentication future completed with `true` (an AuthFuture's value is a Boolean).
        val authDone = DefaultAuthFuture(Any(), Any())
        authDone.value = true
        val capturedPassword = CapturingSlot<String>()
        every { mockClientSession.addPasswordIdentity(capture(capturedPassword)) } just Runs
        every { mockClientSession.auth() } returns authDone

        // Connect future that already carries the mocked session.
        val connected = DefaultConnectFuture(Any(), Any())
        connected.value = mockClientSession
        every {
            mockSshClient.connect(deviceInfo.username, deviceInfo.ipAddress, deviceInfo.port)
        } returns connected

        // The reported events deliberately lack AUTHED.
        every { mockClientSession.waitFor(any(), any()) } returns
            setOf(ClientSession.ClientSessionEvent.WAIT_AUTH, ClientSession.ClientSessionEvent.CLOSED)

        val spiedSession = spyk(netconfSession, recordPrivateCalls = true)
        every { spiedSession["setupNewSSHClient"]() as Unit } just Runs
        spiedSession.setClient(mockSshClient)

        spiedSession.connect()
    }
}
// Common mock initialisation shared by the channel-opening tests.
/**
 * Stubs the SSH client, client session and channel mocks so that connecting and authenticating
 * succeed: `waitFor` reports AUTHED, `createSubsystemChannel` yields [mockSubsystem], and the
 * channel's inverted streams are the sample streams.
 */
private fun setupOpenChannelMocks() {
    every { mockSshClient.start() } just Runs
    every { mockSshClient.properties } returns hashMapOf<String, Any>()

    // Authentication future completed with `true` (an AuthFuture's value is a Boolean).
    val authDone = DefaultAuthFuture(Any(), Any())
    authDone.value = true
    val capturedPassword = CapturingSlot<String>()
    every { mockClientSession.addPasswordIdentity(capture(capturedPassword)) } just Runs
    every { mockClientSession.auth() } returns authDone

    // Connect future that already carries the mocked session.
    val connected = DefaultConnectFuture(Any(), Any())
    connected.value = mockClientSession
    every {
        mockSshClient.connect(deviceInfo.username, deviceInfo.ipAddress, deviceInfo.port)
    } returns connected

    val reportedEvents = setOf(
        ClientSession.ClientSessionEvent.WAIT_AUTH,
        ClientSession.ClientSessionEvent.CLOSED,
        ClientSession.ClientSessionEvent.AUTHED
    )
    every { mockClientSession.waitFor(any(), any()) } returns reportedEvents

    every { mockClientSession.createSubsystemChannel(any()) } returns mockSubsystem
    every { mockClientChannel.invertedOut } returns sampleInputStream
    every { mockClientChannel.invertedIn } returns sampleOutputStream
}
/**
 * After a successful connect and authentication, `connect()` must open the subsystem channel.
 * The open future is already marked as opened and the private `setupHandler` step is stubbed.
 */
fun `authSession opensChannel if ClientSession is AUTHED and session can be opened`() {
    // Connecting and authenticating succeed.
    setupOpenChannelMocks()

    // Channel-open future that has completed and is flagged as opened.
    val opened = DefaultOpenFuture(Any(), Any())
    opened.value = true
    opened.setOpened()
    every { mockSubsystem.open() } returns opened

    // Connect future carrying the mocked session both as value and as session.
    val connected = DefaultConnectFuture(Any(), Any())
    connected.value = mockClientSession
    connected.session = mockClientSession
    every {
        mockSshClient.connect(deviceInfo.username, deviceInfo.ipAddress, deviceInfo.port)
    } returns connected

    val spiedSession = spyk(netconfSession, recordPrivateCalls = true)
    every { spiedSession["setupNewSSHClient"]() as Unit } just Runs
    every { spiedSession["setupHandler"]() as Unit } just Runs
    spiedSession.setClient(mockSshClient)

    spiedSession.connect()

    verify { mockSubsystem.open() }
}
/**
 * After a successful connect and authentication, `connect()` must fail with a [NetconfException]
 * when the channel-open future never completes / is not opened, and the attempt to open the
 * subsystem channel must still have been made.
 */
fun `authSession throws NetconfException if ClientSession is AUTHED but channelFuture timed out or not open`() {
    // Connecting and authenticating succeed.
    setupOpenChannelMocks()
    // A future that is never completed nor flagged as opened.
    val neverOpened = DefaultOpenFuture(Any(), Any())
    every { mockSubsystem.open() } returns neverOpened
    val spiedSession = spyk(netconfSession, recordPrivateCalls = true)
    every { spiedSession["setupNewSSHClient"]() as Unit } just Runs
    every { spiedSession["setupHandler"]() as Unit } just Runs
    spiedSession.setClient(mockSshClient)

    assertFailsWith(exceptionClass = NetconfException::class) {
        spiedSession.connect()
    }

    // Checked after the failing call has been asserted: placed behind the throwing statement
    // inside the assertFailsWith lambda this verification would never execute.
    verify { mockSubsystem.open() }
}
/**
 * `disconnect()` must close the client session, the channel and the SSH client once the
 * non-forced `closeSession` has succeeded.
 */
fun `disconnect closes session, channel, and client`() {
    every { rpcService.closeSession(false) } returns SUCCESSFUL_DEVICE_RESPONSE
    every { mockClientSession.close() } just Runs
    every { mockClientChannel.close() } just Runs
    every { mockSshClient.close() } just Runs
    val spiedSession = spyk(netconfSession, recordPrivateCalls = true)
    spiedSession.also {
        it.setChannel(mockClientChannel)
        it.setClient(mockSshClient)
        it.setSession(mockClientSession)
    }

    spiedSession.disconnect()

    verify {
        mockClientSession.close()
        mockClientChannel.close()
        mockSshClient.close()
    }
}
/**
 * `disconnect()` must complete normally when closing the channel throws an [IOException];
 * the test checks that the client session was closed. (Equivalent to the other disconnect tests.)
 */
fun `disconnect wraps IOException if channel doesn't close`() {
    every { rpcService.closeSession(false) } returns SUCCESSFUL_DEVICE_RESPONSE
    every { mockClientSession.close() } just Runs
    every { mockClientChannel.close() } throws IOException("channel doesn't want to close!")
    val spiedSession = spyk(netconfSession, recordPrivateCalls = true)
    spiedSession.also {
        it.setChannel(mockClientChannel)
        it.setClient(mockSshClient)
        it.setSession(mockClientSession)
    }

    spiedSession.disconnect()

    verify { mockClientSession.close() }
}