RpcStatus.FAILURE, true)) {
rpcService.closeSession(true)
}
-
- session.close()
- // Closes the socket which should interrupt the streamHandler
- channel.close()
- client.close()
+ try {
+ close()
+ } catch (ioe: IOException) {
+ log.warn("$deviceInfo: Error closing session($sessionId) for host($deviceInfo)", ioe)
+ }
}
override fun reconnect() {
checkAndReestablish()
try {
- return streamHandler.sendMessage(formattedRequest, messageId).get(replyTimeout.toLong(), TimeUnit.SECONDS)
-// replies.remove(messageId)
+ return streamHandler.getFutureFromSendMessage(streamHandler.sendMessage(formattedRequest, messageId),
+ replyTimeout.toLong(), TimeUnit.SECONDS)
} catch (e: InterruptedException) {
Thread.currentThread().interrupt()
throw NetconfException("$deviceInfo: Interrupted while waiting for reply for request: $formattedRequest", e)
} catch (e: ExecutionException) {
log.warn("$deviceInfo: Closing session($sessionId) due to unexpected Error", e)
try {
- session.close()
- // Closes the socket which should interrupt the streamHandler
- channel.close()
- client.close()
+ close()
} catch (ioe: IOException) {
log.warn("$deviceInfo: Error closing session($sessionId) for host($deviceInfo)", ioe)
}
override fun checkAndReestablish() {
try {
- if (client.isClosed) {
- log.info("Trying to restart the whole SSH connection with {}", deviceInfo)
- clearReplies()
- startConnection()
- } else if (session.isClosed) {
- log.info("Trying to restart the session with {}", deviceInfo)
- clearReplies()
- startSession()
- } else if (channel.isClosed) {
- log.info("Trying to reopen the channel with {}", deviceInfo)
- clearReplies()
- openChannel()
- } else {
- return
+ when {
+ client.isClosed -> {
+ log.info("Trying to restart the whole SSH connection with {}", deviceInfo)
+ clearReplies()
+ startConnection()
+ }
+ session.isClosed -> {
+ log.info("Trying to restart the session with {}", deviceInfo)
+ clearReplies()
+ startSession()
+ }
+ channel.isClosed -> {
+ log.info("Trying to reopen the channel with {}", deviceInfo)
+ clearReplies()
+ openChannel()
+ }
+ else -> return
}
} catch (e: IOException) {
log.error("Can't reopen connection for device {} error: {}", deviceInfo, e.message)
val capabilityMatcher = NetconfMessageUtils.CAPABILITY_REGEX_PATTERN.matcher(serverHelloResponse)
while (capabilityMatcher.find()) {
- deviceCapabilities.plus(capabilityMatcher.group(1))
+ deviceCapabilities.add(capabilityMatcher.group(1))
}
}
replies[messageId]?.complete(replyMsg)
}
+ /**
+ * Closes the session/channel/client
+ */
+ @Throws(IOException::class)
+ private fun close() {
+ session.close()
+ // Closes the socket which should interrupt the streamHandler
+ channel.close()
+ client.close()
+ }
+
/**
* Internal function for accessing replies for testing.
*/
}
- @Ignore //TODO undo close method removal
@Test
fun `disconnect wraps exception from ssh closing error`() {
val netconfSessionSpy = spyk(netconfSession, recordPrivateCalls = true)
}
}
- @Ignore //TODO revert back on getFutureFromSendMessage
@Test
fun `syncRpc throws NetconfException if TimeoutException is caught`() {
val expectedExceptionMsg = "$deviceInfo: Timed out while waiting for reply for request $formattedRequest after ${deviceInfo.replyTimeout} sec."
}
}
- @Ignore
@Test
fun `syncRpc throws NetconfException if ExecutionException is caught`() {
val expectedExceptionMsg = "$deviceInfo: Closing session $sessionId for request $formattedRequest"
assertFailsWith(exceptionClass = NetconfException::class, message = expectedExceptionMsg) {
- val netconfSessionSpy = spyk(netconfSession)
+ val netconfSessionSpy = spyk(netconfSession, recordPrivateCalls = false)
val futureRet: CompletableFuture<String> = CompletableFuture.completedFuture(futureMsg)
every { netconfCommunicator.sendMessage(any(), any()) } returns futureRet
every { netconfCommunicator.getFutureFromSendMessage(any(), any(), any()) } throws
- ExecutionException("exec exception", Exception("nested exception")) //TODO revert getFutureFromSendMessage back
+ ExecutionException("exec exception", Exception("nested exception"))
+ every { netconfSessionSpy["close"]() as Unit } just Runs
every { netconfSessionSpy.checkAndReestablish() } just Runs
+ netconfSessionSpy.setSession(mockClientSession)
//call the method
netconfSessionSpy.syncRpc("0", "0")
}
}
- @Ignore //TODO revert back on getFutureFromSendMessage
@Test
fun `syncRpc throws NetconfException if caught ExecutionException and failed to close SSH session`() {
val expectedExceptionMsg = "$deviceInfo: Closing session $sessionId for request $formattedRequest"
verify { mockSshClient.close() }
}
- @Ignore
@Test
fun `disconnect wraps IOException if channel doesn't close`() { //this test is equivalent to others
every { rpcService.closeSession(false) } returns SUCCESSFUL_DEVICE_RESPONSE