dac977bdd7f9988536c0d4019f2cdb7f1089570a
[ccsdk/cds.git] /
1 /*
2  * Copyright © 2017-2019 AT&T, Bell Canada
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *     http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 package org.onap.ccsdk.cds.blueprintsprocessor.functions.netconf.executor.core
18
19 import com.google.common.collect.ImmutableList
20 import com.google.common.collect.ImmutableSet
21 import org.apache.sshd.client.SshClient
22 import org.apache.sshd.client.channel.ClientChannel
23 import org.apache.sshd.client.session.ClientSession
24 import org.apache.sshd.common.FactoryManager
25 import org.apache.sshd.server.keyprovider.SimpleGeneratorHostKeyProvider
26 import org.onap.ccsdk.cds.blueprintsprocessor.functions.netconf.executor.api.DeviceInfo
27 import org.onap.ccsdk.cds.blueprintsprocessor.functions.netconf.executor.api.NetconfException
28 import org.onap.ccsdk.cds.blueprintsprocessor.functions.netconf.executor.api.NetconfReceivedEvent
29 import org.onap.ccsdk.cds.blueprintsprocessor.functions.netconf.executor.api.NetconfRpcService
30 import org.onap.ccsdk.cds.blueprintsprocessor.functions.netconf.executor.api.NetconfSession
31 import org.onap.ccsdk.cds.blueprintsprocessor.functions.netconf.executor.api.NetconfSessionListener
32 import org.onap.ccsdk.cds.blueprintsprocessor.functions.netconf.executor.utils.NetconfMessageUtils
33 import org.onap.ccsdk.cds.blueprintsprocessor.functions.netconf.executor.utils.RpcMessageUtils
34 import org.onap.ccsdk.cds.blueprintsprocessor.functions.netconf.executor.utils.RpcStatus
35 import org.slf4j.LoggerFactory
36 import java.io.IOException
37 import java.util.Collections
38 import java.util.concurrent.CompletableFuture
39 import java.util.concurrent.ConcurrentHashMap
40 import java.util.concurrent.ExecutionException
41 import java.util.concurrent.TimeUnit
42 import java.util.concurrent.TimeoutException
43
/**
 * [NetconfSession] implementation that talks to a device over an SSH "netconf" subsystem channel.
 *
 * The instance owns an [SshClient], a [ClientSession] and a [ClientChannel]; all three are created by
 * [connect] (or lazily re-created by [checkAndReestablish]) and released by [disconnect].
 * Replies are delivered through the shared [replies] map, which is handed to the
 * [NetconfDeviceCommunicator] created in [setupHandler].
 *
 * @param deviceInfo connection parameters (address, credentials, timeouts) of the target device.
 * @param rpcService RPC service used by [disconnect] to send the close-session request.
 */
class NetconfSessionImpl(private val deviceInfo: DeviceInfo, private val rpcService: NetconfRpcService) :
    NetconfSession {

    private val log = LoggerFactory.getLogger(NetconfSessionImpl::class.java)

    // Error replies reported by the session listener; wrapped in a synchronized list.
    private val errorReplies: MutableList<String> = Collections.synchronizedList(mutableListOf())

    // Pending replies keyed by message id; shared with the stream handler.
    private val replies: MutableMap<String, CompletableFuture<String>> = ConcurrentHashMap()

    // Capabilities parsed from the device's hello message (see exchangeHelloMessage).
    private val deviceCapabilities = mutableSetOf<String>()

    // Timeouts copied from deviceInfo each time startConnection() runs.
    private var connectionTimeout: Long = 0
    private var replyTimeout: Int = 0
    private var idleTimeout: Int = 0
    private var sessionId: String? = null

    private lateinit var session: ClientSession
    private lateinit var client: SshClient
    private lateinit var channel: ClientChannel
    private lateinit var streamHandler: NetconfDeviceCommunicator

    // Capabilities advertised by this client in its own hello message.
    private val capabilities =
        ImmutableList.of(RpcMessageUtils.NETCONF_10_CAPABILITY, RpcMessageUtils.NETCONF_11_CAPABILITY)

    /**
     * Opens the SSH connection, session and NETCONF channel and exchanges hello messages.
     *
     * @throws NetconfException wrapping the failure raised while establishing the connection.
     */
    override fun connect() {
        try {
            log.info("$deviceInfo: Connecting to Netconf Device with timeouts C:${deviceInfo.connectTimeout}, " +
                    "R:${deviceInfo.replyTimeout}, I:${deviceInfo.idleTimeout}")
            startConnection()
            log.info("$deviceInfo: Connected to Netconf Device")
        } catch (e: NetconfException) {
            log.error("$deviceInfo: Netconf Device Connection Failed. ${e.message}")
            throw NetconfException(e)
        }
    }

    /**
     * Closes the NETCONF session and releases the SSH resources.
     *
     * A graceful close-session is attempted once and then retried up to three more times while it
     * reports [RpcStatus.FAILURE]. The session is force-terminated only when every graceful attempt
     * failed; a graceful close that succeeds on the last retry is not followed by a forced one.
     * Errors raised while closing the SSH resources are logged and not propagated.
     */
    override fun disconnect() {
        var retryNum = 3
        var closedGracefully = closeSessionGracefully()
        while (!closedGracefully && retryNum > 0) {
            log.error("disconnect: graceful disconnect failed, attempt $retryNum")
            retryNum--
            closedGracefully = closeSessionGracefully()
        }
        // If we can't close the session gracefully, try to force terminate.
        if (!closedGracefully) {
            log.error("disconnect: trying to force-terminate the session.")
            rpcService.closeSession(true)
        }
        closeLoggingFailure()
    }

    /**
     * Disconnects and then connects again.
     */
    override fun reconnect() {
        disconnect()
        connect()
    }

    /**
     * Sends [request] to the device and blocks until the reply arrives or [replyTimeout] seconds elapse.
     *
     * @param request RPC payload; it is formatted with [NetconfMessageUtils.formatRPCRequest] before sending.
     * @param messageId identifier used to correlate the reply.
     * @return the reply returned by the stream handler.
     * @throws NetconfException if the wait is interrupted (the thread's interrupt flag is restored first),
     * if the reply times out, or if the send/receive fails; in the last case the SSH resources are
     * closed and the pending replies and error replies are cleared.
     */
    override fun syncRpc(request: String, messageId: String): String {
        val formattedRequest = NetconfMessageUtils.formatRPCRequest(request, messageId, deviceCapabilities)

        checkAndReestablish()

        try {
            return streamHandler.getFutureFromSendMessage(
                streamHandler.sendMessage(formattedRequest, messageId),
                replyTimeout.toLong(), TimeUnit.SECONDS
            )
        } catch (e: InterruptedException) {
            // Restore the interrupt flag so callers further up the stack can still observe it.
            Thread.currentThread().interrupt()
            throw NetconfException("$deviceInfo: Interrupted while waiting for reply for request: $formattedRequest", e)
        } catch (e: TimeoutException) {
            throw NetconfException("$deviceInfo: Timed out while waiting for reply for request $formattedRequest after $replyTimeout sec.",
                e)
        } catch (e: ExecutionException) {
            log.warn("$deviceInfo: Closing session($sessionId) due to unexpected Error", e)
            closeLoggingFailure()
            clearErrorReplies()
            clearReplies()

            throw NetconfException("$deviceInfo: Closing session $sessionId for request $formattedRequest", e)
        }
    }

    /**
     * Sends [request] to the device without blocking for the reply.
     *
     * @param request RPC payload; it is formatted with [NetconfMessageUtils.formatRPCRequest] before sending.
     * @param messageId identifier used to correlate the reply.
     * @return a future completed with the reply, or completed exceptionally with a [NetconfException]
     * (carrying [messageId] as its message) when sending fails.
     */
    override fun asyncRpc(request: String, messageId: String): CompletableFuture<String> {
        val formattedRequest = NetconfMessageUtils.formatRPCRequest(request, messageId, deviceCapabilities)

        checkAndReestablish()

        return streamHandler.sendMessage(formattedRequest, messageId).handleAsync { reply, failure ->
            if (failure != null) {
                throw NetconfException(messageId, failure)
            }
            reply
        }
    }

    /**
     * Re-opens whichever layer is closed, checking the client first, then the session, then the channel.
     * Pending replies are cleared before a layer is re-opened. Does nothing when all three are open.
     *
     * @throws NetconfException if re-opening fails with an [IOException] or [IllegalStateException].
     */
    override fun checkAndReestablish() {
        try {
            when {
                client.isClosed -> {
                    log.info("Trying to restart the whole SSH connection with {}", deviceInfo)
                    clearReplies()
                    startConnection()
                }
                session.isClosed -> {
                    log.info("Trying to restart the session with {}", deviceInfo)
                    clearReplies()
                    startSession()
                }
                channel.isClosed -> {
                    log.info("Trying to reopen the channel with {}", deviceInfo)
                    clearReplies()
                    openChannel()
                }
                else -> return
            }
        } catch (e: IOException) {
            throw reopenFailure(e)
        } catch (e: IllegalStateException) {
            throw reopenFailure(e)
        }
    }

    override fun getDeviceInfo(): DeviceInfo {
        return deviceInfo
    }

    /**
     * @return the session id assigned during the hello exchange.
     * @throws IllegalStateException if no session id has been set yet.
     */
    override fun getSessionId(): String {
        return checkNotNull(sessionId) { "$deviceInfo: session id is not available, session was never established" }
    }

    /**
     * @return a read-only view of the capabilities collected from the device's hello message.
     */
    override fun getDeviceCapabilitiesSet(): Set<String> {
        return Collections.unmodifiableSet(deviceCapabilities)
    }

    /**
     * Copies the timeouts from [deviceInfo] and starts the SSH client.
     *
     * @throws NetconfException wrapping any exception raised while starting the client.
     */
    private fun startConnection() {
        connectionTimeout = deviceInfo.connectTimeout
        replyTimeout = deviceInfo.replyTimeout
        idleTimeout = deviceInfo.idleTimeout
        try {
            startClient()
        } catch (e: Exception) {
            throw NetconfException("$deviceInfo: Failed to establish SSH session", e)
        }
    }

    // Needed to unit test connect method interacting with client.start in startClient() below
    private fun setupNewSSHClient() {
        client = SshClient.setUpDefaultClient()
    }

    /**
     * Creates a fresh SSH client, applies the idle/read timeouts (in milliseconds) if they are not
     * already present in the client properties, starts the client and then opens the session.
     */
    private fun startClient() {
        setupNewSSHClient()

        val idleTimeoutMillis = TimeUnit.SECONDS.toMillis(idleTimeout.toLong())
        // The read timeout is set 15 seconds above the idle timeout.
        val readTimeoutMillis = TimeUnit.SECONDS.toMillis(idleTimeout + 15L)
        client.properties.putIfAbsent(FactoryManager.IDLE_TIMEOUT, idleTimeoutMillis)
        client.properties.putIfAbsent(FactoryManager.NIO2_READ_TIMEOUT, readTimeoutMillis)
        client.start()

        startSession()
    }

    /**
     * Connects to the device with the configured user, address and port, waiting at most
     * [connectionTimeout] seconds, then authenticates the resulting session.
     */
    private fun startSession() {
        log.info("$deviceInfo: Starting SSH session")
        val connectFuture = client.connect(deviceInfo.username, deviceInfo.ipAddress, deviceInfo.port)
            .verify(connectionTimeout, TimeUnit.SECONDS)
        session = connectFuture.session
        log.info("$deviceInfo: SSH session created")

        authSession()
    }

    /**
     * Authenticates the session with the device password and opens the NETCONF channel.
     *
     * @throws NetconfException if the session does not report the AUTHED event.
     */
    private fun authSession() {
        session.addPasswordIdentity(deviceInfo.password)
        session.auth().verify(connectionTimeout, TimeUnit.SECONDS)
        // NOTE(review): the timeout argument is 0 - confirm against the SSHD documentation how a
        // non-positive timeout is interpreted by waitFor.
        val awaitedEvents = ImmutableSet.of(
            ClientSession.ClientSessionEvent.WAIT_AUTH,
            ClientSession.ClientSessionEvent.CLOSED,
            ClientSession.ClientSessionEvent.AUTHED
        )
        val event = session.waitFor(awaitedEvents, 0)
        if (!event.contains(ClientSession.ClientSessionEvent.AUTHED)) {
            throw NetconfException("$deviceInfo: Failed to authenticate session.")
        }
        log.info("$deviceInfo: SSH session authenticated")

        openChannel()
    }

    /**
     * Opens the "netconf" subsystem channel, waiting at most [connectionTimeout] seconds, and wires
     * up the stream handler once the channel is open.
     *
     * @throws NetconfException if the channel is not opened in time.
     */
    private fun openChannel() {
        channel = session.createSubsystemChannel("netconf")
        val channelFuture = channel.open()
        if (channelFuture.await(connectionTimeout, TimeUnit.SECONDS) && channelFuture.isOpened) {
            log.info("$deviceInfo: SSH NETCONF subsystem channel opened")
            setupHandler()
        } else {
            throw NetconfException("$deviceInfo: Failed to open SSH subsystem channel")
        }
    }

    /**
     * Creates the stream handler over the channel's streams and performs the hello exchange.
     */
    private fun setupHandler() {
        val sessionListener: NetconfSessionListener = NetconfSessionListenerImpl(this)
        streamHandler = NetconfDeviceCommunicator(channel.invertedOut, channel.invertedIn, deviceInfo,
            sessionListener, replies)

        exchangeHelloMessage()
    }

    /**
     * Sends the client hello, then extracts the session id and the device capabilities from the
     * server's hello response.
     *
     * @throws NetconfException if the response does not contain a session id.
     */
    private fun exchangeHelloMessage() {
        // Placeholder values used until the device assigns a real session id.
        sessionId = "-1"
        val messageId = "-1"

        val serverHelloResponse = syncRpc(NetconfMessageUtils.createHelloString(capabilities), messageId)
        val sessionIDMatcher = NetconfMessageUtils.SESSION_ID_REGEX_PATTERN.matcher(serverHelloResponse)

        if (sessionIDMatcher.find()) {
            sessionId = sessionIDMatcher.group(1)
            log.info("netconf exchangeHelloMessage sessionID: $sessionId")
        } else {
            throw NetconfException("$deviceInfo: Missing sessionId in server hello message: $serverHelloResponse")
        }

        val capabilityMatcher = NetconfMessageUtils.CAPABILITY_REGEX_PATTERN.matcher(serverHelloResponse)
        while (capabilityMatcher.find()) { // TODO: refactor to add unit test easily for device capability accumulation.
            deviceCapabilities.add(capabilityMatcher.group(1))
        }
    }

    internal fun setStreamHandler(streamHandler: NetconfDeviceCommunicator) {
        this.streamHandler = streamHandler
    }

    /**
     * Add an error reply.
     * Used by [NetconfSessionListenerImpl].
     */
    internal fun addDeviceErrorReply(errReply: String) {
        errorReplies.add(errReply)
    }

    /**
     * Add a reply from the device: completes the pending future registered for [messageId], if any.
     * Used by [NetconfSessionListenerImpl].
     */
    internal fun addDeviceReply(messageId: String, replyMsg: String) {
        replies[messageId]?.complete(replyMsg)
    }

    /**
     * Closes the session, the channel and the client, in that order.
     *
     * Each resource is closed even if closing an earlier one throws, and resources that were never
     * assigned are skipped. When several closes fail, the exception from the last failing close propagates.
     */
    @Throws(IOException::class)
    private fun close() {
        log.debug("close was called.")
        try {
            if (::session.isInitialized) {
                session.close()
            }
        } finally {
            try {
                // Closes the socket which should interrupt the streamHandler
                if (::channel.isInitialized) {
                    channel.close()
                }
            } finally {
                if (::client.isInitialized) {
                    client.close()
                }
            }
        }
    }

    /**
     * Sends a non-forced close-session request.
     *
     * @return true unless the response status equals [RpcStatus.FAILURE] (ignoring case).
     */
    private fun closeSessionGracefully(): Boolean =
        !rpcService.closeSession(false).status.equals(RpcStatus.FAILURE, true)

    /**
     * Calls [close] and logs, rather than propagates, an [IOException] raised by it.
     */
    private fun closeLoggingFailure() {
        try {
            close()
        } catch (ioe: IOException) {
            log.warn("$deviceInfo: Error closing session($sessionId) for host($deviceInfo)", ioe)
        }
    }

    /**
     * Logs a failed re-open attempt and builds the [NetconfException] to be thrown for it.
     */
    private fun reopenFailure(cause: Exception): NetconfException {
        log.error("Can't reopen connection for device {} error: {}", deviceInfo, cause.message)
        return NetconfException(String.format("Cannot re-open the connection with device (%s)", deviceInfo), cause)
    }

    /**
     * Internal function for accessing replies for testing.
     */
    internal fun getReplies() = replies

    /**
     * Internal function for accessing errorReplies for testing.
     */
    internal fun getErrorReplies() = errorReplies
    internal fun clearErrorReplies() = errorReplies.clear()
    internal fun clearReplies() = replies.clear()
    internal fun setClient(client: SshClient) { this.client = client }
    internal fun setSession(session: ClientSession) { this.session = session }
    internal fun setChannel(channel: ClientChannel) { this.channel = channel }
}