2 * Copyright © 2019 Bell Canada
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
17 package org.onap.ccsdk.cds.blueprintsprocessor.functions.netconf.executor.core
19 import io.mockk.CapturingSlot
25 import io.mockk.verify
26 import org.apache.sshd.client.SshClient
27 import org.apache.sshd.client.channel.ChannelSubsystem
28 import org.apache.sshd.client.channel.ClientChannel
29 import org.apache.sshd.client.future.DefaultAuthFuture
30 import org.apache.sshd.client.future.DefaultConnectFuture
31 import org.apache.sshd.client.future.DefaultOpenFuture
32 import org.apache.sshd.client.session.ClientSession
33 import org.apache.sshd.common.FactoryManager
34 import org.junit.Before
36 import org.onap.ccsdk.cds.blueprintsprocessor.functions.netconf.executor.api.DeviceInfo
37 import org.onap.ccsdk.cds.blueprintsprocessor.functions.netconf.executor.api.DeviceResponse
38 import org.onap.ccsdk.cds.blueprintsprocessor.functions.netconf.executor.api.NetconfException
39 import org.onap.ccsdk.cds.blueprintsprocessor.functions.netconf.executor.api.NetconfRpcService
40 import org.onap.ccsdk.cds.blueprintsprocessor.functions.netconf.executor.utils.NetconfMessageUtils
41 import org.onap.ccsdk.cds.blueprintsprocessor.functions.netconf.executor.utils.RpcStatus
42 import java.io.ByteArrayInputStream
43 import java.io.ByteArrayOutputStream
44 import java.io.IOException
45 import java.io.InputStream
46 import java.nio.charset.StandardCharsets
47 import java.util.concurrent.CompletableFuture
48 import java.util.concurrent.ExecutionException
49 import java.util.concurrent.TimeoutException
50 import kotlin.test.assertEquals
51 import kotlin.test.assertFailsWith
52 import kotlin.test.assertTrue
54 class NetconfSessionImplTest {
56 val SUCCESSFUL_DEVICE_RESPONSE = DeviceResponse().apply {
57 status = RpcStatus.SUCCESS
62 val FAILED_DEVICE_RESPONSE = DeviceResponse().apply {
63 status = RpcStatus.FAILURE
68 val deviceInfo: DeviceInfo = DeviceInfo().apply {
71 ipAddress = "localhost"
75 private const val someString = "Some string"
78 private lateinit var netconfSession: NetconfSessionImpl
79 private lateinit var netconfCommunicator: NetconfDeviceCommunicator
80 private lateinit var rpcService: NetconfRpcService
81 private lateinit var mockSshClient: SshClient
82 private lateinit var mockClientSession: ClientSession
83 private lateinit var mockClientChannel: ClientChannel
84 private lateinit var mockSubsystem: ChannelSubsystem
86 private val futureMsg = "blahblahblah"
87 private val request = "0"
88 private val sessionId = "0"
89 private val messageId = "asdfasdfadf"
90 private val deviceCapabilities = setOf("capability1", "capability2")
91 private val formattedRequest = NetconfMessageUtils.formatRPCRequest(request, messageId, deviceCapabilities)
92 private lateinit var sampleInputStream: InputStream
93 private lateinit var sampleOutputStream: ByteArrayOutputStream
97 netconfCommunicator = mockk()
99 netconfSession = NetconfSessionImpl(deviceInfo, rpcService)
100 netconfSession.setStreamHandler(netconfCommunicator)
101 mockSshClient = mockk()
102 mockClientSession = mockk()
103 mockClientChannel = mockk()
104 mockSubsystem = mockk()
105 sampleInputStream = ByteArrayInputStream(someString.toByteArray(StandardCharsets.UTF_8))
106 sampleOutputStream = ByteArrayOutputStream()
110 fun `connect calls appropriate methods`() {
111 val session = spyk(netconfSession, recordPrivateCalls = true)
112 every { session["startClient"]() as Unit } just Runs
114 verify { session["startClient"]() }
117 //look for NetconfException being thrown when cannot connect
119 fun `connect throws NetconfException on error`() {
120 val errMsg = "$deviceInfo: Failed to establish SSH session"
121 assertFailsWith(exceptionClass = NetconfException::class, message = errMsg) {
122 val netconfSessionSpy = spyk(netconfSession, recordPrivateCalls = true)
123 every { netconfSessionSpy["startClient"]() as Unit } throws NetconfException(errMsg)
124 netconfSessionSpy.connect()
129 fun `disconnect without force option for rpcService succeeds`() {
130 //rpcService.closeSession succeeds with status not RpcStatus.FAILURE
131 every { rpcService.closeSession(false) } returns SUCCESSFUL_DEVICE_RESPONSE
132 every { mockClientSession.close() } just Runs
133 every { mockSshClient.close() } just Runs
134 every { mockClientChannel.close() } just Runs
135 val netconfSessionSpy = spyk(netconfSession, recordPrivateCalls = true)
136 netconfSessionSpy.setSession(mockClientSession)
137 netconfSessionSpy.setClient(mockSshClient)
138 netconfSessionSpy.setChannel(mockClientChannel)
140 netconfSessionSpy.disconnect()
141 //make sure that rpcService.close session is not called again.
142 verify(exactly = 0) { rpcService.closeSession(true) }
143 verify { mockClientSession.close() }
144 verify { mockSshClient.close() }
145 verify { mockClientChannel.close() }
149 fun `disconnect with force option for rpcService succeeds`() {
150 //rpcService.closeSession succeeds with status not RpcStatus.FAILURE
151 val netconfSessionSpy = spyk(netconfSession, recordPrivateCalls = true)
152 every { rpcService.closeSession(any()) } returns
153 FAILED_DEVICE_RESPONSE andThen SUCCESSFUL_DEVICE_RESPONSE
154 every { mockClientSession.close() } just Runs
155 every { mockSshClient.close() } just Runs
156 every { mockClientChannel.close() } just Runs
157 netconfSessionSpy.setSession(mockClientSession)
158 netconfSessionSpy.setClient(mockSshClient)
159 netconfSessionSpy.setChannel(mockClientChannel)
161 netconfSessionSpy.disconnect()
163 verify(exactly = 2) { rpcService.closeSession(any()) }
164 verify { mockClientSession.close() }
165 verify { mockSshClient.close() }
166 verify { mockClientChannel.close() }
171 fun `disconnect wraps exception from ssh closing error`() {
172 val netconfSessionSpy = spyk(netconfSession, recordPrivateCalls = true)
173 every { netconfSessionSpy["close"]() as Unit } throws IOException("Some IOException occurred!")
174 every { rpcService.closeSession(false) } returns SUCCESSFUL_DEVICE_RESPONSE
175 every { netconfSessionSpy.checkAndReestablish() } just Runs
176 netconfSessionSpy.disconnect()
177 verify { netconfSessionSpy["close"]() }
181 fun `reconnect calls disconnect and connect`() {
182 val netconfSessionSpy = spyk(netconfSession, recordPrivateCalls = true)
183 every { netconfSessionSpy.disconnect() } just Runs
184 every { netconfSessionSpy.connect() } just Runs
185 netconfSessionSpy.reconnect()
186 verify { netconfSessionSpy.disconnect() }
187 verify { netconfSessionSpy.connect() }
191 fun `checkAndReestablish restarts connection and clears replies on sshClient disconnection`() {
192 val netconfSessionSpy = spyk(netconfSession, recordPrivateCalls = true)
193 every { mockSshClient.isClosed } returns true
194 netconfSessionSpy.setClient(mockSshClient)
195 every { netconfSessionSpy["startConnection"]() as Unit } just Runs
197 netconfSessionSpy.checkAndReestablish()
199 verify { netconfSessionSpy.clearReplies() }
200 verify { netconfSessionSpy["startConnection"]() }
204 fun `checkAndReestablish restarts session and clears replies on clientSession closing`() {
205 val netconfSessionSpy = spyk(netconfSession, recordPrivateCalls = true)
206 every { mockClientSession.isClosed } returns true
207 every { mockSshClient.isClosed } returns false
208 every { netconfSessionSpy["startSession"]() as Unit } just Runs
209 netconfSessionSpy.setClient(mockSshClient)
210 netconfSessionSpy.setSession(mockClientSession)
212 netconfSessionSpy.checkAndReestablish()
214 verify { netconfSessionSpy.clearReplies() }
215 verify { netconfSessionSpy["startSession"]() }
219 fun `checkAndReestablish reopens channel and clears replies on channel closing`() {
220 val netconfSessionSpy = spyk(netconfSession, recordPrivateCalls = true)
221 every { mockClientSession.isClosed } returns false
222 every { mockSshClient.isClosed } returns false
223 every { mockClientChannel.isClosed } returns true
224 every { netconfSessionSpy["openChannel"]() as Unit } just Runs
225 netconfSessionSpy.setClient(mockSshClient)
226 netconfSessionSpy.setSession(mockClientSession)
227 netconfSessionSpy.setChannel(mockClientChannel)
229 netconfSessionSpy.checkAndReestablish()
231 verify { netconfSessionSpy.clearReplies() }
232 verify { netconfSessionSpy["openChannel"]() }
237 fun `syncRpc runs normally`() {
238 val netconfSessionSpy = spyk(netconfSession)
239 val futureRet: CompletableFuture<String> = CompletableFuture.completedFuture(futureMsg)
241 //test the case where SSH connection did not need to be re-established.
242 //put an existing item into the replies
243 netconfSessionSpy.getReplies()["somekey"] = CompletableFuture.completedFuture("${futureMsg}2")
244 every { netconfCommunicator.sendMessage(any(), any()) } returns futureRet
245 every { netconfCommunicator.getFutureFromSendMessage(any(), any(), any()) } returns futureRet.get()
246 every { netconfSessionSpy.checkAndReestablish() } just Runs
248 assertEquals(futureMsg, netconfSessionSpy.syncRpc("0", "0"))
249 //make sure the replies didn't change
251 netconfSessionSpy.getReplies().size == 1 &&
252 netconfSessionSpy.getReplies().containsKey("somekey")
254 verify(exactly = 0) { netconfSessionSpy.clearReplies() }
259 fun `syncRpc still succeeds and replies are cleared on client disconnect`() {
260 val netconfSessionSpy = spyk(netconfSession, recordPrivateCalls = true)
261 val futureRet: CompletableFuture<String> = CompletableFuture.completedFuture(futureMsg)
263 //put an item into the replies
264 netconfSessionSpy.getReplies()["somekey"] = CompletableFuture.completedFuture("${futureMsg}2")
266 //tests the case where SSH session needs to be re-established.
267 every { netconfCommunicator.sendMessage(any(), any()) } returns futureRet
268 every { netconfSessionSpy["startClient"]() as Unit } just Runs
269 every { netconfCommunicator.getFutureFromSendMessage(any(), any(), any()) } returns futureRet.get()
270 every { mockSshClient.isClosed } returns true
271 netconfSessionSpy.setClient(mockSshClient)
274 assertEquals(futureMsg, netconfSessionSpy.syncRpc("0", "0"))
275 //make sure the replies got cleared out
276 assertTrue { netconfSessionSpy.getReplies().isEmpty() }
277 verify(exactly = 1) { netconfSessionSpy.clearReplies() }
280 //Test for handling CompletableFuture.get returns InterruptedException inside NetconfDeviceCommunicator
282 fun `syncRpc throws NetconfException if InterruptedException is caught`() {
283 val expectedExceptionMsg = "$deviceInfo: Interrupted while waiting for reply for request: $formattedRequest"
284 assertFailsWith(exceptionClass = NetconfException::class, message = expectedExceptionMsg) {
285 val netconfSessionSpy = spyk(netconfSession, recordPrivateCalls = true)
286 val futureRet: CompletableFuture<String> = CompletableFuture.completedFuture(futureMsg)
287 every { netconfCommunicator.sendMessage(any(), any()) } returns futureRet
288 every { netconfCommunicator.getFutureFromSendMessage(any(), any(), any()) } throws InterruptedException("interrupted")
289 every { netconfSessionSpy.checkAndReestablish() } just Runs
291 netconfSessionSpy.syncRpc("0", "0")
296 fun `syncRpc throws NetconfException if TimeoutException is caught`() {
297 val expectedExceptionMsg = "$deviceInfo: Timed out while waiting for reply for request $formattedRequest after ${deviceInfo.replyTimeout} sec."
298 assertFailsWith(exceptionClass = NetconfException::class, message = expectedExceptionMsg) {
299 val netconfSessionSpy = spyk(netconfSession)
300 val futureRet: CompletableFuture<String> = CompletableFuture.completedFuture(futureMsg)
301 every { netconfCommunicator.sendMessage(any(), any()) } returns futureRet
302 every { netconfCommunicator.getFutureFromSendMessage(any(), any(), any()) } throws TimeoutException("timed out")
303 every { netconfSessionSpy.checkAndReestablish() } just Runs
305 netconfSessionSpy.syncRpc("0", "0")
310 fun `syncRpc throws NetconfException if ExecutionException is caught`() {
311 val expectedExceptionMsg = "$deviceInfo: Closing session $sessionId for request $formattedRequest"
312 assertFailsWith(exceptionClass = NetconfException::class, message = expectedExceptionMsg) {
313 val netconfSessionSpy = spyk(netconfSession, recordPrivateCalls = false)
314 val futureRet: CompletableFuture<String> = CompletableFuture.completedFuture(futureMsg)
315 every { netconfCommunicator.sendMessage(any(), any()) } returns futureRet
316 every { netconfCommunicator.getFutureFromSendMessage(any(), any(), any()) } throws
317 ExecutionException("exec exception", Exception("nested exception"))
318 every { netconfSessionSpy["close"]() as Unit } just Runs
319 every { netconfSessionSpy.checkAndReestablish() } just Runs
320 netconfSessionSpy.setSession(mockClientSession)
322 netconfSessionSpy.syncRpc("0", "0")
327 fun `syncRpc throws NetconfException if caught ExecutionException and failed to close SSH session`() {
328 val expectedExceptionMsg = "$deviceInfo: Closing session $sessionId for request $formattedRequest"
329 assertFailsWith(exceptionClass = NetconfException::class, message = expectedExceptionMsg) {
330 val netconfSessionSpy = spyk(netconfSession)
331 val futureRet: CompletableFuture<String> = CompletableFuture.completedFuture(futureMsg)
332 every { netconfCommunicator.sendMessage(any(), any()) } returns futureRet
333 every { netconfCommunicator.getFutureFromSendMessage(any(), any(), any()) } throws
334 ExecutionException("exec exception", Exception("nested exception"))
335 every { netconfSessionSpy["close"]() as Unit } throws IOException("got an IO exception")
336 every { netconfSessionSpy.checkAndReestablish() } just Runs
338 netconfSessionSpy.syncRpc("0", "0")
339 //make sure replies are cleared...
340 verify(exactly = 1) { netconfSessionSpy.clearReplies() }
341 verify(exactly = 1) { netconfSessionSpy.clearErrorReplies() }
346 fun `asyncRpc runs normally`() {
347 val netconfSessionSpy = spyk(netconfSession)
348 every { netconfSessionSpy.checkAndReestablish() } just Runs
349 val futureRet: CompletableFuture<String> = CompletableFuture.completedFuture(futureMsg)
350 every { netconfCommunicator.sendMessage(any(), any()) } returns futureRet
352 val rpcResultFuture = netconfSessionSpy.asyncRpc("0", "0")
353 every { netconfSessionSpy.checkAndReestablish() } just Runs
354 //make sure the future gets resolved
355 assertTrue { rpcResultFuture.get() == futureMsg }
356 //make sure that clearReplies wasn't called (reestablishConnection check)
357 verify(exactly = 0) { netconfSessionSpy.clearReplies() }
361 fun `asyncRpc wraps exception`() {
362 val netconfSessionSpy = spyk(netconfSession)
363 every { netconfSessionSpy.checkAndReestablish() } just Runs
364 val futureRet: CompletableFuture<String> = CompletableFuture.supplyAsync {
365 throw Exception("blah")
367 every { netconfCommunicator.sendMessage(any(), any()) } returns futureRet
369 val rpcResultFuture = netconfSessionSpy.asyncRpc("0", "0")
370 every { netconfSessionSpy.checkAndReestablish() } just Runs
371 val e = assertFailsWith(exceptionClass = ExecutionException::class, message = futureMsg) {
372 rpcResultFuture.get()
375 assertTrue { cause is NetconfException }
379 fun `connect starts underlying client`() {
380 val propertiesMap = hashMapOf<String, Any>()
381 every { mockSshClient.start() } just Runs
382 every { mockSshClient.properties } returns propertiesMap
383 val netconfSessionSpy = spyk(netconfSession, recordPrivateCalls = true)
384 every { netconfSessionSpy["setupNewSSHClient"]() as Unit } just Runs
385 every { netconfSessionSpy["startSession"]() as Unit } just Runs
386 netconfSessionSpy.setClient(mockSshClient)
387 netconfSessionSpy.connect()
388 verify { mockSshClient.start() }
389 assertTrue { propertiesMap.containsKey(FactoryManager.IDLE_TIMEOUT) }
390 assertTrue { propertiesMap.containsKey(FactoryManager.NIO2_READ_TIMEOUT) }
394 fun `startSession tries to connect to user supplied device`() {
395 every { mockSshClient.start() } just Runs
396 every { mockSshClient.properties } returns hashMapOf<String, Any>()
397 //setup slots to capture values from the invocations
398 val userSlot = CapturingSlot<String>()
399 val ipSlot = CapturingSlot<String>()
400 val portSlot = CapturingSlot<Int>()
401 //create a future that succeeded
402 val succeededFuture = DefaultConnectFuture(Any(), Any())
403 succeededFuture.value = mockClientSession
404 every { mockSshClient.connect(capture(userSlot), capture(ipSlot), capture(portSlot)) } returns succeededFuture
405 val netconfSessionSpy = spyk(netconfSession, recordPrivateCalls = true)
406 every { netconfSessionSpy["authSession"]() as Unit } just Runs
407 every { netconfSessionSpy["setupNewSSHClient"]() as Unit } just Runs
408 netconfSessionSpy.setClient(mockSshClient)
410 netconfSessionSpy.connect()
412 verify { mockSshClient.connect(deviceInfo.username, deviceInfo.ipAddress, deviceInfo.port) }
413 assertEquals(deviceInfo.username, userSlot.captured)
414 assertEquals(deviceInfo.ipAddress, ipSlot.captured)
415 assertEquals(deviceInfo.port, portSlot.captured)
416 verify { netconfSessionSpy["authSession"]() }
420 fun `authSession throws exception if ClientSession is not AUTHED`() {
421 assertFailsWith(exceptionClass = NetconfException::class) {
422 //after client session connects,
423 every { mockSshClient.start() } just Runs
424 every { mockSshClient.properties } returns hashMapOf<String, Any>()
425 val succeededAuthFuture = DefaultAuthFuture(Any(), Any())
426 succeededAuthFuture.value = true //AuthFuture's value is Boolean
427 val passSlot = CapturingSlot<String>()
428 every { mockClientSession.addPasswordIdentity(capture(passSlot)) } just Runs
429 every { mockClientSession.auth() } returns succeededAuthFuture
430 val succeededSessionFuture = DefaultConnectFuture(Any(), Any())
431 succeededSessionFuture.value = mockClientSession
432 every { mockSshClient.connect(deviceInfo.username, deviceInfo.ipAddress, deviceInfo.port) } returns succeededSessionFuture
433 every { mockClientSession.waitFor(any(), any()) } returns
434 setOf(ClientSession.ClientSessionEvent.WAIT_AUTH, ClientSession.ClientSessionEvent.CLOSED)
435 val netconfSessionSpy = spyk(netconfSession, recordPrivateCalls = true)
436 every { netconfSessionSpy["setupNewSSHClient"]() as Unit } just Runs
437 netconfSessionSpy.setClient(mockSshClient)
439 netconfSessionSpy.connect()
443 //common mock initializer for more weird tests.
444 private fun setupOpenChannelMocks(): Unit {
445 every { mockSshClient.start() } just Runs
446 every { mockSshClient.properties } returns hashMapOf<String, Any>()
447 val succeededAuthFuture = DefaultAuthFuture(Any(), Any())
448 succeededAuthFuture.value = true //AuthFuture's value is Boolean
449 val passSlot = CapturingSlot<String>()
450 every { mockClientSession.addPasswordIdentity(capture(passSlot)) } just Runs
451 every { mockClientSession.auth() } returns succeededAuthFuture
452 val succeededSessionFuture = DefaultConnectFuture(Any(), Any())
453 succeededSessionFuture.value = mockClientSession
454 every { mockSshClient.connect(deviceInfo.username, deviceInfo.ipAddress, deviceInfo.port) } returns succeededSessionFuture
455 every { mockClientSession.waitFor(any(), any()) } returns
456 setOf(ClientSession.ClientSessionEvent.WAIT_AUTH,
457 ClientSession.ClientSessionEvent.CLOSED,
458 ClientSession.ClientSessionEvent.AUTHED)
460 every { mockClientSession.createSubsystemChannel(any()) } returns mockSubsystem
461 every { mockClientChannel.invertedOut } returns sampleInputStream
462 every { mockClientChannel.invertedIn } returns sampleOutputStream
466 fun `authSession opensChannel if ClientSession is AUTHED and session can be opened`() {
467 //after client session connects, make sure the client receives authentication
468 setupOpenChannelMocks()
469 val channelFuture = DefaultOpenFuture(Any(), Any())
470 channelFuture.value = true
471 channelFuture.setOpened()
472 val connectFuture = DefaultConnectFuture(Any(), Any())
473 connectFuture.value = mockClientSession
474 connectFuture.session = mockClientSession
475 every { mockSubsystem.open() } returns channelFuture
476 every { mockSshClient.connect(deviceInfo.username, deviceInfo.ipAddress, deviceInfo.port) } returns connectFuture
478 val netconfSessionSpy = spyk(netconfSession, recordPrivateCalls = true)
479 every { netconfSessionSpy["setupNewSSHClient"]() as Unit } just Runs
480 every { netconfSessionSpy["setupHandler"]() as Unit } just Runs
481 netconfSessionSpy.setClient(mockSshClient)
483 netconfSessionSpy.connect()
485 verify { mockSubsystem.open() }
490 fun `authSession throws NetconfException if ClientSession is AUTHED but channelFuture timed out or not open`() {
491 assertFailsWith(exceptionClass = NetconfException::class) {
492 //after client session connects, make sure the client receives authentication
493 setupOpenChannelMocks()
494 val channelFuture = DefaultOpenFuture(Any(), Any())
495 every { mockSubsystem.open() } returns channelFuture
496 val netconfSessionSpy = spyk(netconfSession, recordPrivateCalls = true)
497 every { netconfSessionSpy["setupNewSSHClient"]() as Unit } just Runs
498 every { netconfSessionSpy["setupHandler"]() as Unit } just Runs
499 netconfSessionSpy.setClient(mockSshClient)
501 netconfSessionSpy.connect()
503 verify { mockSubsystem.open() }
509 fun `disconnect closes session, channel, and client`() {
510 every { rpcService.closeSession(false) } returns SUCCESSFUL_DEVICE_RESPONSE
511 every { mockClientSession.close() } just Runs
512 every { mockClientChannel.close() } just Runs
513 every { mockSshClient.close() } just Runs
514 val netconfSessionSpy = spyk(netconfSession, recordPrivateCalls = true)
515 netconfSessionSpy.setChannel(mockClientChannel)
516 netconfSessionSpy.setClient(mockSshClient)
517 netconfSessionSpy.setSession(mockClientSession)
519 netconfSessionSpy.disconnect()
521 verify { mockClientSession.close() }
522 verify { mockClientChannel.close() }
523 verify { mockSshClient.close() }
527 fun `disconnect wraps IOException if channel doesn't close`() { //this test is equivalent to others
528 every { rpcService.closeSession(false) } returns SUCCESSFUL_DEVICE_RESPONSE
529 every { mockClientSession.close() } just Runs
530 every { mockClientChannel.close() } throws IOException("channel doesn't want to close!")
531 val netconfSessionSpy = spyk(netconfSession, recordPrivateCalls = true)
532 netconfSessionSpy.setChannel(mockClientChannel)
533 netconfSessionSpy.setClient(mockSshClient)
534 netconfSessionSpy.setSession(mockClientSession)
536 netconfSessionSpy.disconnect()
538 verify { mockClientSession.close() }