Migrate "ms/controllerblueprints" from ccsdk/apps
[ccsdk/cds.git] / ms / blueprintsprocessor / functions / netconf-executor / src / main / kotlin / org / onap / ccsdk / apps / blueprintsprocessor / functions / netconf / executor / core / NetconfSessionImpl.kt
1 /*
2  * Copyright © 2017-2019 AT&T, Bell Canada
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *     http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 package org.onap.ccsdk.apps.blueprintsprocessor.functions.netconf.executor.core
18
19 import com.google.common.collect.ImmutableList
20 import com.google.common.collect.ImmutableSet
21 import org.apache.sshd.client.SshClient
22 import org.apache.sshd.client.channel.ChannelSubsystem
23 import org.apache.sshd.client.channel.ClientChannel
24 import org.apache.sshd.client.session.ClientSession
25 import org.apache.sshd.client.session.ClientSessionImpl
26 import org.apache.sshd.common.FactoryManager
27 import org.apache.sshd.server.keyprovider.SimpleGeneratorHostKeyProvider
28 import org.onap.ccsdk.apps.blueprintsprocessor.functions.netconf.executor.api.DeviceInfo
29 import org.onap.ccsdk.apps.blueprintsprocessor.functions.netconf.executor.api.NetconfException
30 import org.onap.ccsdk.apps.blueprintsprocessor.functions.netconf.executor.api.NetconfReceivedEvent
31 import org.onap.ccsdk.apps.blueprintsprocessor.functions.netconf.executor.api.NetconfRpcService
32 import org.onap.ccsdk.apps.blueprintsprocessor.functions.netconf.executor.api.NetconfSession
33 import org.onap.ccsdk.apps.blueprintsprocessor.functions.netconf.executor.api.NetconfSessionListener
34 import org.onap.ccsdk.apps.blueprintsprocessor.functions.netconf.executor.utils.NetconfMessageUtils
35 import org.onap.ccsdk.apps.blueprintsprocessor.functions.netconf.executor.utils.RpcMessageUtils
36 import org.onap.ccsdk.apps.blueprintsprocessor.functions.netconf.executor.utils.RpcStatus
37 import org.slf4j.LoggerFactory
38 import java.io.IOException
39 import java.util.*
40 import java.util.concurrent.CompletableFuture
41 import java.util.concurrent.ConcurrentHashMap
42 import java.util.concurrent.ExecutionException
43 import java.util.concurrent.TimeUnit
44 import java.util.concurrent.TimeoutException
45 import java.util.concurrent.atomic.AtomicReference
46
/**
 * [NetconfSession] implementation that talks to a device over an SSH "netconf" subsystem channel.
 *
 * The SSH client, session and channel are created by [connect] (or lazily re-created by
 * [checkAndReestablish]); RPC traffic is delegated to a [NetconfDeviceCommunicator] that shares
 * the [replies] map with this class.
 *
 * @param deviceInfo connection parameters (address, credentials, timeouts) of the target device
 * @param rpcService RPC service used by [disconnect] to send the close-session request
 */
class NetconfSessionImpl(private val deviceInfo: DeviceInfo, private val rpcService: NetconfRpcService) :
    NetconfSession {

    private val log = LoggerFactory.getLogger(NetconfSessionImpl::class.java)

    // Must be backed by a MUTABLE list: the listener appends to it and syncRpc() clears it.
    // (A read-only listOf() here makes add()/clear() throw UnsupportedOperationException.)
    private val errorReplies: MutableList<String> = Collections.synchronizedList(mutableListOf<String>())

    // Pending replies keyed by message id; shared with the NetconfDeviceCommunicator.
    private val replies: MutableMap<String, CompletableFuture<String>> = ConcurrentHashMap()

    // Capabilities advertised by the device in its hello message. Mutable so that
    // exchangeHelloMessage() can actually record them.
    private val deviceCapabilities: MutableSet<String> = ConcurrentHashMap.newKeySet<String>()

    private var connectionTimeout: Long = 0
    private var replyTimeout: Int = 0
    private var idleTimeout: Int = 0
    private var sessionId: String? = null

    private lateinit var session: ClientSession
    private lateinit var client: SshClient
    private lateinit var channel: ClientChannel
    private lateinit var streamHandler: NetconfDeviceCommunicator

    // Capabilities this client advertises in its own hello message.
    private val capabilities =
        ImmutableList.of(RpcMessageUtils.NETCONF_10_CAPABILITY, RpcMessageUtils.NETCONF_11_CAPABILITY)

    /**
     * Opens the SSH connection, authenticates, opens the NETCONF channel and exchanges hello messages.
     *
     * @throws NetconfException if any step of the connection set-up fails
     */
    override fun connect() {
        try {
            log.info("$deviceInfo: Connecting to Netconf Device with timeouts C:${deviceInfo.connectTimeout}, " +
                    "R:${deviceInfo.replyTimeout}, I:${deviceInfo.idleTimeout}")
            startConnection()
            log.info("$deviceInfo: Connected to Netconf Device")
        } catch (e: NetconfException) {
            log.error("$deviceInfo: Netconf Device Connection Failed. ${e.message}")
            throw NetconfException(e)
        }
    }

    /**
     * Asks the device to close the session (retrying with the `true` flag when the first
     * attempt reports [RpcStatus.FAILURE]) and then closes the SSH session, channel and client.
     */
    override fun disconnect() {
        if (rpcService.closeSession(false).status.equals(
                RpcStatus.FAILURE, true)) {
            rpcService.closeSession(true)
        }

        closeTransport()
    }

    /** Disconnects and then connects again. */
    override fun reconnect() {
        disconnect()
        connect()
    }

    /**
     * Sends [request] and blocks until the reply arrives or the reply timeout elapses.
     *
     * @param request RPC payload; it is formatted with [NetconfMessageUtils.formatRPCRequest] before sending
     * @param messageId message id used to correlate the reply
     * @return the reply payload
     * @throws NetconfException on interruption, timeout, or a failed reply future; in the last
     *         case the SSH transport is closed and pending replies are discarded first
     */
    override fun syncRpc(request: String, messageId: String): String {
        val formattedRequest = NetconfMessageUtils.formatRPCRequest(request, messageId, deviceCapabilities)

        checkAndReestablish()

        try {
            return streamHandler.sendMessage(formattedRequest, messageId).get(replyTimeout.toLong(), TimeUnit.SECONDS)
            // NOTE(review): the original carried a commented-out `replies.remove(messageId)` here;
            // whether the communicator removes completed entries itself is not visible from this file.
        } catch (e: InterruptedException) {
            // Restore the interrupt flag so callers further up the stack can observe it.
            Thread.currentThread().interrupt()
            throw NetconfException("$deviceInfo: Interrupted while waiting for reply for request: $formattedRequest", e)
        } catch (e: TimeoutException) {
            throw NetconfException("$deviceInfo: Timed out while waiting for reply for request $formattedRequest after $replyTimeout sec.",
                e)
        } catch (e: ExecutionException) {
            log.warn("$deviceInfo: Closing session($sessionId) due to unexpected Error", e)
            try {
                closeTransport()
            } catch (ioe: IOException) {
                log.warn("$deviceInfo: Error closing session($sessionId) for host($deviceInfo)", ioe)
            }

//            NetconfReceivedEvent(NetconfReceivedEvent.Type.SESSION_CLOSED, "",
//                "Closed due to unexpected error " + e.cause, "-1", deviceInfo)
            errorReplies.clear() // move to cleanUp()?
            replies.clear()

            throw NetconfException("$deviceInfo: Closing session $sessionId for request $formattedRequest", e)
        }
    }

    /**
     * Sends [request] without blocking.
     *
     * @param request RPC payload; it is formatted with [NetconfMessageUtils.formatRPCRequest] before sending
     * @param messageId message id used to correlate the reply
     * @return a future completed with the reply, or failed with a [NetconfException] wrapping
     *         the underlying error
     */
    override fun asyncRpc(request: String, messageId: String): CompletableFuture<String> {
        val formattedRequest = NetconfMessageUtils.formatRPCRequest(request, messageId, deviceCapabilities)

        checkAndReestablish()

        return streamHandler.sendMessage(formattedRequest, messageId).handleAsync { reply, failure ->
            if (failure != null) {
                throw NetconfException(messageId, failure)
            }
            reply
        }
    }

    /**
     * Re-creates whichever layer of the connection is closed, checked outermost first:
     * SSH client, then session, then channel. Pending replies are discarded before any restart.
     * Does nothing when all three are still open.
     *
     * @throws NetconfException if the connection cannot be re-opened
     */
    override fun checkAndReestablish() {
        try {
            when {
                client.isClosed -> {
                    log.info("Trying to restart the whole SSH connection with {}", deviceInfo)
                    replies.clear()
                    startConnection()
                }
                session.isClosed -> {
                    log.info("Trying to restart the session with {}", deviceInfo)
                    replies.clear()
                    startSession()
                }
                channel.isClosed -> {
                    log.info("Trying to reopen the channel with {}", deviceInfo)
                    replies.clear()
                    openChannel()
                }
            }
        } catch (e: IOException) {
            throw reopenFailure(e)
        } catch (e: IllegalStateException) {
            throw reopenFailure(e)
        }
    }

    override fun getDeviceInfo(): DeviceInfo {
        return deviceInfo
    }

    /** Returns the current session id; fails if no session id has been assigned yet. */
    override fun getSessionId(): String {
        return this.sessionId!!
    }

    /** Returns a read-only view of the capabilities recorded from the device's hello message. */
    override fun getDeviceCapabilitiesSet(): Set<String> {
        return Collections.unmodifiableSet(deviceCapabilities)
    }

    /** Closes the SSH session, channel and client, in that order. */
    private fun closeTransport() {
        session.close()
        // Closes the socket which should interrupt the streamHandler
        channel.close()
        client.close()
    }

    /** Logs a failed re-open attempt and builds the exception to throw for it. */
    private fun reopenFailure(cause: Exception): NetconfException {
        log.error("Can't reopen connection for device {}", cause.message)
        return NetconfException(String.format("Cannot re-open the connection with device (%s)", deviceInfo), cause)
    }

    /** Copies the timeouts from [deviceInfo] and starts the SSH client, wrapping any failure. */
    private fun startConnection() {
        connectionTimeout = deviceInfo.connectTimeout
        replyTimeout = deviceInfo.replyTimeout
        idleTimeout = deviceInfo.idleTimeout
        try {
            startClient()
        } catch (e: Exception) {
            throw NetconfException("$deviceInfo: Failed to establish SSH session", e)
        }
    }

    /** Creates and starts the SSH client with idle/read timeouts, then starts a session. */
    private fun startClient() {
        client = SshClient.setUpDefaultClient()
        client.properties.putIfAbsent(FactoryManager.IDLE_TIMEOUT, TimeUnit.SECONDS.toMillis(idleTimeout.toLong()))
        // Read timeout is set 15 seconds beyond the idle timeout.
        client.properties.putIfAbsent(FactoryManager.NIO2_READ_TIMEOUT, TimeUnit.SECONDS.toMillis(idleTimeout + 15L))
        client.keyPairProvider = SimpleGeneratorHostKeyProvider()
        client.start()

        startSession()
    }

    /** Connects to the device, stores the resulting SSH session and authenticates it. */
    private fun startSession() {
        log.info("$deviceInfo: Starting SSH session")
        val connectFuture = client.connect(deviceInfo.username, deviceInfo.ipAddress, deviceInfo.port)
            .verify(connectionTimeout, TimeUnit.SECONDS)
        session = connectFuture.session
        log.info("$deviceInfo: SSH session created")

        authSession()
    }

    /**
     * Authenticates the session with the device password and opens the NETCONF channel.
     *
     * @throws NetconfException if the session does not reach the authenticated state
     */
    private fun authSession() {
        session.addPasswordIdentity(deviceInfo.password)
        session.auth().verify(connectionTimeout, TimeUnit.SECONDS)
        val event = session.waitFor(ImmutableSet.of(ClientSession.ClientSessionEvent.WAIT_AUTH,
            ClientSession.ClientSessionEvent.CLOSED, ClientSession.ClientSessionEvent.AUTHED), 0)
        if (!event.contains(ClientSession.ClientSessionEvent.AUTHED)) {
            throw NetconfException("$deviceInfo: Failed to authenticate session.")
        }
        log.info("$deviceInfo: SSH session authenticated")

        openChannel()
    }

    /**
     * Opens the "netconf" subsystem channel and wires up the stream handler.
     *
     * @throws NetconfException if the channel is not opened within the connection timeout
     */
    private fun openChannel() {
        channel = session.createSubsystemChannel("netconf")
        val channelFuture = channel.open()
        if (channelFuture.await(connectionTimeout, TimeUnit.SECONDS) && channelFuture.isOpened) {
            log.info("$deviceInfo: SSH NETCONF subsystem channel opened")
            setupHandler()
        } else {
            throw NetconfException("$deviceInfo: Failed to open SSH subsystem channel")
        }
    }

    /** Creates the communicator over the channel's streams and performs the hello exchange. */
    private fun setupHandler() {
        val sessionListener: NetconfSessionListener = NetconfSessionListenerImpl()
        streamHandler = NetconfDeviceCommunicator(channel.invertedOut, channel.invertedIn, deviceInfo,
            sessionListener, replies)

        exchangeHelloMessage()
    }

    /**
     * Sends the client hello, then extracts the session id and the device capabilities
     * from the server's hello response.
     *
     * @throws NetconfException if the server hello carries no session id
     */
    private fun exchangeHelloMessage() {
        // "-1" is the placeholder used for both the session id and the message id during hello.
        sessionId = "-1"
        val messageId = "-1"

        val serverHelloResponse = syncRpc(NetconfMessageUtils.createHelloString(capabilities), messageId)
        val sessionIDMatcher = NetconfMessageUtils.SESSION_ID_REGEX_PATTERN.matcher(serverHelloResponse)

        if (sessionIDMatcher.find()) {
            sessionId = sessionIDMatcher.group(1)
        } else {
            throw NetconfException("$deviceInfo: Missing sessionId in server hello message: $serverHelloResponse")
        }

        val capabilityMatcher = NetconfMessageUtils.CAPABILITY_REGEX_PATTERN.matcher(serverHelloResponse)
        while (capabilityMatcher.find()) {
            // add() mutates the set in place; plus() would only build a new set and drop it.
            deviceCapabilities.add(capabilityMatcher.group(1))
        }
    }

    /** Listener handed to the communicator; routes received events to this session. */
    inner class NetconfSessionListenerImpl : NetconfSessionListener {
        /**
         * Handles one event: disconnects on unregistration or session close, records
         * device errors, and completes the pending reply future on a device reply.
         */
        override fun notify(event: NetconfReceivedEvent) {
            val messageId = event.getMessageID()

            when (event.getType()) {
                NetconfReceivedEvent.Type.DEVICE_UNREGISTERED -> disconnect()
                NetconfReceivedEvent.Type.DEVICE_ERROR -> errorReplies.add(event.getMessagePayload())
                NetconfReceivedEvent.Type.DEVICE_REPLY -> replies[messageId]?.complete(event.getMessagePayload())
                NetconfReceivedEvent.Type.SESSION_CLOSED -> disconnect()
            }
        }
    }

    /**
     * Reports whether the channel is in the requested state.
     *
     * @param state `"Close"` to test for a closed channel, `"Open"` to test for an open one
     * @return the matching channel flag, or `false` for any other [state] value
     */
    fun sessionstatus(state:String): Boolean{
        return when (state){
            "Close" -> channel.isClosed
            "Open" -> channel.isOpen
            else -> false
        }
    }
}