Merge "Resource edit changes"
[ccsdk/cds.git] / ms / blueprintsprocessor / functions / netconf-executor / src / main / kotlin / org / onap / ccsdk / cds / blueprintsprocessor / functions / netconf / executor / core / NetconfSessionImpl.kt
1 /*
2  * Copyright © 2017-2019 AT&T, Bell Canada
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *     http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 package org.onap.ccsdk.cds.blueprintsprocessor.functions.netconf.executor.core
18
19 import com.google.common.collect.ImmutableList
20 import com.google.common.collect.ImmutableSet
21 import org.apache.sshd.client.SshClient
22 import org.apache.sshd.client.channel.ClientChannel
23 import org.apache.sshd.client.session.ClientSession
24 import org.apache.sshd.common.FactoryManager
25 import org.apache.sshd.server.keyprovider.SimpleGeneratorHostKeyProvider
26 import org.onap.ccsdk.cds.blueprintsprocessor.functions.netconf.executor.api.DeviceInfo
27 import org.onap.ccsdk.cds.blueprintsprocessor.functions.netconf.executor.api.NetconfException
28 import org.onap.ccsdk.cds.blueprintsprocessor.functions.netconf.executor.api.NetconfReceivedEvent
29 import org.onap.ccsdk.cds.blueprintsprocessor.functions.netconf.executor.api.NetconfRpcService
30 import org.onap.ccsdk.cds.blueprintsprocessor.functions.netconf.executor.api.NetconfSession
31 import org.onap.ccsdk.cds.blueprintsprocessor.functions.netconf.executor.api.NetconfSessionListener
32 import org.onap.ccsdk.cds.blueprintsprocessor.functions.netconf.executor.utils.NetconfMessageUtils
33 import org.onap.ccsdk.cds.blueprintsprocessor.functions.netconf.executor.utils.RpcMessageUtils
34 import org.onap.ccsdk.cds.blueprintsprocessor.functions.netconf.executor.utils.RpcStatus
35 import org.slf4j.LoggerFactory
36 import java.io.IOException
37 import java.util.*
38 import java.util.concurrent.CompletableFuture
39 import java.util.concurrent.ConcurrentHashMap
40 import java.util.concurrent.ExecutionException
41 import java.util.concurrent.TimeUnit
42 import java.util.concurrent.TimeoutException
43 import java.util.concurrent.atomic.AtomicReference
44
45 class NetconfSessionImpl(private val deviceInfo: DeviceInfo, private val rpcService: NetconfRpcService) :
46     NetconfSession {
47
48     private val log = LoggerFactory.getLogger(NetconfSessionImpl::class.java)
49
50     private val errorReplies: MutableList<String> = Collections.synchronizedList(listOf())
51     private val replies: MutableMap<String, CompletableFuture<String>> = ConcurrentHashMap()
52     private val deviceCapabilities = setOf<String>()
53
54     private var connectionTimeout: Long = 0
55     private var replyTimeout: Int = 0
56     private var idleTimeout: Int = 0
57     private var sessionId: String? = null
58
59     private lateinit var session: ClientSession
60     private lateinit var client: SshClient
61     private lateinit var channel: ClientChannel
62     private lateinit var streamHandler: NetconfDeviceCommunicator
63
64     private var capabilities =
65         ImmutableList.of(RpcMessageUtils.NETCONF_10_CAPABILITY, RpcMessageUtils.NETCONF_11_CAPABILITY)
66
67     override fun connect() {
68         try {
69             log.info("$deviceInfo: Connecting to Netconf Device with timeouts C:${deviceInfo.connectTimeout}, " +
70                     "R:${deviceInfo.replyTimeout}, I:${deviceInfo.idleTimeout}")
71             startConnection()
72             log.info("$deviceInfo: Connected to Netconf Device")
73         } catch (e: NetconfException) {
74             log.error("$deviceInfo: Netconf Device Connection Failed. ${e.message}")
75             throw NetconfException(e)
76         }
77     }
78
79     override fun disconnect() {
80         if (rpcService.closeSession(false).status.equals(
81                 RpcStatus.FAILURE, true)) {
82             rpcService.closeSession(true)
83         }
84
85         session.close()
86         // Closes the socket which should interrupt the streamHandler
87         channel.close()
88         client.close()
89     }
90
91     override fun reconnect() {
92         disconnect()
93         connect()
94     }
95
96     override fun syncRpc(request: String, messageId: String): String {
97         val formattedRequest = NetconfMessageUtils.formatRPCRequest(request, messageId, deviceCapabilities)
98
99         checkAndReestablish()
100
101         try {
102             return streamHandler.sendMessage(formattedRequest, messageId).get(replyTimeout.toLong(), TimeUnit.SECONDS)
103 //            replies.remove(messageId)
104         } catch (e: InterruptedException) {
105             Thread.currentThread().interrupt()
106             throw NetconfException("$deviceInfo: Interrupted while waiting for reply for request: $formattedRequest", e)
107         } catch (e: TimeoutException) {
108             throw NetconfException("$deviceInfo: Timed out while waiting for reply for request $formattedRequest after $replyTimeout sec.",
109                 e)
110         } catch (e: ExecutionException) {
111             log.warn("$deviceInfo: Closing session($sessionId) due to unexpected Error", e)
112             try {
113                 session.close()
114                 // Closes the socket which should interrupt the streamHandler
115                 channel.close()
116                 client.close()
117             } catch (ioe: IOException) {
118                 log.warn("$deviceInfo: Error closing session($sessionId) for host($deviceInfo)", ioe)
119             }
120
121 //            NetconfReceivedEvent(NetconfReceivedEvent.Type.SESSION_CLOSED, "",
122 //                "Closed due to unexpected error " + e.cause, "-1", deviceInfo)
123             errorReplies.clear() // move to cleanUp()?
124             replies.clear()
125
126             throw NetconfException("$deviceInfo: Closing session $sessionId for request $formattedRequest", e)
127         }
128     }
129
130     override fun asyncRpc(request: String, messageId: String): CompletableFuture<String> {
131         val formattedRequest = NetconfMessageUtils.formatRPCRequest(request, messageId, deviceCapabilities)
132
133         checkAndReestablish()
134
135         return streamHandler.sendMessage(formattedRequest, messageId).handleAsync { reply, t ->
136             if (t != null) {
137                 throw NetconfException(messageId, t)
138             }
139             reply
140         }
141     }
142
143     override fun checkAndReestablish() {
144         try {
145             if (client.isClosed) {
146                 log.info("Trying to restart the whole SSH connection with {}", deviceInfo)
147                 replies.clear()
148                 startConnection()
149             } else if (session.isClosed) {
150                 log.info("Trying to restart the session with {}", deviceInfo)
151                 replies.clear()
152                 startSession()
153             } else if (channel.isClosed) {
154                 log.info("Trying to reopen the channel with {}", deviceInfo)
155                 replies.clear()
156                 openChannel()
157             } else {
158                 return
159             }
160         } catch (e: IOException) {
161             log.error("Can't reopen connection for device {}", e.message)
162             throw NetconfException(String.format("Cannot re-open the connection with device (%s)", deviceInfo), e)
163         } catch (e: IllegalStateException) {
164             log.error("Can't reopen connection for device {}", e.message)
165             throw NetconfException(String.format("Cannot re-open the connection with device (%s)", deviceInfo), e)
166         }
167
168     }
169
170     override fun getDeviceInfo(): DeviceInfo {
171         return deviceInfo
172     }
173
174     override fun getSessionId(): String {
175         return this.sessionId!!
176     }
177
178     override fun getDeviceCapabilitiesSet(): Set<String> {
179         return Collections.unmodifiableSet(deviceCapabilities)
180     }
181
182     private fun startConnection() {
183         connectionTimeout = deviceInfo.connectTimeout
184         replyTimeout = deviceInfo.replyTimeout
185         idleTimeout = deviceInfo.idleTimeout
186         try {
187             startClient()
188         } catch (e: Exception) {
189             throw NetconfException("$deviceInfo: Failed to establish SSH session", e)
190         }
191
192     }
193
194     private fun startClient() {
195         client = SshClient.setUpDefaultClient()
196
197         client.properties.putIfAbsent(FactoryManager.IDLE_TIMEOUT, TimeUnit.SECONDS.toMillis(idleTimeout.toLong()))
198         client.properties.putIfAbsent(FactoryManager.NIO2_READ_TIMEOUT, TimeUnit.SECONDS.toMillis(idleTimeout + 15L))
199         client.start()
200
201         startSession()
202     }
203
204     private fun startSession() {
205         log.info("$deviceInfo: Starting SSH session")
206         val connectFuture = client.connect(deviceInfo.username, deviceInfo.ipAddress, deviceInfo.port)
207             .verify(connectionTimeout, TimeUnit.SECONDS)
208         session = connectFuture.session
209         log.info("$deviceInfo: SSH session created")
210
211         authSession()
212     }
213
214     private fun authSession() {
215         session.addPasswordIdentity(deviceInfo.password)
216         session.auth().verify(connectionTimeout, TimeUnit.SECONDS)
217         val event = session.waitFor(ImmutableSet.of(ClientSession.ClientSessionEvent.WAIT_AUTH,
218             ClientSession.ClientSessionEvent.CLOSED, ClientSession.ClientSessionEvent.AUTHED), 0)
219         if (!event.contains(ClientSession.ClientSessionEvent.AUTHED)) {
220             throw NetconfException("$deviceInfo: Failed to authenticate session.")
221         }
222         log.info("$deviceInfo: SSH session authenticated")
223
224         openChannel()
225     }
226
227     private fun openChannel() {
228         channel = session.createSubsystemChannel("netconf")
229         val channelFuture = channel.open()
230         if (channelFuture.await(connectionTimeout, TimeUnit.SECONDS) && channelFuture.isOpened) {
231             log.info("$deviceInfo: SSH NETCONF subsystem channel opened")
232             setupHandler()
233         } else {
234             throw NetconfException("$deviceInfo: Failed to open SSH subsystem channel")
235         }
236     }
237
238     private fun setupHandler() {
239         val sessionListener: NetconfSessionListener = NetconfSessionListenerImpl()
240         streamHandler = NetconfDeviceCommunicator(channel.invertedOut, channel.invertedIn, deviceInfo,
241             sessionListener, replies)
242
243         exchangeHelloMessage()
244     }
245
246     private fun exchangeHelloMessage() {
247         sessionId = "-1"
248         val messageId = "-1"
249
250         val serverHelloResponse = syncRpc(NetconfMessageUtils.createHelloString(capabilities), messageId)
251         val sessionIDMatcher = NetconfMessageUtils.SESSION_ID_REGEX_PATTERN.matcher(serverHelloResponse)
252
253         if (sessionIDMatcher.find()) {
254             sessionId = sessionIDMatcher.group(1)
255         } else {
256             throw NetconfException("$deviceInfo: Missing sessionId in server hello message: $serverHelloResponse")
257         }
258
259         val capabilityMatcher = NetconfMessageUtils.CAPABILITY_REGEX_PATTERN.matcher(serverHelloResponse)
260         while (capabilityMatcher.find()) {
261             deviceCapabilities.plus(capabilityMatcher.group(1))
262         }
263     }
264
265     inner class NetconfSessionListenerImpl : NetconfSessionListener {
266         override fun notify(event: NetconfReceivedEvent) {
267             val messageId = event.getMessageID()
268
269             when (event.getType()) {
270                 NetconfReceivedEvent.Type.DEVICE_UNREGISTERED -> disconnect()
271                 NetconfReceivedEvent.Type.DEVICE_ERROR -> errorReplies.add(event.getMessagePayload())
272                 NetconfReceivedEvent.Type.DEVICE_REPLY -> replies[messageId]?.complete(event.getMessagePayload())
273                 NetconfReceivedEvent.Type.SESSION_CLOSED -> disconnect()
274             }
275         }
276     }
277
278     fun sessionstatus(state:String): Boolean{
279         return when (state){
280             "Close" -> channel.isClosed
281             "Open" -> channel.isOpen
282             else -> false
283         }
284     }
285 }