/*
 * Copyright © 2017-2019 AT&T, Bell Canada
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.onap.ccsdk.cds.blueprintsprocessor.functions.netconf.executor.core

import com.google.common.collect.ImmutableList
import com.google.common.collect.ImmutableSet
import org.apache.sshd.client.SshClient
import org.apache.sshd.client.channel.ClientChannel
import org.apache.sshd.client.session.ClientSession
import org.apache.sshd.common.FactoryManager
import org.onap.ccsdk.cds.blueprintsprocessor.functions.netconf.executor.api.DeviceInfo
import org.onap.ccsdk.cds.blueprintsprocessor.functions.netconf.executor.api.NetconfException
import org.onap.ccsdk.cds.blueprintsprocessor.functions.netconf.executor.api.NetconfRpcService
import org.onap.ccsdk.cds.blueprintsprocessor.functions.netconf.executor.api.NetconfSession
import org.onap.ccsdk.cds.blueprintsprocessor.functions.netconf.executor.api.NetconfSessionListener
import org.onap.ccsdk.cds.blueprintsprocessor.functions.netconf.executor.utils.NetconfMessageUtils
import org.onap.ccsdk.cds.blueprintsprocessor.functions.netconf.executor.utils.RpcMessageUtils
import org.onap.ccsdk.cds.blueprintsprocessor.functions.netconf.executor.utils.RpcStatus
import org.slf4j.LoggerFactory
import java.io.IOException
import java.util.Collections
import java.util.concurrent.CompletableFuture
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.ExecutionException
import java.util.concurrent.TimeUnit
import java.util.concurrent.TimeoutException

/**
 * [NetconfSession] implementation that talks to one device over an SSH "netconf" subsystem channel.
 *
 * Connecting runs a fixed chain: SSH client -> SSH session -> password authentication ->
 * subsystem channel -> stream handler -> NETCONF hello exchange. The hello reply supplies the
 * session id and the device capability set.
 *
 * @param deviceInfo address, credentials and timeouts (in seconds) of the target device.
 * @param rpcService used by [disconnect] to send the NETCONF close-session request.
 */
class NetconfSessionImpl(private val deviceInfo: DeviceInfo, private val rpcService: NetconfRpcService) :
    NetconfSession {

    private val log = LoggerFactory.getLogger(NetconfSessionImpl::class.java)

    // Error replies reported by the session listener.
    private val errorReplies: MutableList<String> = Collections.synchronizedList(mutableListOf())

    // Pending replies keyed by message id; shared with the stream handler and completed by addDeviceReply.
    private val replies: MutableMap<String, CompletableFuture<String>> = ConcurrentHashMap()

    // Capabilities advertised by the device in its hello message.
    private val deviceCapabilities = mutableSetOf<String>()

    // Timeouts copied from deviceInfo by startConnection(); all are used as seconds.
    private var connectionTimeout: Long = 0
    private var replyTimeout: Int = 0
    private var idleTimeout: Int = 0
    private var sessionId: String? = null

    private lateinit var session: ClientSession
    private lateinit var client: SshClient
    private lateinit var channel: ClientChannel
    private lateinit var streamHandler: NetconfDeviceCommunicator

    // Capabilities this client advertises in its own hello message.
    private val capabilities =
        ImmutableList.of(RpcMessageUtils.NETCONF_10_CAPABILITY, RpcMessageUtils.NETCONF_11_CAPABILITY)

    /**
     * Establishes the full connection to the device.
     *
     * @throws NetconfException wrapping the failure reported by the connection chain.
     */
    override fun connect() {
        try {
            log.info(
                "$deviceInfo: Connecting to Netconf Device with timeouts C:${deviceInfo.connectTimeout}, " +
                    "R:${deviceInfo.replyTimeout}, I:${deviceInfo.idleTimeout}"
            )
            startConnection()
            log.info("$deviceInfo: Connected to Netconf Device")
        } catch (e: NetconfException) {
            log.error("$deviceInfo: Netconf Device Connection Failed. ${e.message}")
            throw NetconfException(e)
        }
    }

    /**
     * Asks the device to close the session gracefully, retrying on failure, then force-terminates
     * if the retries are exhausted and finally closes the local SSH resources.
     *
     * An [IOException] while closing local resources is logged and not propagated.
     */
    override fun disconnect() {
        // NOTE(review): the initial retry count was not recoverable from the source; 3 is assumed - confirm.
        var retryNum = 3
        while (rpcService.closeSession(false).status
            .equals(RpcStatus.FAILURE, true) && retryNum > 0
        ) {
            log.error("disconnect: graceful disconnect failed, retrying $retryNum times...")
            retryNum--
        }
        // if we can't close the session, try to force terminate.
        if (retryNum == 0) {
            log.error("disconnect: trying to force-terminate the session.")
            rpcService.closeSession(true)
        }
        try {
            close()
        } catch (ioe: IOException) {
            log.warn("$deviceInfo: Error closing session($sessionId) for host($deviceInfo)", ioe)
        }
    }

    /**
     * Drops the current connection and establishes a new one.
     */
    override fun reconnect() {
        // NOTE(review): body reconstructed as disconnect-then-connect - confirm.
        disconnect()
        connect()
    }

    /**
     * Sends [request] and blocks until the device replies or the reply timeout elapses.
     *
     * @param request RPC payload; it is formatted with [messageId] and the device capabilities before sending.
     * @param messageId id used to correlate the reply.
     * @return the reply text.
     * @throws NetconfException on interruption, timeout, or execution failure. On execution failure
     * the session is closed first.
     */
    override fun syncRpc(request: String, messageId: String): String {
        val formattedRequest = NetconfMessageUtils.formatRPCRequest(request, messageId, deviceCapabilities)

        checkAndReestablish()

        try {
            return streamHandler.getFutureFromSendMessage(
                streamHandler.sendMessage(formattedRequest, messageId),
                replyTimeout.toLong(), TimeUnit.SECONDS
            )
        } catch (e: InterruptedException) {
            // Restore the interrupt flag so callers further up the stack can still observe the interruption.
            Thread.currentThread().interrupt()
            throw NetconfException("$deviceInfo: Interrupted while waiting for reply for request: $formattedRequest", e)
        } catch (e: TimeoutException) {
            throw NetconfException(
                "$deviceInfo: Timed out while waiting for reply for request $formattedRequest after $replyTimeout sec.",
                e
            )
        } catch (e: ExecutionException) {
            log.warn("$deviceInfo: Closing session($sessionId) due to unexpected Error", e)
            try {
                close()
            } catch (ioe: IOException) {
                log.warn("$deviceInfo: Error closing session($sessionId) for host($deviceInfo)", ioe)
            }
            // NOTE(review): cleanup calls reconstructed - confirm both collections are cleared here.
            clearErrorReplies()
            clearReplies()

            throw NetconfException("$deviceInfo: Closing session $sessionId for request $formattedRequest", e)
        }
    }

    /**
     * Sends [request] without blocking.
     *
     * @param request RPC payload; it is formatted with [messageId] and the device capabilities before sending.
     * @param messageId id used to correlate the reply.
     * @return a future that yields the reply, or fails with a [NetconfException] wrapping the send failure.
     */
    override fun asyncRpc(request: String, messageId: String): CompletableFuture<String> {
        val formattedRequest = NetconfMessageUtils.formatRPCRequest(request, messageId, deviceCapabilities)

        checkAndReestablish()

        return streamHandler.sendMessage(formattedRequest, messageId).handleAsync { reply, t ->
            if (t != null) {
                throw NetconfException(messageId, t)
            }
            reply
        }
    }

    /**
     * Re-opens whichever layer of the connection is closed, checking client, then session, then channel.
     * Pending replies are discarded before a layer is re-opened. Does nothing when all layers are open.
     *
     * @throws NetconfException if the connection cannot be re-opened.
     */
    override fun checkAndReestablish() {
        try {
            when {
                // NOTE(review): first branch condition reconstructed from its log message - confirm.
                client.isClosed -> {
                    log.info("Trying to restart the whole SSH connection with {}", deviceInfo)
                    clearReplies()
                    startConnection()
                }
                session.isClosed -> {
                    log.info("Trying to restart the session with {}", deviceInfo)
                    clearReplies()
                    startSession()
                }
                channel.isClosed -> {
                    log.info("Trying to reopen the channel with {}", deviceInfo)
                    clearReplies()
                    openChannel()
                }
                else -> return
            }
        } catch (e: IOException) {
            throw reopenFailure(e)
        } catch (e: IllegalStateException) {
            throw reopenFailure(e)
        }
    }

    /** Logs a failed re-open attempt and builds the exception [checkAndReestablish] throws for it. */
    private fun reopenFailure(cause: Exception): NetconfException {
        log.error("Can't reopen connection for device {} error: {}", deviceInfo, cause.message)
        return NetconfException("Cannot re-open the connection with device ($deviceInfo)", cause)
    }

    /** Returns the device this session was created for. */
    override fun getDeviceInfo(): DeviceInfo {
        return deviceInfo
    }

    /**
     * Returns the session id taken from the device hello message.
     *
     * Throws a NullPointerException if no session id has been set yet; `!!` is kept so that
     * the exception type seen by existing callers does not change.
     */
    override fun getSessionId(): String {
        return this.sessionId!!
    }

    /** Returns a read-only view of the capabilities advertised by the device. */
    override fun getDeviceCapabilitiesSet(): Set<String> {
        return Collections.unmodifiableSet(deviceCapabilities)
    }

    /**
     * Copies the timeouts from [deviceInfo] and starts the connection chain.
     *
     * @throws NetconfException wrapping any failure raised while connecting.
     */
    private fun startConnection() {
        connectionTimeout = deviceInfo.connectTimeout
        replyTimeout = deviceInfo.replyTimeout
        idleTimeout = deviceInfo.idleTimeout
        try {
            startClient()
        } catch (e: Exception) {
            throw NetconfException("$deviceInfo: Failed to establish SSH session", e)
        }
    }

    // Needed to unit test connect method interacting with client.start in startClient() below
    private fun setupNewSSHClient() {
        client = SshClient.setUpDefaultClient()
    }

    /**
     * Creates and starts the SSH client, then opens the session.
     * The read timeout is set 15 seconds above the idle timeout.
     */
    private fun startClient() {
        setupNewSSHClient()

        client.properties.putIfAbsent(FactoryManager.IDLE_TIMEOUT, TimeUnit.SECONDS.toMillis(idleTimeout.toLong()))
        client.properties.putIfAbsent(FactoryManager.NIO2_READ_TIMEOUT, TimeUnit.SECONDS.toMillis(idleTimeout + 15L))
        client.start()

        startSession()
    }

    /** Opens the SSH session to the device, bounded by the connection timeout, then authenticates it. */
    private fun startSession() {
        log.info("$deviceInfo: Starting SSH session")
        val connectFuture = client.connect(deviceInfo.username, deviceInfo.ipAddress, deviceInfo.port)
            .verify(connectionTimeout, TimeUnit.SECONDS)
        session = connectFuture.session
        log.info("$deviceInfo: SSH session created")

        authSession()
    }

    /**
     * Authenticates the session with the device password, then opens the NETCONF channel.
     *
     * @throws NetconfException if the session does not reach the AUTHED state.
     */
    private fun authSession() {
        session.addPasswordIdentity(deviceInfo.password)
        session.auth().verify(connectionTimeout, TimeUnit.SECONDS)
        // NOTE(review): the timeout argument was not recoverable from the source; 0 is assumed - confirm.
        val event = session.waitFor(
            ImmutableSet.of(
                ClientSession.ClientSessionEvent.WAIT_AUTH,
                ClientSession.ClientSessionEvent.CLOSED, ClientSession.ClientSessionEvent.AUTHED
            ),
            0
        )
        if (!event.contains(ClientSession.ClientSessionEvent.AUTHED)) {
            throw NetconfException("$deviceInfo: Failed to authenticate session.")
        }
        log.info("$deviceInfo: SSH session authenticated")

        openChannel()
    }

    /**
     * Opens the "netconf" subsystem channel and, once it is open, wires up the stream handler.
     *
     * @throws NetconfException if the channel is not open within the connection timeout.
     */
    private fun openChannel() {
        channel = session.createSubsystemChannel("netconf")
        val channelFuture = channel.open()
        if (channelFuture.await(connectionTimeout, TimeUnit.SECONDS) && channelFuture.isOpened) {
            log.info("$deviceInfo: SSH NETCONF subsystem channel opened")
            setupHandler()
        } else {
            throw NetconfException("$deviceInfo: Failed to open SSH subsystem channel")
        }
    }

    /** Creates the stream handler on the channel streams and performs the hello exchange. */
    private fun setupHandler() {
        val sessionListener: NetconfSessionListener = NetconfSessionListenerImpl(this)
        streamHandler = NetconfDeviceCommunicator(
            channel.invertedOut, channel.invertedIn, deviceInfo,
            sessionListener, replies
        )

        exchangeHelloMessage()
    }

    /**
     * Sends the client hello and reads the session id and capabilities out of the device hello reply.
     *
     * @throws NetconfException if the reply carries no session id.
     */
    private fun exchangeHelloMessage() {
        // NOTE(review): placeholder ids reconstructed as "-1" - confirm.
        sessionId = "-1"
        val messageId = "-1"

        val serverHelloResponse = syncRpc(NetconfMessageUtils.createHelloString(capabilities), messageId)
        val sessionIDMatcher = NetconfMessageUtils.SESSION_ID_REGEX_PATTERN.matcher(serverHelloResponse)

        if (sessionIDMatcher.find()) {
            sessionId = sessionIDMatcher.group(1)
            log.info("netconf exchangeHelloMessage sessionID: $sessionId")
        } else {
            throw NetconfException("$deviceInfo: Missing sessionId in server hello message: $serverHelloResponse")
        }

        val capabilityMatcher = NetconfMessageUtils.CAPABILITY_REGEX_PATTERN.matcher(serverHelloResponse)
        while (capabilityMatcher.find()) { // TODO: refactor to add unit test easily for device capability accumulation.
            deviceCapabilities.add(capabilityMatcher.group(1))
        }
    }

    internal fun setStreamHandler(streamHandler: NetconfDeviceCommunicator) {
        this.streamHandler = streamHandler
    }

    /**
     * Add an error reply
     * Used by [NetconfSessionListenerImpl]
     */
    internal fun addDeviceErrorReply(errReply: String) {
        errorReplies.add(errReply)
    }

    /**
     * Add a reply from the device
     * Used by [NetconfSessionListenerImpl]
     *
     * Completes the pending future registered under [messageId]; an unknown id is ignored.
     */
    internal fun addDeviceReply(messageId: String, replyMsg: String) {
        replies[messageId]?.complete(replyMsg)
    }

    /**
     * Closes the session/channel/client
     */
    @Throws(IOException::class)
    private fun close() {
        log.debug("close was called.")
        // NOTE(review): close order reconstructed as session, channel, client - confirm.
        session.close()
        // Closes the socket which should interrupt the streamHandler
        channel.close()
        client.close()
    }

    /**
     * Internal function for accessing replies for testing.
     */
    internal fun getReplies() = replies

    /**
     * Internal function for accessing errorReplies for testing.
     */
    internal fun getErrorReplies() = errorReplies

    internal fun clearErrorReplies() = errorReplies.clear()
    internal fun clearReplies() = replies.clear()
    internal fun setClient(client: SshClient) {
        this.client = client
    }

    internal fun setSession(session: ClientSession) {
        this.session = session
    }

    internal fun setChannel(channel: ClientChannel) {
        this.channel = channel
    }
}