netconf-executor: NetconfSessionImplTest
[ccsdk/cds.git] / ms / blueprintsprocessor / functions / netconf-executor / src / main / kotlin / org / onap / ccsdk / cds / blueprintsprocessor / functions / netconf / executor / core / NetconfSessionImpl.kt
1 /*
2  * Copyright © 2017-2019 AT&T, Bell Canada
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *     http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 package org.onap.ccsdk.cds.blueprintsprocessor.functions.netconf.executor.core
18
19 import com.google.common.collect.ImmutableList
20 import com.google.common.collect.ImmutableSet
21 import org.apache.sshd.client.SshClient
22 import org.apache.sshd.client.channel.ClientChannel
23 import org.apache.sshd.client.session.ClientSession
24 import org.apache.sshd.common.FactoryManager
25 import org.apache.sshd.server.keyprovider.SimpleGeneratorHostKeyProvider
26 import org.onap.ccsdk.cds.blueprintsprocessor.functions.netconf.executor.api.DeviceInfo
27 import org.onap.ccsdk.cds.blueprintsprocessor.functions.netconf.executor.api.NetconfException
28 import org.onap.ccsdk.cds.blueprintsprocessor.functions.netconf.executor.api.NetconfReceivedEvent
29 import org.onap.ccsdk.cds.blueprintsprocessor.functions.netconf.executor.api.NetconfRpcService
30 import org.onap.ccsdk.cds.blueprintsprocessor.functions.netconf.executor.api.NetconfSession
31 import org.onap.ccsdk.cds.blueprintsprocessor.functions.netconf.executor.api.NetconfSessionListener
32 import org.onap.ccsdk.cds.blueprintsprocessor.functions.netconf.executor.utils.NetconfMessageUtils
33 import org.onap.ccsdk.cds.blueprintsprocessor.functions.netconf.executor.utils.RpcMessageUtils
34 import org.onap.ccsdk.cds.blueprintsprocessor.functions.netconf.executor.utils.RpcStatus
35 import org.slf4j.LoggerFactory
36 import java.io.IOException
37 import java.util.Collections
38 import java.util.concurrent.CompletableFuture
39 import java.util.concurrent.ConcurrentHashMap
40 import java.util.concurrent.ExecutionException
41 import java.util.concurrent.TimeUnit
42 import java.util.concurrent.TimeoutException
43
44 class NetconfSessionImpl(private val deviceInfo: DeviceInfo, private val rpcService: NetconfRpcService) :
45     NetconfSession {
46
47     private val log = LoggerFactory.getLogger(NetconfSessionImpl::class.java)
48
49     private val errorReplies: MutableList<String> = Collections.synchronizedList(mutableListOf())
50     private val replies: MutableMap<String, CompletableFuture<String>> = ConcurrentHashMap()
51     private val deviceCapabilities = mutableSetOf<String>()
52
53     private var connectionTimeout: Long = 0
54     private var replyTimeout: Int = 0
55     private var idleTimeout: Int = 0
56     private var sessionId: String? = null
57
58     private lateinit var session: ClientSession
59     private lateinit var client: SshClient
60     private lateinit var channel: ClientChannel
61     private lateinit var streamHandler: NetconfDeviceCommunicator
62
63     private var capabilities =
64         ImmutableList.of(RpcMessageUtils.NETCONF_10_CAPABILITY, RpcMessageUtils.NETCONF_11_CAPABILITY)
65
66     override fun connect() {
67         try {
68             log.info("$deviceInfo: Connecting to Netconf Device with timeouts C:${deviceInfo.connectTimeout}, " +
69                     "R:${deviceInfo.replyTimeout}, I:${deviceInfo.idleTimeout}")
70             startConnection()
71             log.info("$deviceInfo: Connected to Netconf Device")
72         } catch (e: NetconfException) {
73             log.error("$deviceInfo: Netconf Device Connection Failed. ${e.message}")
74             throw NetconfException(e)
75         }
76     }
77
78     override fun disconnect() {
79         if (rpcService.closeSession(false).status.equals(
80                 RpcStatus.FAILURE, true)) {
81             rpcService.closeSession(true)
82         }
83
84         session.close()
85         // Closes the socket which should interrupt the streamHandler
86         channel.close()
87         client.close()
88     }
89
90     override fun reconnect() {
91         disconnect()
92         connect()
93     }
94
95     override fun syncRpc(request: String, messageId: String): String {
96         val formattedRequest = NetconfMessageUtils.formatRPCRequest(request, messageId, deviceCapabilities)
97
98         checkAndReestablish()
99
100         try {
101             return streamHandler.sendMessage(formattedRequest, messageId).get(replyTimeout.toLong(), TimeUnit.SECONDS)
102 //            replies.remove(messageId)
103         } catch (e: InterruptedException) {
104             Thread.currentThread().interrupt()
105             throw NetconfException("$deviceInfo: Interrupted while waiting for reply for request: $formattedRequest", e)
106         } catch (e: TimeoutException) {
107             throw NetconfException("$deviceInfo: Timed out while waiting for reply for request $formattedRequest after $replyTimeout sec.",
108                 e)
109         } catch (e: ExecutionException) {
110             log.warn("$deviceInfo: Closing session($sessionId) due to unexpected Error", e)
111             try {
112                 session.close()
113                 // Closes the socket which should interrupt the streamHandler
114                 channel.close()
115                 client.close()
116             } catch (ioe: IOException) {
117                 log.warn("$deviceInfo: Error closing session($sessionId) for host($deviceInfo)", ioe)
118             }
119             clearErrorReplies()
120             clearReplies()
121
122             throw NetconfException("$deviceInfo: Closing session $sessionId for request $formattedRequest", e)
123         }
124     }
125
126     override fun asyncRpc(request: String, messageId: String): CompletableFuture<String> {
127         val formattedRequest = NetconfMessageUtils.formatRPCRequest(request, messageId, deviceCapabilities)
128
129         checkAndReestablish()
130
131         return streamHandler.sendMessage(formattedRequest, messageId).handleAsync { reply, t ->
132             if (t != null) {
133                 throw NetconfException(messageId, t)
134             }
135             reply
136         }
137     }
138
139     override fun checkAndReestablish() {
140         try {
141             if (client.isClosed) {
142                 log.info("Trying to restart the whole SSH connection with {}", deviceInfo)
143                 clearReplies()
144                 startConnection()
145             } else if (session.isClosed) {
146                 log.info("Trying to restart the session with {}", deviceInfo)
147                 clearReplies()
148                 startSession()
149             } else if (channel.isClosed) {
150                 log.info("Trying to reopen the channel with {}", deviceInfo)
151                 clearReplies()
152                 openChannel()
153             } else {
154                 return
155             }
156         } catch (e: IOException) {
157             log.error("Can't reopen connection for device {} error: {}", deviceInfo, e.message)
158             throw NetconfException(String.format("Cannot re-open the connection with device (%s)", deviceInfo), e)
159         } catch (e: IllegalStateException) {
160             log.error("Can't reopen connection for device {} error: {}", deviceInfo, e.message)
161             throw NetconfException(String.format("Cannot re-open the connection with device (%s)", deviceInfo), e)
162         }
163     }
164
165     override fun getDeviceInfo(): DeviceInfo {
166         return deviceInfo
167     }
168
169     override fun getSessionId(): String {
170         return this.sessionId!!
171     }
172
173     override fun getDeviceCapabilitiesSet(): Set<String> {
174         return Collections.unmodifiableSet(deviceCapabilities)
175     }
176
177     private fun startConnection() {
178         connectionTimeout = deviceInfo.connectTimeout
179         replyTimeout = deviceInfo.replyTimeout
180         idleTimeout = deviceInfo.idleTimeout
181         try {
182             startClient()
183         } catch (e: Exception) {
184             throw NetconfException("$deviceInfo: Failed to establish SSH session", e)
185         }
186
187     }
188
189     //Needed to unit test connect method interacting with client.start in startClient() below
190     private fun setupNewSSHClient() {
191         client = SshClient.setUpDefaultClient()
192     }
193
194     private fun startClient() {
195         setupNewSSHClient()
196
197         client.properties.putIfAbsent(FactoryManager.IDLE_TIMEOUT, TimeUnit.SECONDS.toMillis(idleTimeout.toLong()))
198         client.properties.putIfAbsent(FactoryManager.NIO2_READ_TIMEOUT, TimeUnit.SECONDS.toMillis(idleTimeout + 15L))
199         client.start()
200
201         startSession()
202     }
203
204     private fun startSession() {
205         log.info("$deviceInfo: Starting SSH session")
206         val connectFuture = client.connect(deviceInfo.username, deviceInfo.ipAddress, deviceInfo.port)
207             .verify(connectionTimeout, TimeUnit.SECONDS)
208         session = connectFuture.session
209         log.info("$deviceInfo: SSH session created")
210
211         authSession()
212     }
213
214     private fun authSession() {
215         session.addPasswordIdentity(deviceInfo.password)
216         session.auth().verify(connectionTimeout, TimeUnit.SECONDS)
217         val event = session.waitFor(ImmutableSet.of(ClientSession.ClientSessionEvent.WAIT_AUTH,
218             ClientSession.ClientSessionEvent.CLOSED, ClientSession.ClientSessionEvent.AUTHED), 0)
219         if (!event.contains(ClientSession.ClientSessionEvent.AUTHED)) {
220             throw NetconfException("$deviceInfo: Failed to authenticate session.")
221         }
222         log.info("$deviceInfo: SSH session authenticated")
223
224         openChannel()
225     }
226
227     private fun openChannel() {
228         channel = session.createSubsystemChannel("netconf")
229         val channelFuture = channel.open()
230         if (channelFuture.await(connectionTimeout, TimeUnit.SECONDS) && channelFuture.isOpened) {
231             log.info("$deviceInfo: SSH NETCONF subsystem channel opened")
232             setupHandler()
233         } else {
234             throw NetconfException("$deviceInfo: Failed to open SSH subsystem channel")
235         }
236     }
237
238     private fun setupHandler() {
239         val sessionListener: NetconfSessionListener = NetconfSessionListenerImpl(this)
240         streamHandler = NetconfDeviceCommunicator(channel.invertedOut, channel.invertedIn, deviceInfo,
241             sessionListener, replies)
242
243         exchangeHelloMessage()
244     }
245
246     private fun exchangeHelloMessage() {
247         sessionId = "-1"
248         val messageId = "-1"
249
250         val serverHelloResponse = syncRpc(NetconfMessageUtils.createHelloString(capabilities), messageId)
251         val sessionIDMatcher = NetconfMessageUtils.SESSION_ID_REGEX_PATTERN.matcher(serverHelloResponse)
252
253         if (sessionIDMatcher.find()) {
254             sessionId = sessionIDMatcher.group(1)
255         } else {
256             throw NetconfException("$deviceInfo: Missing sessionId in server hello message: $serverHelloResponse")
257         }
258
259         val capabilityMatcher = NetconfMessageUtils.CAPABILITY_REGEX_PATTERN.matcher(serverHelloResponse)
260         while (capabilityMatcher.find()) {
261             deviceCapabilities.plus(capabilityMatcher.group(1))
262         }
263     }
264
265     fun sessionstatus(state:String): Boolean{
266         return when (state){
267             "Close" -> channel.isClosed
268             "Open" -> channel.isOpen
269             else -> false
270         }
271     }
272
273     internal fun setStreamHandler(streamHandler: NetconfDeviceCommunicator) {
274         this.streamHandler = streamHandler
275     }
276
277     /**
278      * Add an error reply
279      * Used by {@link NetconfSessionListenerImpl}
280      */
281     internal fun addDeviceErrorReply(errReply: String) {
282         println("addDeviceErrorReply (errReply: $errReply") //TODO : get rid of this.
283         errorReplies.add(errReply)
284     }
285
286     /**
287      * Add a reply from the device
288      * Used by {@link NetconfSessionListenerImpl}
289      */
290     internal fun addDeviceReply(messageId: String, replyMsg: String) {
291         println("addDeviceReply (messageId: $messageId replyMsg: $replyMsg") //TODO : get rid of this.
292         replies[messageId]?.complete(replyMsg)
293     }
294
295     /**
296      * Internal function for accessing replies for testing.
297      */
298     internal fun getReplies() = replies
299
300     /**
301      * internal function for accessing errorReplies for testing.
302      */
303     internal fun getErrorReplies() = errorReplies
304
305     internal fun clearErrorReplies() = errorReplies.clear()
306     internal fun clearReplies() = replies.clear()
307     internal fun setClient(client: SshClient) { this.client = client }
308     internal fun setSession(session: ClientSession) { this.session = session }
309     internal fun setChannel(channel: ClientChannel) { this.channel = channel }
310 }