21570a2355855bc6de87d5cc1778bd2c62c37b08
[ccsdk/cds.git] /
1 /*
2  * Copyright © 2017-2019 AT&T, Bell Canada
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *     http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 package org.onap.ccsdk.apps.blueprintsprocessor.functions.netconf.executor.core
18
19 import com.google.common.collect.ImmutableList
20 import com.google.common.collect.ImmutableSet
21 import org.apache.sshd.client.SshClient
22 import org.apache.sshd.client.channel.ClientChannel
23 import org.apache.sshd.client.session.ClientSession
24 import org.apache.sshd.common.FactoryManager
25 import org.apache.sshd.server.keyprovider.SimpleGeneratorHostKeyProvider
26 import org.onap.ccsdk.apps.blueprintsprocessor.functions.netconf.executor.api.DeviceInfo
27 import org.onap.ccsdk.apps.blueprintsprocessor.functions.netconf.executor.api.NetconfException
28 import org.onap.ccsdk.apps.blueprintsprocessor.functions.netconf.executor.api.NetconfReceivedEvent
29 import org.onap.ccsdk.apps.blueprintsprocessor.functions.netconf.executor.api.NetconfRpcService
30 import org.onap.ccsdk.apps.blueprintsprocessor.functions.netconf.executor.api.NetconfSession
31 import org.onap.ccsdk.apps.blueprintsprocessor.functions.netconf.executor.api.NetconfSessionListener
32 import org.onap.ccsdk.apps.blueprintsprocessor.functions.netconf.executor.utils.NetconfMessageUtils
33 import org.onap.ccsdk.apps.blueprintsprocessor.functions.netconf.executor.utils.RpcMessageUtils
34 import org.onap.ccsdk.apps.blueprintsprocessor.functions.netconf.executor.utils.RpcStatus
35 import org.slf4j.LoggerFactory
36 import java.io.IOException
37 import java.util.*
38 import java.util.concurrent.CompletableFuture
39 import java.util.concurrent.ConcurrentHashMap
40 import java.util.concurrent.ExecutionException
41 import java.util.concurrent.TimeUnit
42 import java.util.concurrent.TimeoutException
43 import java.util.concurrent.atomic.AtomicInteger
44
45 class NetconfSessionImpl(private val deviceInfo: DeviceInfo, private val rpcService: NetconfRpcService) :
46     NetconfSession {
47
48     private val log = LoggerFactory.getLogger(NetconfSessionImpl::class.java)
49
50     private val errorReplies: MutableList<String> = Collections.synchronizedList(listOf())
51     private val replies: MutableMap<String, CompletableFuture<String>> = ConcurrentHashMap()
52     private val deviceCapabilities = setOf<String>()
53
54     private var connectionTimeout: Long = 0
55     private var replyTimeout: Int = 0
56     private var idleTimeout: Int = 0
57     private var sessionId: String? = null
58
59     private lateinit var session: ClientSession
60     private lateinit var client: SshClient
61     private lateinit var channel: ClientChannel
62     private lateinit var streamHandler: NetconfDeviceCommunicator
63
64     private val messageIdInteger = AtomicInteger(1)
65     private var capabilities =
66         ImmutableList.of(RpcMessageUtils.NETCONF_10_CAPABILITY, RpcMessageUtils.NETCONF_11_CAPABILITY)
67
68     override fun connect() {
69         try {
70             log.info("$deviceInfo: Connecting to Netconf Device with timeouts C:${deviceInfo.connectTimeout}, " +
71                     "R:${deviceInfo.replyTimeout}, I:${deviceInfo.idleTimeout}")
72             startConnection()
73             log.info("$deviceInfo: Connected to Netconf Device")
74         } catch (e: NetconfException) {
75             log.error("$deviceInfo: Netconf Device Connection Failed. ${e.message}")
76             throw NetconfException(e)
77         }
78     }
79
80     override fun disconnect() {
81         if (rpcService.closeSession(messageIdInteger.incrementAndGet().toString(), false, replyTimeout).status.equals(
82                 RpcStatus.FAILURE, true)) {
83             rpcService.closeSession(messageIdInteger.incrementAndGet().toString(), true, replyTimeout)
84         }
85
86         session.close()
87         // Closes the socket which should interrupt the streamHandler
88         channel.close()
89         client.close()
90     }
91
92     override fun reconnect() {
93         disconnect()
94         connect()
95     }
96
97     override fun syncRpc(request: String, messageId: String): String {
98         val formattedRequest = NetconfMessageUtils.formatRPCRequest(request, messageId, deviceCapabilities)
99
100         checkAndReestablish()
101
102         try {
103             return streamHandler.sendMessage(formattedRequest, messageId).get(replyTimeout.toLong(), TimeUnit.SECONDS)
104 //            replies.remove(messageId)
105         } catch (e: InterruptedException) {
106             Thread.currentThread().interrupt()
107             throw NetconfException("$deviceInfo: Interrupted while waiting for reply for request: $formattedRequest", e)
108         } catch (e: TimeoutException) {
109             throw NetconfException("$deviceInfo: Timed out while waiting for reply for request $formattedRequest after $replyTimeout sec.",
110                 e)
111         } catch (e: ExecutionException) {
112             log.warn("$deviceInfo: Closing session($sessionId) due to unexpected Error", e)
113             try {
114                 session.close()
115                 // Closes the socket which should interrupt the streamHandler
116                 channel.close()
117                 client.close()
118             } catch (ioe: IOException) {
119                 log.warn("$deviceInfo: Error closing session($sessionId) for host($deviceInfo)", ioe)
120             }
121
122 //            NetconfReceivedEvent(NetconfReceivedEvent.Type.SESSION_CLOSED, "",
123 //                "Closed due to unexpected error " + e.cause, "-1", deviceInfo)
124             errorReplies.clear() // move to cleanUp()?
125             replies.clear()
126
127             throw NetconfException("$deviceInfo: Closing session $sessionId for request $formattedRequest", e)
128         }
129     }
130
131     override fun asyncRpc(request: String, messageId: String): CompletableFuture<String> {
132         val formattedRequest = NetconfMessageUtils.formatRPCRequest(request, messageId, deviceCapabilities)
133
134         checkAndReestablish()
135
136         return streamHandler.sendMessage(formattedRequest, messageId).handleAsync { reply, t ->
137             if (t != null) {
138                 throw NetconfException(messageId, t)
139             }
140             reply
141         }
142     }
143
144     override fun checkAndReestablish() {
145         try {
146             if (client.isClosed) {
147                 log.info("Trying to restart the whole SSH connection with {}", deviceInfo)
148                 replies.clear()
149                 startConnection()
150             } else if (session.isClosed) {
151                 log.info("Trying to restart the session with {}", deviceInfo)
152                 replies.clear()
153                 startSession()
154             } else if (channel.isClosed) {
155                 log.info("Trying to reopen the channel with {}", deviceInfo)
156                 replies.clear()
157                 openChannel()
158             } else {
159                 return
160             }
161         } catch (e: IOException) {
162             log.error("Can't reopen connection for device {}", e.message)
163             throw NetconfException(String.format("Cannot re-open the connection with device (%s)", deviceInfo), e)
164         } catch (e: IllegalStateException) {
165             log.error("Can't reopen connection for device {}", e.message)
166             throw NetconfException(String.format("Cannot re-open the connection with device (%s)", deviceInfo), e)
167         }
168
169     }
170
171     override fun getDeviceInfo(): DeviceInfo {
172         return deviceInfo
173     }
174
175     override fun getSessionId(): String {
176         return this.sessionId!!
177     }
178
179     override fun getDeviceCapabilitiesSet(): Set<String> {
180         return Collections.unmodifiableSet(deviceCapabilities)
181     }
182
183     private fun startConnection() {
184         connectionTimeout = deviceInfo.connectTimeout
185         replyTimeout = deviceInfo.replyTimeout
186         idleTimeout = deviceInfo.idleTimeout
187         try {
188             startClient()
189         } catch (e: Exception) {
190             throw NetconfException("$deviceInfo: Failed to establish SSH session", e)
191         }
192
193     }
194
195     private fun startClient() {
196         client = SshClient.setUpDefaultClient()
197         client.properties.putIfAbsent(FactoryManager.IDLE_TIMEOUT, TimeUnit.SECONDS.toMillis(idleTimeout.toLong()))
198         client.properties.putIfAbsent(FactoryManager.NIO2_READ_TIMEOUT, TimeUnit.SECONDS.toMillis(idleTimeout + 15L))
199         client.keyPairProvider = SimpleGeneratorHostKeyProvider()
200         client.start()
201
202         startSession()
203     }
204
205     private fun startSession() {
206         log.info("$deviceInfo: Starting SSH session")
207         val connectFuture = client.connect(deviceInfo.username, deviceInfo.ipAddress, deviceInfo.port)
208             .verify(connectionTimeout, TimeUnit.SECONDS)
209         session = connectFuture.session
210         log.info("$deviceInfo: SSH session created")
211
212         authSession()
213     }
214
215     private fun authSession() {
216         session.addPasswordIdentity(deviceInfo.password)
217         session.auth().verify(connectionTimeout, TimeUnit.SECONDS)
218         val event = session.waitFor(ImmutableSet.of(ClientSession.ClientSessionEvent.WAIT_AUTH,
219             ClientSession.ClientSessionEvent.CLOSED, ClientSession.ClientSessionEvent.AUTHED), 0)
220         if (!event.contains(ClientSession.ClientSessionEvent.AUTHED)) {
221             throw NetconfException("$deviceInfo: Failed to authenticate session.")
222         }
223         log.info("$deviceInfo: SSH session authenticated")
224
225         openChannel()
226     }
227
228     private fun openChannel() {
229         channel = session.createSubsystemChannel("netconf")
230         val channelFuture = channel.open()
231         if (channelFuture.await(connectionTimeout, TimeUnit.SECONDS) && channelFuture.isOpened) {
232             log.info("$deviceInfo: SSH NETCONF subsystem channel opened")
233             setupHandler()
234         } else {
235             throw NetconfException("$deviceInfo: Failed to open SSH subsystem channel")
236         }
237     }
238
239     private fun setupHandler() {
240         val sessionListener: NetconfSessionListener = NetconfSessionListenerImpl()
241         streamHandler = NetconfDeviceCommunicator(channel.invertedOut, channel.invertedIn, deviceInfo,
242             sessionListener, replies)
243
244         exchangeHelloMessage()
245     }
246
247     private fun exchangeHelloMessage() {
248         sessionId = "-1"
249         val messageId = "-1"
250
251         val serverHelloResponse = syncRpc(NetconfMessageUtils.createHelloString(capabilities), messageId)
252         val sessionIDMatcher = NetconfMessageUtils.SESSION_ID_REGEX_PATTERN.matcher(serverHelloResponse)
253
254         if (sessionIDMatcher.find()) {
255             sessionId = sessionIDMatcher.group(1)
256         } else {
257             throw NetconfException("$deviceInfo: Missing sessionId in server hello message: $serverHelloResponse")
258         }
259
260         val capabilityMatcher = NetconfMessageUtils.CAPABILITY_REGEX_PATTERN.matcher(serverHelloResponse)
261         while (capabilityMatcher.find()) {
262             deviceCapabilities.plus(capabilityMatcher.group(1))
263         }
264     }
265
266     inner class NetconfSessionListenerImpl : NetconfSessionListener {
267         override fun notify(event: NetconfReceivedEvent) {
268             val messageId = event.getMessageID()
269
270             when (event.getType()) {
271                 NetconfReceivedEvent.Type.DEVICE_UNREGISTERED -> disconnect()
272                 NetconfReceivedEvent.Type.DEVICE_ERROR -> errorReplies.add(event.getMessagePayload())
273                 NetconfReceivedEvent.Type.DEVICE_REPLY -> replies[messageId]?.complete(event.getMessagePayload())
274                 NetconfReceivedEvent.Type.SESSION_CLOSED -> disconnect()
275             }
276         }
277     }
278 }