cf11cfdde161f148bfb81a9c95bfb952af2a0dfe
[ccsdk/cds.git] /
1 /*
2  * Copyright © 2017-2019 AT&T, Bell Canada
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *     http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 package org.onap.ccsdk.apps.blueprintsprocessor.functions.netconf.executor.core
18
19 import com.google.common.collect.ImmutableList
20 import com.google.common.collect.ImmutableSet
21 import org.apache.sshd.client.SshClient
22 import org.apache.sshd.client.channel.ClientChannel
23 import org.apache.sshd.client.session.ClientSession
24 import org.apache.sshd.common.FactoryManager
25 import org.apache.sshd.server.keyprovider.SimpleGeneratorHostKeyProvider
26 import org.onap.ccsdk.apps.blueprintsprocessor.functions.netconf.executor.api.DeviceInfo
27 import org.onap.ccsdk.apps.blueprintsprocessor.functions.netconf.executor.api.NetconfException
28 import org.onap.ccsdk.apps.blueprintsprocessor.functions.netconf.executor.api.NetconfReceivedEvent
29 import org.onap.ccsdk.apps.blueprintsprocessor.functions.netconf.executor.api.NetconfRpcService
30 import org.onap.ccsdk.apps.blueprintsprocessor.functions.netconf.executor.api.NetconfSession
31 import org.onap.ccsdk.apps.blueprintsprocessor.functions.netconf.executor.api.NetconfSessionListener
32 import org.onap.ccsdk.apps.blueprintsprocessor.functions.netconf.executor.utils.NetconfMessageUtils
33 import org.onap.ccsdk.apps.blueprintsprocessor.functions.netconf.executor.utils.RpcMessageUtils
34 import org.onap.ccsdk.apps.blueprintsprocessor.functions.netconf.executor.utils.RpcStatus
35 import org.slf4j.LoggerFactory
36 import java.io.IOException
37 import java.util.*
38 import java.util.concurrent.CompletableFuture
39 import java.util.concurrent.ConcurrentHashMap
40 import java.util.concurrent.ExecutionException
41 import java.util.concurrent.TimeUnit
42 import java.util.concurrent.TimeoutException
43
44 class NetconfSessionImpl(private val deviceInfo: DeviceInfo, private val rpcService: NetconfRpcService) :
45     NetconfSession {
46
47     private val log = LoggerFactory.getLogger(NetconfSessionImpl::class.java)
48
49     private val errorReplies: MutableList<String> = Collections.synchronizedList(listOf())
50     private val replies: MutableMap<String, CompletableFuture<String>> = ConcurrentHashMap()
51     private val deviceCapabilities = setOf<String>()
52
53     private var connectionTimeout: Long = 0
54     private var replyTimeout: Int = 0
55     private var idleTimeout: Int = 0
56     private var sessionId: String? = null
57
58     private lateinit var session: ClientSession
59     private lateinit var client: SshClient
60     private lateinit var channel: ClientChannel
61     private lateinit var streamHandler: NetconfDeviceCommunicator
62
63     private var capabilities =
64         ImmutableList.of(RpcMessageUtils.NETCONF_10_CAPABILITY, RpcMessageUtils.NETCONF_11_CAPABILITY)
65
66     override fun connect() {
67         try {
68             log.info("$deviceInfo: Connecting to Netconf Device with timeouts C:${deviceInfo.connectTimeout}, " +
69                     "R:${deviceInfo.replyTimeout}, I:${deviceInfo.idleTimeout}")
70             startConnection()
71             log.info("$deviceInfo: Connected to Netconf Device")
72         } catch (e: NetconfException) {
73             log.error("$deviceInfo: Netconf Device Connection Failed. ${e.message}")
74             throw NetconfException(e)
75         }
76     }
77
78     override fun disconnect() {
79         if (rpcService.closeSession(false).status.equals(
80                 RpcStatus.FAILURE, true)) {
81             rpcService.closeSession(true)
82         }
83
84         session.close()
85         // Closes the socket which should interrupt the streamHandler
86         channel.close()
87         client.close()
88     }
89
90     override fun reconnect() {
91         disconnect()
92         connect()
93     }
94
95     override fun syncRpc(request: String, messageId: String): String {
96         val formattedRequest = NetconfMessageUtils.formatRPCRequest(request, messageId, deviceCapabilities)
97
98         checkAndReestablish()
99
100         try {
101             return streamHandler.sendMessage(formattedRequest, messageId).get(replyTimeout.toLong(), TimeUnit.SECONDS)
102 //            replies.remove(messageId)
103         } catch (e: InterruptedException) {
104             Thread.currentThread().interrupt()
105             throw NetconfException("$deviceInfo: Interrupted while waiting for reply for request: $formattedRequest", e)
106         } catch (e: TimeoutException) {
107             throw NetconfException("$deviceInfo: Timed out while waiting for reply for request $formattedRequest after $replyTimeout sec.",
108                 e)
109         } catch (e: ExecutionException) {
110             log.warn("$deviceInfo: Closing session($sessionId) due to unexpected Error", e)
111             try {
112                 session.close()
113                 // Closes the socket which should interrupt the streamHandler
114                 channel.close()
115                 client.close()
116             } catch (ioe: IOException) {
117                 log.warn("$deviceInfo: Error closing session($sessionId) for host($deviceInfo)", ioe)
118             }
119
120 //            NetconfReceivedEvent(NetconfReceivedEvent.Type.SESSION_CLOSED, "",
121 //                "Closed due to unexpected error " + e.cause, "-1", deviceInfo)
122             errorReplies.clear() // move to cleanUp()?
123             replies.clear()
124
125             throw NetconfException("$deviceInfo: Closing session $sessionId for request $formattedRequest", e)
126         }
127     }
128
129     override fun asyncRpc(request: String, messageId: String): CompletableFuture<String> {
130         val formattedRequest = NetconfMessageUtils.formatRPCRequest(request, messageId, deviceCapabilities)
131
132         checkAndReestablish()
133
134         return streamHandler.sendMessage(formattedRequest, messageId).handleAsync { reply, t ->
135             if (t != null) {
136                 throw NetconfException(messageId, t)
137             }
138             reply
139         }
140     }
141
142     override fun checkAndReestablish() {
143         try {
144             if (client.isClosed) {
145                 log.info("Trying to restart the whole SSH connection with {}", deviceInfo)
146                 replies.clear()
147                 startConnection()
148             } else if (session.isClosed) {
149                 log.info("Trying to restart the session with {}", deviceInfo)
150                 replies.clear()
151                 startSession()
152             } else if (channel.isClosed) {
153                 log.info("Trying to reopen the channel with {}", deviceInfo)
154                 replies.clear()
155                 openChannel()
156             } else {
157                 return
158             }
159         } catch (e: IOException) {
160             log.error("Can't reopen connection for device {}", e.message)
161             throw NetconfException(String.format("Cannot re-open the connection with device (%s)", deviceInfo), e)
162         } catch (e: IllegalStateException) {
163             log.error("Can't reopen connection for device {}", e.message)
164             throw NetconfException(String.format("Cannot re-open the connection with device (%s)", deviceInfo), e)
165         }
166
167     }
168
169     override fun getDeviceInfo(): DeviceInfo {
170         return deviceInfo
171     }
172
173     override fun getSessionId(): String {
174         return this.sessionId!!
175     }
176
177     override fun getDeviceCapabilitiesSet(): Set<String> {
178         return Collections.unmodifiableSet(deviceCapabilities)
179     }
180
181     private fun startConnection() {
182         connectionTimeout = deviceInfo.connectTimeout
183         replyTimeout = deviceInfo.replyTimeout
184         idleTimeout = deviceInfo.idleTimeout
185         try {
186             startClient()
187         } catch (e: Exception) {
188             throw NetconfException("$deviceInfo: Failed to establish SSH session", e)
189         }
190
191     }
192
193     private fun startClient() {
194         client = SshClient.setUpDefaultClient()
195         client.properties.putIfAbsent(FactoryManager.IDLE_TIMEOUT, TimeUnit.SECONDS.toMillis(idleTimeout.toLong()))
196         client.properties.putIfAbsent(FactoryManager.NIO2_READ_TIMEOUT, TimeUnit.SECONDS.toMillis(idleTimeout + 15L))
197         client.keyPairProvider = SimpleGeneratorHostKeyProvider()
198         client.start()
199
200         startSession()
201     }
202
203     private fun startSession() {
204         log.info("$deviceInfo: Starting SSH session")
205         val connectFuture = client.connect(deviceInfo.username, deviceInfo.ipAddress, deviceInfo.port)
206             .verify(connectionTimeout, TimeUnit.SECONDS)
207         session = connectFuture.session
208         log.info("$deviceInfo: SSH session created")
209
210         authSession()
211     }
212
213     private fun authSession() {
214         session.addPasswordIdentity(deviceInfo.password)
215         session.auth().verify(connectionTimeout, TimeUnit.SECONDS)
216         val event = session.waitFor(ImmutableSet.of(ClientSession.ClientSessionEvent.WAIT_AUTH,
217             ClientSession.ClientSessionEvent.CLOSED, ClientSession.ClientSessionEvent.AUTHED), 0)
218         if (!event.contains(ClientSession.ClientSessionEvent.AUTHED)) {
219             throw NetconfException("$deviceInfo: Failed to authenticate session.")
220         }
221         log.info("$deviceInfo: SSH session authenticated")
222
223         openChannel()
224     }
225
226     private fun openChannel() {
227         channel = session.createSubsystemChannel("netconf")
228         val channelFuture = channel.open()
229         if (channelFuture.await(connectionTimeout, TimeUnit.SECONDS) && channelFuture.isOpened) {
230             log.info("$deviceInfo: SSH NETCONF subsystem channel opened")
231             setupHandler()
232         } else {
233             throw NetconfException("$deviceInfo: Failed to open SSH subsystem channel")
234         }
235     }
236
237     private fun setupHandler() {
238         val sessionListener: NetconfSessionListener = NetconfSessionListenerImpl()
239         streamHandler = NetconfDeviceCommunicator(channel.invertedOut, channel.invertedIn, deviceInfo,
240             sessionListener, replies)
241
242         exchangeHelloMessage()
243     }
244
245     private fun exchangeHelloMessage() {
246         sessionId = "-1"
247         val messageId = "-1"
248
249         val serverHelloResponse = syncRpc(NetconfMessageUtils.createHelloString(capabilities), messageId)
250         val sessionIDMatcher = NetconfMessageUtils.SESSION_ID_REGEX_PATTERN.matcher(serverHelloResponse)
251
252         if (sessionIDMatcher.find()) {
253             sessionId = sessionIDMatcher.group(1)
254         } else {
255             throw NetconfException("$deviceInfo: Missing sessionId in server hello message: $serverHelloResponse")
256         }
257
258         val capabilityMatcher = NetconfMessageUtils.CAPABILITY_REGEX_PATTERN.matcher(serverHelloResponse)
259         while (capabilityMatcher.find()) {
260             deviceCapabilities.plus(capabilityMatcher.group(1))
261         }
262     }
263
264     inner class NetconfSessionListenerImpl : NetconfSessionListener {
265         override fun notify(event: NetconfReceivedEvent) {
266             val messageId = event.getMessageID()
267
268             when (event.getType()) {
269                 NetconfReceivedEvent.Type.DEVICE_UNREGISTERED -> disconnect()
270                 NetconfReceivedEvent.Type.DEVICE_ERROR -> errorReplies.add(event.getMessagePayload())
271                 NetconfReceivedEvent.Type.DEVICE_REPLY -> replies[messageId]?.complete(event.getMessagePayload())
272                 NetconfReceivedEvent.Type.SESSION_CLOSED -> disconnect()
273             }
274         }
275     }
276 }