/*
 * Copyright © 2019 Bell Canada
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
17 package org.onap.ccsdk.cds.blueprintsprocessor.functions.netconf.executor.core
19 import io.mockk.CapturingSlot
25 import io.mockk.verify
26 import org.apache.sshd.client.SshClient
27 import org.apache.sshd.client.channel.ChannelSubsystem
28 import org.apache.sshd.client.channel.ClientChannel
29 import org.apache.sshd.client.future.DefaultAuthFuture
30 import org.apache.sshd.client.future.DefaultConnectFuture
31 import org.apache.sshd.client.future.DefaultOpenFuture
32 import org.apache.sshd.client.session.ClientSession
33 import org.apache.sshd.common.FactoryManager
34 import org.junit.Before
35 import org.junit.Ignore
37 import org.onap.ccsdk.cds.blueprintsprocessor.functions.netconf.executor.api.DeviceInfo
38 import org.onap.ccsdk.cds.blueprintsprocessor.functions.netconf.executor.api.DeviceResponse
39 import org.onap.ccsdk.cds.blueprintsprocessor.functions.netconf.executor.api.NetconfException
40 import org.onap.ccsdk.cds.blueprintsprocessor.functions.netconf.executor.api.NetconfRpcService
41 import org.onap.ccsdk.cds.blueprintsprocessor.functions.netconf.executor.utils.NetconfMessageUtils
42 import org.onap.ccsdk.cds.blueprintsprocessor.functions.netconf.executor.utils.RpcStatus
43 import java.io.ByteArrayInputStream
44 import java.io.ByteArrayOutputStream
45 import java.io.IOException
46 import java.io.InputStream
47 import java.nio.charset.StandardCharsets
48 import java.util.concurrent.CompletableFuture
49 import java.util.concurrent.ExecutionException
50 import java.util.concurrent.TimeoutException
51 import kotlin.test.assertEquals
52 import kotlin.test.assertFailsWith
53 import kotlin.test.assertTrue
55 class NetconfSessionImplTest {
// Constants shared by every test. They live in a companion object because
// `const val` is only legal at top level, in an object, or in a companion object.
companion object {
    /** Canned reply whose status is [RpcStatus.SUCCESS]; used to stub `rpcService.closeSession`. */
    val SUCCESSFUL_DEVICE_RESPONSE = DeviceResponse().apply {
        status = RpcStatus.SUCCESS
    }

    /** Canned reply whose status is [RpcStatus.FAILURE]; used to stub `rpcService.closeSession`. */
    val FAILED_DEVICE_RESPONSE = DeviceResponse().apply {
        status = RpcStatus.FAILURE
    }

    /** Device description handed to the [NetconfSessionImpl] under test. */
    val deviceInfo: DeviceInfo = DeviceInfo().apply {
        ipAddress = "localhost"
    }

    /** Payload backing the sample input stream built by the fixture. */
    private const val someString = "Some string"
}
// --- Object under test and its direct collaborators ---------------------------------
private lateinit var netconfSession: NetconfSessionImpl
private lateinit var netconfCommunicator: NetconfDeviceCommunicator
private lateinit var rpcService: NetconfRpcService

// --- SSH-layer mocks ----------------------------------------------------------------
private lateinit var mockSshClient: SshClient
private lateinit var mockClientSession: ClientSession
private lateinit var mockClientChannel: ClientChannel
private lateinit var mockSubsystem: ChannelSubsystem

// --- Streams returned by the mocked channel -----------------------------------------
private lateinit var sampleInputStream: InputStream
private lateinit var sampleOutputStream: ByteArrayOutputStream

// --- Fixed values used to build requests and expected messages ----------------------
private val futureMsg = "blahblahblah"
private val request = "0"
private val sessionId = "0"
private val messageId = "asdfasdfadf"
private val deviceCapabilities = setOf("capability1", "capability2")

// Must stay below request/messageId/deviceCapabilities: property initialisers run in declaration order.
private val formattedRequest = NetconfMessageUtils.formatRPCRequest(request, messageId, deviceCapabilities)
/**
 * Rebuilds the whole fixture before each test: fresh mocks for the communicator, the RPC
 * service and the SSH objects, a new [NetconfSessionImpl] wired to them, and new sample streams.
 */
@Before
fun setup() {
    netconfCommunicator = mockk()
    // rpcService is a lateinit property: it has to be assigned before it is read by the
    // constructor call below, otherwise the access fails with UninitializedPropertyAccessException.
    rpcService = mockk()
    netconfSession = NetconfSessionImpl(deviceInfo, rpcService)
    netconfSession.setStreamHandler(netconfCommunicator)

    mockSshClient = mockk()
    mockClientSession = mockk()
    mockClientChannel = mockk()
    mockSubsystem = mockk()

    sampleInputStream = ByteArrayInputStream(someString.toByteArray(StandardCharsets.UTF_8))
    sampleOutputStream = ByteArrayOutputStream()
}
/**
 * Stubs the private `startClient` routine on a spy and checks that `connect()` invokes it.
 */
fun `connect calls appropriate methods`() {
    val session = spyk(netconfSession, recordPrivateCalls = true)
    every { session["startClient"]() as Unit } just Runs

    // Exercise the method under test; without this call the verification below can never pass.
    session.connect()

    verify { session["startClient"]() }
}
/**
 * A NetconfException raised by the private `startClient` routine must surface from `connect()`
 * as a NetconfException.
 */
fun `connect throws NetconfException on error`() {
    val errMsg = "$deviceInfo: Failed to establish SSH session"
    assertFailsWith<NetconfException>(errMsg) {
        val spy = spyk(netconfSession, recordPrivateCalls = true)
        every { spy["startClient"]() as Unit } throws NetconfException(errMsg)
        spy.connect()
    }
}
/**
 * When `closeSession(false)` answers with a successful response, `disconnect()` must not retry
 * with the force flag and must still close session, client and channel.
 */
fun `disconnect without force option for rpcService succeeds`() {
    // The graceful close answers with a status other than RpcStatus.FAILURE.
    every { rpcService.closeSession(false) } returns SUCCESSFUL_DEVICE_RESPONSE
    every { mockClientSession.close() } just Runs
    every { mockSshClient.close() } just Runs
    every { mockClientChannel.close() } just Runs
    val spy = spyk(netconfSession, recordPrivateCalls = true).apply {
        setSession(mockClientSession)
        setClient(mockSshClient)
        setChannel(mockClientChannel)
    }

    spy.disconnect()

    // The forced variant must never have been requested.
    verify(exactly = 0) { rpcService.closeSession(true) }
    verify { mockClientSession.close() }
    verify { mockSshClient.close() }
    verify { mockClientChannel.close() }
}
/**
 * When the first `closeSession` answer is a failure and the second a success, `disconnect()`
 * is expected to call `closeSession` twice and still close session, client and channel.
 */
fun `disconnect with force option for rpcService succeeds`() {
    val spy = spyk(netconfSession, recordPrivateCalls = true)
    // First answer: FAILURE. Every later answer: SUCCESS.
    every { rpcService.closeSession(any()) } returns
        FAILED_DEVICE_RESPONSE andThen SUCCESSFUL_DEVICE_RESPONSE
    every { mockClientSession.close() } just Runs
    every { mockSshClient.close() } just Runs
    every { mockClientChannel.close() } just Runs
    with(spy) {
        setSession(mockClientSession)
        setClient(mockSshClient)
        setChannel(mockClientChannel)
    }

    spy.disconnect()

    verify(exactly = 2) { rpcService.closeSession(any()) }
    verify { mockClientSession.close() }
    verify { mockSshClient.close() }
    verify { mockClientChannel.close() }
}
/**
 * Makes the private `close` routine throw an IOException and checks that `disconnect()`
 * still reaches it. Currently disabled.
 */
@Ignore // TODO undo close method removal
fun `disconnect wraps exception from ssh closing error`() {
    val spy = spyk(netconfSession, recordPrivateCalls = true)
    every { spy["close"]() as Unit } throws IOException("Some IOException occurred!")
    every { rpcService.closeSession(false) } returns SUCCESSFUL_DEVICE_RESPONSE
    every { spy.checkAndReestablish() } just Runs

    spy.disconnect()

    verify { spy["close"]() }
}
/**
 * `reconnect()` must invoke both `disconnect()` and `connect()`; both are stubbed to no-ops here.
 */
fun `reconnect calls disconnect and connect`() {
    val spy = spyk(netconfSession, recordPrivateCalls = true)
    every { spy.disconnect() } just Runs
    every { spy.connect() } just Runs

    spy.reconnect()

    verify { spy.disconnect() }
    verify { spy.connect() }
}
/**
 * With an SSH client that reports itself closed, `checkAndReestablish()` must clear the replies
 * and go through the private `startConnection` routine.
 */
fun `checkAndReestablish restarts connection and clears replies on sshClient disconnection`() {
    val spy = spyk(netconfSession, recordPrivateCalls = true)
    every { mockSshClient.isClosed } returns true
    spy.setClient(mockSshClient)
    every { spy["startConnection"]() as Unit } just Runs

    spy.checkAndReestablish()

    verify { spy.clearReplies() }
    verify { spy["startConnection"]() }
}
/**
 * With an open SSH client but a closed client session, `checkAndReestablish()` must clear the
 * replies and go through the private `startSession` routine.
 */
fun `checkAndReestablish restarts session and clears replies on clientSession closing`() {
    val spy = spyk(netconfSession, recordPrivateCalls = true)
    every { mockClientSession.isClosed } returns true
    every { mockSshClient.isClosed } returns false
    every { spy["startSession"]() as Unit } just Runs
    spy.setClient(mockSshClient)
    spy.setSession(mockClientSession)

    spy.checkAndReestablish()

    verify { spy.clearReplies() }
    verify { spy["startSession"]() }
}
/**
 * With client and session open but the channel closed, `checkAndReestablish()` must clear the
 * replies and go through the private `openChannel` routine.
 */
fun `checkAndReestablish reopens channel and clears replies on channel closing`() {
    val spy = spyk(netconfSession, recordPrivateCalls = true)
    every { mockClientSession.isClosed } returns false
    every { mockSshClient.isClosed } returns false
    every { mockClientChannel.isClosed } returns true
    every { spy["openChannel"]() as Unit } just Runs
    with(spy) {
        setClient(mockSshClient)
        setSession(mockClientSession)
        setChannel(mockClientChannel)
    }

    spy.checkAndReestablish()

    verify { spy.clearReplies() }
    verify { spy["openChannel"]() }
}
/**
 * Happy path for `syncRpc`: the communicator returns a completed future, no reconnection is
 * needed, the reply text is returned and the existing replies are left untouched.
 */
fun `syncRpc runs normally`() {
    val netconfSessionSpy = spyk(netconfSession)
    val futureRet: CompletableFuture<String> = CompletableFuture.completedFuture(futureMsg)

    // Case under test: the SSH connection did not need to be re-established.
    // Seed the replies with one entry so any unexpected clearing becomes visible.
    netconfSessionSpy.getReplies()["somekey"] = CompletableFuture.completedFuture("${futureMsg}2")
    every { netconfCommunicator.sendMessage(any(), any()) } returns futureRet
    every { netconfCommunicator.getFutureFromSendMessage(any(), any(), any()) } returns futureRet.get()
    every { netconfSessionSpy.checkAndReestablish() } just Runs

    assertEquals(futureMsg, netconfSessionSpy.syncRpc("0", "0"))

    // The replies must not have changed. The condition has to be asserted explicitly:
    // as a bare expression its result would simply be thrown away.
    assertTrue {
        netconfSessionSpy.getReplies().size == 1 &&
            netconfSessionSpy.getReplies().containsKey("somekey")
    }
    verify(exactly = 0) { netconfSessionSpy.clearReplies() }
}
/**
 * When the SSH client reports itself closed, `syncRpc` must still return the reply and the
 * previously stored replies must have been cleared exactly once.
 */
fun `syncRpc still succeeds and replies are cleared on client disconnect`() {
    val spy = spyk(netconfSession, recordPrivateCalls = true)
    val completedReply: CompletableFuture<String> = CompletableFuture.completedFuture(futureMsg)

    // Seed the replies with one entry that is expected to disappear.
    spy.getReplies()["somekey"] = CompletableFuture.completedFuture("${futureMsg}2")

    // Scenario: the SSH session has to be re-established before the request is sent.
    every { netconfCommunicator.sendMessage(any(), any()) } returns completedReply
    every { spy["startClient"]() as Unit } just Runs
    every { netconfCommunicator.getFutureFromSendMessage(any(), any(), any()) } returns completedReply.get()
    every { mockSshClient.isClosed } returns true
    spy.setClient(mockSshClient)

    assertEquals(futureMsg, spy.syncRpc("0", "0"))

    // The seeded entry must be gone.
    assertTrue(spy.getReplies().isEmpty())
    verify(exactly = 1) { spy.clearReplies() }
}
/**
 * The communicator's `getFutureFromSendMessage` throws InterruptedException; `syncRpc` is
 * expected to surface that as a NetconfException.
 */
fun `syncRpc throws NetconfException if InterruptedException is caught`() {
    val expectedExceptionMsg = "$deviceInfo: Interrupted while waiting for reply for request: $formattedRequest"
    assertFailsWith<NetconfException>(expectedExceptionMsg) {
        val spy = spyk(netconfSession, recordPrivateCalls = true)
        val completedReply: CompletableFuture<String> = CompletableFuture.completedFuture(futureMsg)
        every { netconfCommunicator.sendMessage(any(), any()) } returns completedReply
        every {
            netconfCommunicator.getFutureFromSendMessage(any(), any(), any())
        } throws InterruptedException("interrupted")
        every { spy.checkAndReestablish() } just Runs

        spy.syncRpc("0", "0")
    }
}
/**
 * The communicator's `getFutureFromSendMessage` throws TimeoutException; `syncRpc` is expected
 * to surface that as a NetconfException. Currently disabled.
 */
@Ignore // TODO revert back on getFutureFromSendMessage
fun `syncRpc throws NetconfException if TimeoutException is caught`() {
    val expectedExceptionMsg = "$deviceInfo: Timed out while waiting for reply for request $formattedRequest after ${deviceInfo.replyTimeout} sec."
    assertFailsWith<NetconfException>(expectedExceptionMsg) {
        val spy = spyk(netconfSession)
        val completedReply: CompletableFuture<String> = CompletableFuture.completedFuture(futureMsg)
        every { netconfCommunicator.sendMessage(any(), any()) } returns completedReply
        every {
            netconfCommunicator.getFutureFromSendMessage(any(), any(), any())
        } throws TimeoutException("timed out")
        every { spy.checkAndReestablish() } just Runs

        spy.syncRpc("0", "0")
    }
}
/**
 * The communicator's `getFutureFromSendMessage` throws ExecutionException; `syncRpc` is expected
 * to surface that as a NetconfException.
 */
fun `syncRpc throws NetconfException if ExecutionException is caught`() {
    val expectedExceptionMsg = "$deviceInfo: Closing session $sessionId for request $formattedRequest"
    assertFailsWith<NetconfException>(expectedExceptionMsg) {
        val spy = spyk(netconfSession)
        val completedReply: CompletableFuture<String> = CompletableFuture.completedFuture(futureMsg)
        every { netconfCommunicator.sendMessage(any(), any()) } returns completedReply
        // TODO revert getFutureFromSendMessage back
        every {
            netconfCommunicator.getFutureFromSendMessage(any(), any(), any())
        } throws ExecutionException("exec exception", Exception("nested exception"))
        every { spy.checkAndReestablish() } just Runs

        spy.syncRpc("0", "0")
    }
}
/**
 * Same as the ExecutionException case, but the private `close` routine additionally throws an
 * IOException. Currently disabled.
 */
@Ignore // TODO revert back on getFutureFromSendMessage
fun `syncRpc throws NetconfException if caught ExecutionException and failed to close SSH session`() {
    val expectedExceptionMsg = "$deviceInfo: Closing session $sessionId for request $formattedRequest"
    assertFailsWith<NetconfException>(expectedExceptionMsg) {
        val spy = spyk(netconfSession)
        val completedReply: CompletableFuture<String> = CompletableFuture.completedFuture(futureMsg)
        every { netconfCommunicator.sendMessage(any(), any()) } returns completedReply
        every {
            netconfCommunicator.getFutureFromSendMessage(any(), any(), any())
        } throws ExecutionException("exec exception", Exception("nested exception"))
        every { spy["close"]() as Unit } throws IOException("got an IO exception")
        every { spy.checkAndReestablish() } just Runs

        spy.syncRpc("0", "0")

        // The replies are expected to have been cleared.
        // NOTE(review): if syncRpc throws as this test expects, the two verifications below are
        // never reached - confirm whether they should live outside the assertFailsWith block.
        verify(exactly = 1) { spy.clearReplies() }
        verify(exactly = 1) { spy.clearErrorReplies() }
    }
}
/**
 * Happy path for `asyncRpc`: the communicator returns a completed future, so the future handed
 * back to the caller must resolve to the same text and the replies must not be cleared.
 */
fun `asyncRpc runs normally`() {
    val netconfSessionSpy = spyk(netconfSession)
    // Stubbed once, before the call under test; re-stubbing afterwards would have no effect.
    every { netconfSessionSpy.checkAndReestablish() } just Runs
    val futureRet: CompletableFuture<String> = CompletableFuture.completedFuture(futureMsg)
    every { netconfCommunicator.sendMessage(any(), any()) } returns futureRet

    val rpcResultFuture = netconfSessionSpy.asyncRpc("0", "0")

    // The future must resolve to the communicator's reply; assertEquals reports both values on failure.
    assertEquals(futureMsg, rpcResultFuture.get())
    // clearReplies is only expected when the connection gets re-established, which is stubbed out here.
    verify(exactly = 0) { netconfSessionSpy.clearReplies() }
}
// TODO: get 't' inside asyncRpc to be a Throwable
/**
 * Feeds `asyncRpc` a future that completes exceptionally and expects a NetconfException.
 */
fun `asyncRpc wraps exception`() {
    assertFailsWith<NetconfException>(futureMsg) {
        val spy = spyk(netconfSession)
        val failingReply: CompletableFuture<String> = CompletableFuture.supplyAsync {
            throw Exception("blah")
        }
        failingReply.completeExceptionally(IOException("something is wrong"))
        every { netconfCommunicator.sendMessage(any(), any()) } returns failingReply

        val rpcResultFuture = spy.asyncRpc("0", "0")
    }
}
/**
 * `connect()` must start the SSH client and populate its properties with the idle and NIO2
 * read timeout keys.
 */
fun `connect starts underlying client`() {
    val clientProperties = hashMapOf<String, Any>()
    every { mockSshClient.start() } just Runs
    every { mockSshClient.properties } returns clientProperties
    val spy = spyk(netconfSession, recordPrivateCalls = true)
    every { spy["setupNewSSHClient"]() as Unit } just Runs
    every { spy["startSession"]() as Unit } just Runs
    spy.setClient(mockSshClient)

    spy.connect()

    verify { mockSshClient.start() }
    assertTrue(clientProperties.containsKey(FactoryManager.IDLE_TIMEOUT))
    assertTrue(clientProperties.containsKey(FactoryManager.NIO2_READ_TIMEOUT))
}
/**
 * `connect()` must open the SSH connection with the username, address and port taken from
 * `deviceInfo`, and then run the private `authSession` routine.
 */
fun `startSession tries to connect to user supplied device`() {
    every { mockSshClient.start() } just Runs
    every { mockSshClient.properties } returns hashMapOf<String, Any>()

    // Slots recording the arguments passed to SshClient.connect.
    val capturedUser = CapturingSlot<String>()
    val capturedIp = CapturingSlot<String>()
    val capturedPort = CapturingSlot<Int>()

    // A connect future that already carries the mocked client session.
    val connected = DefaultConnectFuture(Any(), Any())
    connected.value = mockClientSession
    every {
        mockSshClient.connect(capture(capturedUser), capture(capturedIp), capture(capturedPort))
    } returns connected

    val spy = spyk(netconfSession, recordPrivateCalls = true)
    every { spy["authSession"]() as Unit } just Runs
    every { spy["setupNewSSHClient"]() as Unit } just Runs
    spy.setClient(mockSshClient)

    spy.connect()

    verify { mockSshClient.connect(deviceInfo.username, deviceInfo.ipAddress, deviceInfo.port) }
    assertEquals(deviceInfo.username, capturedUser.captured)
    assertEquals(deviceInfo.ipAddress, capturedIp.captured)
    assertEquals(deviceInfo.port, capturedPort.captured)
    verify { spy["authSession"]() }
}
/**
 * The mocked client session reports only WAIT_AUTH and CLOSED (never AUTHED), so `connect()`
 * is expected to fail with a NetconfException.
 */
fun `authSession throws exception if ClientSession is not AUTHED`() {
    assertFailsWith<NetconfException> {
        every { mockSshClient.start() } just Runs
        every { mockSshClient.properties } returns hashMapOf<String, Any>()

        // Authentication future completed with `true` (the AuthFuture value is a Boolean).
        val authDone = DefaultAuthFuture(Any(), Any())
        authDone.value = true
        val capturedPassword = CapturingSlot<String>()
        every { mockClientSession.addPasswordIdentity(capture(capturedPassword)) } just Runs
        every { mockClientSession.auth() } returns authDone

        // Connection future that already carries the mocked client session.
        val connected = DefaultConnectFuture(Any(), Any())
        connected.value = mockClientSession
        every {
            mockSshClient.connect(deviceInfo.username, deviceInfo.ipAddress, deviceInfo.port)
        } returns connected

        // AUTHED is deliberately missing from the reported events.
        every { mockClientSession.waitFor(any(), any()) } returns
            setOf(ClientSession.ClientSessionEvent.WAIT_AUTH, ClientSession.ClientSessionEvent.CLOSED)

        val spy = spyk(netconfSession, recordPrivateCalls = true)
        every { spy["setupNewSSHClient"]() as Unit } just Runs
        spy.setClient(mockSshClient)

        spy.connect()
    }
}
/**
 * Shared stubbing for the channel-opening tests: the SSH client starts and connects, the client
 * session authenticates and reports AUTHED, a subsystem channel can be created, and the mocked
 * channel exposes the sample streams.
 */
private fun setupOpenChannelMocks() {
    every { mockSshClient.start() } just Runs
    every { mockSshClient.properties } returns hashMapOf<String, Any>()

    // Authentication future completed with `true` (the AuthFuture value is a Boolean).
    val authDone = DefaultAuthFuture(Any(), Any())
    authDone.value = true
    val capturedPassword = CapturingSlot<String>()
    every { mockClientSession.addPasswordIdentity(capture(capturedPassword)) } just Runs
    every { mockClientSession.auth() } returns authDone

    // Connection future that already carries the mocked client session.
    val connected = DefaultConnectFuture(Any(), Any())
    connected.value = mockClientSession
    every {
        mockSshClient.connect(deviceInfo.username, deviceInfo.ipAddress, deviceInfo.port)
    } returns connected

    every { mockClientSession.waitFor(any(), any()) } returns
        setOf(
            ClientSession.ClientSessionEvent.WAIT_AUTH,
            ClientSession.ClientSessionEvent.CLOSED,
            ClientSession.ClientSessionEvent.AUTHED
        )

    every { mockClientSession.createSubsystemChannel(any()) } returns mockSubsystem
    every { mockClientChannel.invertedOut } returns sampleInputStream
    every { mockClientChannel.invertedIn } returns sampleOutputStream
}
/**
 * With an authenticated session and an open-future that is already marked opened, `connect()`
 * must open the subsystem channel.
 */
fun `authSession opensChannel if ClientSession is AUTHED and session can be opened`() {
    // Baseline stubbing: client connects and the session authenticates.
    setupOpenChannelMocks()

    val openedChannel = DefaultOpenFuture(Any(), Any())
    openedChannel.value = true
    openedChannel.setOpened()

    val connected = DefaultConnectFuture(Any(), Any())
    connected.value = mockClientSession
    connected.session = mockClientSession

    every { mockSubsystem.open() } returns openedChannel
    every {
        mockSshClient.connect(deviceInfo.username, deviceInfo.ipAddress, deviceInfo.port)
    } returns connected

    val spy = spyk(netconfSession, recordPrivateCalls = true)
    every { spy["setupNewSSHClient"]() as Unit } just Runs
    every { spy["setupHandler"]() as Unit } just Runs
    spy.setClient(mockSshClient)

    spy.connect()

    verify { mockSubsystem.open() }
}
/**
 * With an authenticated session but an open-future that never completes, `connect()` is
 * expected to fail with a NetconfException.
 */
fun `authSession throws NetconfException if ClientSession is AUTHED but channelFuture timed out or not open`() {
    assertFailsWith<NetconfException> {
        // Baseline stubbing: client connects and the session authenticates.
        setupOpenChannelMocks()

        // Deliberately left without a value and never marked as opened.
        val pendingChannel = DefaultOpenFuture(Any(), Any())
        every { mockSubsystem.open() } returns pendingChannel

        val spy = spyk(netconfSession, recordPrivateCalls = true)
        every { spy["setupNewSSHClient"]() as Unit } just Runs
        every { spy["setupHandler"]() as Unit } just Runs
        spy.setClient(mockSshClient)

        spy.connect()

        // NOTE(review): not reached when connect() throws as expected - confirm intent.
        verify { mockSubsystem.open() }
    }
}
/**
 * After a successful graceful `closeSession`, `disconnect()` must close the client session,
 * the channel and the SSH client.
 */
fun `disconnect closes session, channel, and client`() {
    every { rpcService.closeSession(false) } returns SUCCESSFUL_DEVICE_RESPONSE
    every { mockClientSession.close() } just Runs
    every { mockClientChannel.close() } just Runs
    every { mockSshClient.close() } just Runs
    val spy = spyk(netconfSession, recordPrivateCalls = true).apply {
        setChannel(mockClientChannel)
        setClient(mockSshClient)
        setSession(mockClientSession)
    }

    spy.disconnect()

    verify { mockClientSession.close() }
    verify { mockClientChannel.close() }
    verify { mockSshClient.close() }
}
/**
 * The channel's `close()` throws an IOException; `disconnect()` must nevertheless have closed
 * the client session. (Per the original author, this test is equivalent to others.)
 */
fun `disconnect wraps IOException if channel doesn't close`() {
    every { rpcService.closeSession(false) } returns SUCCESSFUL_DEVICE_RESPONSE
    every { mockClientSession.close() } just Runs
    every { mockClientChannel.close() } throws IOException("channel doesn't want to close!")
    val spy = spyk(netconfSession, recordPrivateCalls = true).apply {
        setChannel(mockClientChannel)
        setClient(mockSshClient)
        setSession(mockClientSession)
    }

    spy.disconnect()

    verify { mockClientSession.close() }
}