netconf-executor: NetconfSessionImplTest improvements 88/84988/4
authorOleg Mitsura <oleg.mitsura@amdocs.com>
Thu, 11 Apr 2019 02:32:11 +0000 (22:32 -0400)
committerOleg Mitsura <oleg.mitsura@amdocs.com>
Thu, 11 Apr 2019 12:05:11 +0000 (08:05 -0400)
Issue-ID: CCSDK-1126

Change-Id: Ied0360a37f8f22801c63c2aeb70ee73d45cc7b4b
Signed-off-by: Oleg Mitsura <oleg.mitsura@amdocs.com>
ms/blueprintsprocessor/functions/netconf-executor/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/netconf/executor/core/NetconfDeviceCommunicator.kt
ms/blueprintsprocessor/functions/netconf-executor/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/netconf/executor/core/NetconfSessionImpl.kt
ms/blueprintsprocessor/functions/netconf-executor/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/netconf/executor/core/NetconfSessionImplTest.kt

index 6ef4f41..aa156e2 100644 (file)
@@ -57,7 +57,7 @@ class NetconfDeviceCommunicator(private var inputStream: InputStream,
             while (!socketClosed) {
                 val cInt = bufferReader.read()
                 if (cInt == -1) {
-                    log.error("$deviceInfo: Received cInt = -1")
+                    log.debug("$deviceInfo: Received end of stream, closing socket.")
                     socketClosed = true
                 }
                 val c = cInt.toChar()
index 7e56e3e..13c1bfa 100644 (file)
@@ -80,11 +80,11 @@ class NetconfSessionImpl(private val deviceInfo: DeviceInfo, private val rpcServ
                 RpcStatus.FAILURE, true)) {
             rpcService.closeSession(true)
         }
-
-        session.close()
-        // Closes the socket which should interrupt the streamHandler
-        channel.close()
-        client.close()
+        try {
+            close()
+        } catch (ioe: IOException) {
+            log.warn("$deviceInfo: Error closing session($sessionId) for host($deviceInfo)", ioe)
+        }
     }
 
     override fun reconnect() {
@@ -98,8 +98,8 @@ class NetconfSessionImpl(private val deviceInfo: DeviceInfo, private val rpcServ
         checkAndReestablish()
 
         try {
-            return streamHandler.sendMessage(formattedRequest, messageId).get(replyTimeout.toLong(), TimeUnit.SECONDS)
-//            replies.remove(messageId)
+            return streamHandler.getFutureFromSendMessage(streamHandler.sendMessage(formattedRequest, messageId),
+                replyTimeout.toLong(), TimeUnit.SECONDS)
         } catch (e: InterruptedException) {
             Thread.currentThread().interrupt()
             throw NetconfException("$deviceInfo: Interrupted while waiting for reply for request: $formattedRequest", e)
@@ -109,10 +109,7 @@ class NetconfSessionImpl(private val deviceInfo: DeviceInfo, private val rpcServ
         } catch (e: ExecutionException) {
             log.warn("$deviceInfo: Closing session($sessionId) due to unexpected Error", e)
             try {
-                session.close()
-                // Closes the socket which should interrupt the streamHandler
-                channel.close()
-                client.close()
+                close()
             } catch (ioe: IOException) {
                 log.warn("$deviceInfo: Error closing session($sessionId) for host($deviceInfo)", ioe)
             }
@@ -138,20 +135,23 @@ class NetconfSessionImpl(private val deviceInfo: DeviceInfo, private val rpcServ
 
     override fun checkAndReestablish() {
         try {
-            if (client.isClosed) {
-                log.info("Trying to restart the whole SSH connection with {}", deviceInfo)
-                clearReplies()
-                startConnection()
-            } else if (session.isClosed) {
-                log.info("Trying to restart the session with {}", deviceInfo)
-                clearReplies()
-                startSession()
-            } else if (channel.isClosed) {
-                log.info("Trying to reopen the channel with {}", deviceInfo)
-                clearReplies()
-                openChannel()
-            } else {
-                return
+            when {
+                client.isClosed -> {
+                    log.info("Trying to restart the whole SSH connection with {}", deviceInfo)
+                    clearReplies()
+                    startConnection()
+                }
+                session.isClosed -> {
+                    log.info("Trying to restart the session with {}", deviceInfo)
+                    clearReplies()
+                    startSession()
+                }
+                channel.isClosed -> {
+                    log.info("Trying to reopen the channel with {}", deviceInfo)
+                    clearReplies()
+                    openChannel()
+                }
+                else -> return
             }
         } catch (e: IOException) {
             log.error("Can't reopen connection for device {} error: {}", deviceInfo, e.message)
@@ -258,7 +258,7 @@ class NetconfSessionImpl(private val deviceInfo: DeviceInfo, private val rpcServ
 
         val capabilityMatcher = NetconfMessageUtils.CAPABILITY_REGEX_PATTERN.matcher(serverHelloResponse)
         while (capabilityMatcher.find()) {
-            deviceCapabilities.plus(capabilityMatcher.group(1))
+            deviceCapabilities.add(capabilityMatcher.group(1))
         }
     }
 
@@ -292,6 +292,17 @@ class NetconfSessionImpl(private val deviceInfo: DeviceInfo, private val rpcServ
         replies[messageId]?.complete(replyMsg)
     }
 
+    /**
+     * Closes the session/channel/client
+     */
+    @Throws(IOException::class)
+    private fun close() {
+        session.close()
+        // Closes the socket which should interrupt the streamHandler
+        channel.close()
+        client.close()
+    }
+
     /**
      * Internal function for accessing replies for testing.
      */
index 1f526f4..f5fd541 100644 (file)
@@ -168,7 +168,6 @@ class NetconfSessionImplTest {
 
     }
 
-    @Ignore //TODO undo close method removal
     @Test
     fun `disconnect wraps exception from ssh closing error`() {
         val netconfSessionSpy = spyk(netconfSession, recordPrivateCalls = true)
@@ -295,7 +294,6 @@ class NetconfSessionImplTest {
         }
     }
 
-    @Ignore //TODO revert back on getFutureFromSendMessage
     @Test
     fun `syncRpc throws NetconfException if TimeoutException is caught`() {
         val expectedExceptionMsg = "$deviceInfo: Timed out while waiting for reply for request $formattedRequest after ${deviceInfo.replyTimeout} sec."
@@ -310,23 +308,23 @@ class NetconfSessionImplTest {
         }
     }
 
-    @Ignore
     @Test
     fun `syncRpc throws NetconfException if ExecutionException is caught`() {
         val expectedExceptionMsg = "$deviceInfo: Closing session $sessionId for request $formattedRequest"
         assertFailsWith(exceptionClass = NetconfException::class, message = expectedExceptionMsg) {
-            val netconfSessionSpy = spyk(netconfSession)
+            val netconfSessionSpy = spyk(netconfSession, recordPrivateCalls = false)
             val futureRet: CompletableFuture<String> = CompletableFuture.completedFuture(futureMsg)
             every { netconfCommunicator.sendMessage(any(), any()) } returns futureRet
             every { netconfCommunicator.getFutureFromSendMessage(any(), any(), any()) } throws
-                ExecutionException("exec exception", Exception("nested exception")) //TODO revert getFutureFromSendMessage back
+                ExecutionException("exec exception", Exception("nested exception"))
+            every { netconfSessionSpy["close"]() as Unit } just Runs
             every { netconfSessionSpy.checkAndReestablish() } just Runs
+            netconfSessionSpy.setSession(mockClientSession)
             //call the method
             netconfSessionSpy.syncRpc("0", "0")
         }
     }
 
-    @Ignore //TODO revert back on getFutureFromSendMessage
     @Test
     fun `syncRpc throws NetconfException if caught ExecutionException and failed to close SSH session`() {
         val expectedExceptionMsg = "$deviceInfo: Closing session $sessionId for request $formattedRequest"
@@ -525,7 +523,6 @@ class NetconfSessionImplTest {
         verify { mockSshClient.close() }
     }
 
-    @Ignore
     @Test
     fun `disconnect wraps IOException if channel doesn't close`() { //this test is equivalent to others
         every { rpcService.closeSession(false) } returns SUCCESSFUL_DEVICE_RESPONSE