Merge "Refractor rest log tracing filter for reuse."
[ccsdk/cds.git] / ms / blueprintsprocessor / functions / netconf-executor / src / main / kotlin / org / onap / ccsdk / cds / blueprintsprocessor / functions / netconf / executor / core / NetconfSessionImpl.kt
1 /*
2  * Copyright © 2017-2019 AT&T, Bell Canada
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *     http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 package org.onap.ccsdk.cds.blueprintsprocessor.functions.netconf.executor.core
18
19 import com.google.common.collect.ImmutableList
20 import com.google.common.collect.ImmutableSet
21 import org.apache.sshd.client.SshClient
22 import org.apache.sshd.client.channel.ClientChannel
23 import org.apache.sshd.client.session.ClientSession
24 import org.apache.sshd.common.FactoryManager
25 import org.onap.ccsdk.cds.blueprintsprocessor.functions.netconf.executor.api.*
26 import org.onap.ccsdk.cds.blueprintsprocessor.functions.netconf.executor.utils.NetconfMessageUtils
27 import org.onap.ccsdk.cds.blueprintsprocessor.functions.netconf.executor.utils.RpcMessageUtils
28 import org.onap.ccsdk.cds.blueprintsprocessor.functions.netconf.executor.utils.RpcStatus
29 import org.slf4j.LoggerFactory
30 import java.io.IOException
31 import java.util.*
32 import java.util.concurrent.*
33
34 class NetconfSessionImpl(private val deviceInfo: DeviceInfo, private val rpcService: NetconfRpcService) :
35     NetconfSession {
36
37     private val log = LoggerFactory.getLogger(NetconfSessionImpl::class.java)
38
39     private val errorReplies: MutableList<String> = Collections.synchronizedList(mutableListOf())
40     private val replies: MutableMap<String, CompletableFuture<String>> = ConcurrentHashMap()
41     private val deviceCapabilities = mutableSetOf<String>()
42
43     private var connectionTimeout: Long = 0
44     private var replyTimeout: Int = 0
45     private var idleTimeout: Int = 0
46     private var sessionId: String? = null
47
48     private lateinit var session: ClientSession
49     private lateinit var client: SshClient
50     private lateinit var channel: ClientChannel
51     private lateinit var streamHandler: NetconfDeviceCommunicator
52
53     private var capabilities =
54         ImmutableList.of(RpcMessageUtils.NETCONF_10_CAPABILITY, RpcMessageUtils.NETCONF_11_CAPABILITY)
55
56     override fun connect() {
57         try {
58             log.info("$deviceInfo: Connecting to Netconf Device with timeouts C:${deviceInfo.connectTimeout}, " +
59                     "R:${deviceInfo.replyTimeout}, I:${deviceInfo.idleTimeout}")
60             startConnection()
61             log.info("$deviceInfo: Connected to Netconf Device")
62         } catch (e: NetconfException) {
63             log.error("$deviceInfo: Netconf Device Connection Failed. ${e.message}")
64             throw NetconfException(e)
65         }
66     }
67
68     override fun disconnect() {
69         var retryNum = 3
70         while(rpcService.closeSession(false).status
71                 .equals(RpcStatus.FAILURE, true) &&retryNum>0) {
72             log.error("disconnect: graceful disconnect failed, retrying $retryNum times...")
73             retryNum--;
74         }
75         //if we can't close the session, try to force terminate.
76         if(retryNum == 0) {
77             log.error("disconnect: trying to force-terminate the session.")
78             rpcService.closeSession(true)
79         }
80         try {
81             close()
82         } catch (ioe: IOException) {
83             log.warn("$deviceInfo: Error closing session($sessionId) for host($deviceInfo)", ioe)
84         }
85     }
86
87     override fun reconnect() {
88         disconnect()
89         connect()
90     }
91
92     override fun syncRpc(request: String, messageId: String): String {
93         val formattedRequest = NetconfMessageUtils.formatRPCRequest(request, messageId, deviceCapabilities)
94
95         checkAndReestablish()
96
97         try {
98             return streamHandler.getFutureFromSendMessage(streamHandler.sendMessage(formattedRequest, messageId),
99                 replyTimeout.toLong(), TimeUnit.SECONDS)
100         } catch (e: InterruptedException) {
101             throw NetconfException("$deviceInfo: Interrupted while waiting for reply for request: $formattedRequest", e)
102         } catch (e: TimeoutException) {
103             throw NetconfException("$deviceInfo: Timed out while waiting for reply for request $formattedRequest after $replyTimeout sec.",
104                 e)
105         } catch (e: ExecutionException) {
106             log.warn("$deviceInfo: Closing session($sessionId) due to unexpected Error", e)
107             try {
108                 close()
109             } catch (ioe: IOException) {
110                 log.warn("$deviceInfo: Error closing session($sessionId) for host($deviceInfo)", ioe)
111             }
112             clearErrorReplies()
113             clearReplies()
114
115             throw NetconfException("$deviceInfo: Closing session $sessionId for request $formattedRequest", e)
116         }
117     }
118
119     override fun asyncRpc(request: String, messageId: String): CompletableFuture<String> {
120         val formattedRequest = NetconfMessageUtils.formatRPCRequest(request, messageId, deviceCapabilities)
121
122         checkAndReestablish()
123
124         return streamHandler.sendMessage(formattedRequest, messageId).handleAsync { reply, t ->
125             if (t != null) {
126                 throw NetconfException(messageId, t)
127             }
128             reply
129         }
130     }
131
132     override fun checkAndReestablish() {
133         try {
134             when {
135                 client.isClosed -> {
136                     log.info("Trying to restart the whole SSH connection with {}", deviceInfo)
137                     clearReplies()
138                     startConnection()
139                 }
140                 session.isClosed -> {
141                     log.info("Trying to restart the session with {}", deviceInfo)
142                     clearReplies()
143                     startSession()
144                 }
145                 channel.isClosed -> {
146                     log.info("Trying to reopen the channel with {}", deviceInfo)
147                     clearReplies()
148                     openChannel()
149                 }
150                 else -> return
151             }
152         } catch (e: IOException) {
153             log.error("Can't reopen connection for device {} error: {}", deviceInfo, e.message)
154             throw NetconfException(String.format("Cannot re-open the connection with device (%s)", deviceInfo), e)
155         } catch (e: IllegalStateException) {
156             log.error("Can't reopen connection for device {} error: {}", deviceInfo, e.message)
157             throw NetconfException(String.format("Cannot re-open the connection with device (%s)", deviceInfo), e)
158         }
159     }
160
161     override fun getDeviceInfo(): DeviceInfo {
162         return deviceInfo
163     }
164
165     override fun getSessionId(): String {
166         return this.sessionId!!
167     }
168
169     override fun getDeviceCapabilitiesSet(): Set<String> {
170         return Collections.unmodifiableSet(deviceCapabilities)
171     }
172
173     private fun startConnection() {
174         connectionTimeout = deviceInfo.connectTimeout
175         replyTimeout = deviceInfo.replyTimeout
176         idleTimeout = deviceInfo.idleTimeout
177         try {
178             startClient()
179         } catch (e: Exception) {
180             throw NetconfException("$deviceInfo: Failed to establish SSH session", e)
181         }
182
183     }
184
185     //Needed to unit test connect method interacting with client.start in startClient() below
186     private fun setupNewSSHClient() {
187         client = SshClient.setUpDefaultClient()
188     }
189
190     private fun startClient() {
191         setupNewSSHClient()
192
193         client.properties.putIfAbsent(FactoryManager.IDLE_TIMEOUT, TimeUnit.SECONDS.toMillis(idleTimeout.toLong()))
194         client.properties.putIfAbsent(FactoryManager.NIO2_READ_TIMEOUT, TimeUnit.SECONDS.toMillis(idleTimeout + 15L))
195         client.start()
196
197         startSession()
198     }
199
200     private fun startSession() {
201         log.info("$deviceInfo: Starting SSH session")
202         val connectFuture = client.connect(deviceInfo.username, deviceInfo.ipAddress, deviceInfo.port)
203             .verify(connectionTimeout, TimeUnit.SECONDS)
204         session = connectFuture.session
205         log.info("$deviceInfo: SSH session created")
206
207         authSession()
208     }
209
210     private fun authSession() {
211         session.addPasswordIdentity(deviceInfo.password)
212         session.auth().verify(connectionTimeout, TimeUnit.SECONDS)
213         val event = session.waitFor(ImmutableSet.of(ClientSession.ClientSessionEvent.WAIT_AUTH,
214             ClientSession.ClientSessionEvent.CLOSED, ClientSession.ClientSessionEvent.AUTHED), 0)
215         if (!event.contains(ClientSession.ClientSessionEvent.AUTHED)) {
216             throw NetconfException("$deviceInfo: Failed to authenticate session.")
217         }
218         log.info("$deviceInfo: SSH session authenticated")
219
220         openChannel()
221     }
222
223     private fun openChannel() {
224         channel = session.createSubsystemChannel("netconf")
225         val channelFuture = channel.open()
226         if (channelFuture.await(connectionTimeout, TimeUnit.SECONDS) && channelFuture.isOpened) {
227             log.info("$deviceInfo: SSH NETCONF subsystem channel opened")
228             setupHandler()
229         } else {
230             throw NetconfException("$deviceInfo: Failed to open SSH subsystem channel")
231         }
232     }
233
234     private fun setupHandler() {
235         val sessionListener: NetconfSessionListener = NetconfSessionListenerImpl(this)
236         streamHandler = NetconfDeviceCommunicator(channel.invertedOut, channel.invertedIn, deviceInfo,
237             sessionListener, replies)
238
239         exchangeHelloMessage()
240     }
241
242     private fun exchangeHelloMessage() {
243         sessionId = "-1"
244         val messageId = "-1"
245
246         val serverHelloResponse = syncRpc(NetconfMessageUtils.createHelloString(capabilities), messageId)
247         val sessionIDMatcher = NetconfMessageUtils.SESSION_ID_REGEX_PATTERN.matcher(serverHelloResponse)
248
249         if (sessionIDMatcher.find()) {
250             sessionId = sessionIDMatcher.group(1)
251             log.info("netconf exchangeHelloMessage sessionID: $sessionId")
252         } else {
253             throw NetconfException("$deviceInfo: Missing sessionId in server hello message: $serverHelloResponse")
254         }
255
256         val capabilityMatcher = NetconfMessageUtils.CAPABILITY_REGEX_PATTERN.matcher(serverHelloResponse)
257         while (capabilityMatcher.find()) { //TODO: refactor to add unit test easily for device capability accumulation.
258             deviceCapabilities.add(capabilityMatcher.group(1))
259         }
260     }
261
262     internal fun setStreamHandler(streamHandler: NetconfDeviceCommunicator) {
263         this.streamHandler = streamHandler
264     }
265
266     /**
267      * Add an error reply
268      * Used by {@link NetconfSessionListenerImpl}
269      */
270     internal fun addDeviceErrorReply(errReply: String) {
271         errorReplies.add(errReply)
272     }
273
274     /**
275      * Add a reply from the device
276      * Used by {@link NetconfSessionListenerImpl}
277      */
278     internal fun addDeviceReply(messageId: String, replyMsg: String) {
279         replies[messageId]?.complete(replyMsg)
280     }
281
282     /**
283      * Closes the session/channel/client
284      */
285     @Throws(IOException::class)
286     private fun close() {
287         log.debug("close was called.")
288         session.close()
289         // Closes the socket which should interrupt the streamHandler
290         channel.close()
291         client.close()
292     }
293
294     /**
295      * Internal function for accessing replies for testing.
296      */
297     internal fun getReplies() = replies
298
299     /**
300      * internal function for accessing errorReplies for testing.
301      */
302     internal fun getErrorReplies() = errorReplies
303     internal fun clearErrorReplies() = errorReplies.clear()
304     internal fun clearReplies() = replies.clear()
305     internal fun setClient(client: SshClient) { this.client = client }
306     internal fun setSession(session: ClientSession) { this.session = session }
307     internal fun setChannel(channel: ClientChannel) { this.channel = channel }
308 }