2 * Copyright © 2017-2019 AT&T, Bell Canada
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
17 package org.onap.ccsdk.apps.blueprintsprocessor.functions.netconf.executor.core
19 import com.google.common.collect.ImmutableList
20 import com.google.common.collect.ImmutableSet
21 import org.apache.sshd.client.SshClient
22 import org.apache.sshd.client.channel.ChannelSubsystem
23 import org.apache.sshd.client.channel.ClientChannel
24 import org.apache.sshd.client.session.ClientSession
25 import org.apache.sshd.client.session.ClientSessionImpl
26 import org.apache.sshd.common.FactoryManager
27 import org.apache.sshd.server.keyprovider.SimpleGeneratorHostKeyProvider
28 import org.onap.ccsdk.apps.blueprintsprocessor.functions.netconf.executor.api.DeviceInfo
29 import org.onap.ccsdk.apps.blueprintsprocessor.functions.netconf.executor.api.NetconfException
30 import org.onap.ccsdk.apps.blueprintsprocessor.functions.netconf.executor.api.NetconfReceivedEvent
31 import org.onap.ccsdk.apps.blueprintsprocessor.functions.netconf.executor.api.NetconfRpcService
32 import org.onap.ccsdk.apps.blueprintsprocessor.functions.netconf.executor.api.NetconfSession
33 import org.onap.ccsdk.apps.blueprintsprocessor.functions.netconf.executor.api.NetconfSessionListener
34 import org.onap.ccsdk.apps.blueprintsprocessor.functions.netconf.executor.utils.NetconfMessageUtils
35 import org.onap.ccsdk.apps.blueprintsprocessor.functions.netconf.executor.utils.RpcMessageUtils
36 import org.onap.ccsdk.apps.blueprintsprocessor.functions.netconf.executor.utils.RpcStatus
37 import org.slf4j.LoggerFactory
38 import java.io.IOException
40 import java.util.concurrent.CompletableFuture
41 import java.util.concurrent.ConcurrentHashMap
42 import java.util.concurrent.ExecutionException
43 import java.util.concurrent.TimeUnit
44 import java.util.concurrent.TimeoutException
45 import java.util.concurrent.atomic.AtomicReference
/**
 * NETCONF session for the device described by [deviceInfo], carried over an SSH "netconf"
 * subsystem channel (see openChannel). Session close requests are delegated to [rpcService].
 *
 * NOTE(review): the supertype list that follows the trailing `:` is not visible in this view;
 * the `override` members suggest it implements NetconfSession — confirm.
 */
47 class NetconfSessionImpl(private val deviceInfo: DeviceInfo, private val rpcService: NetconfRpcService) :
private val log = LoggerFactory.getLogger(NetconfSessionImpl::class.java)

// Payloads of DEVICE_ERROR events, appended by the session listener.
// Must be backed by a mutable list: wrapping the read-only empty list returned by listOf()
// made every add() fail with UnsupportedOperationException.
private val errorReplies: MutableList<String> = Collections.synchronizedList(mutableListOf<String>())

// Pending replies keyed by message id; shared with the NetconfDeviceCommunicator and completed
// by the session listener when a DEVICE_REPLY event arrives.
private val replies: MutableMap<String, CompletableFuture<String>> = ConcurrentHashMap()

// NOTE(review): this is a read-only empty set, so nothing in this class can add to it in place;
// storing the capabilities advertised by the device requires changing both this declaration
// and the code that populates it.
private val deviceCapabilities = setOf<String>()

// Timeouts copied from deviceInfo in startConnection(); all three are used as seconds.
private var connectionTimeout: Long = 0
private var replyTimeout: Int = 0
private var idleTimeout: Int = 0

// Session id parsed from the server hello message; null until it has been assigned.
private var sessionId: String? = null

// SSH plumbing, assigned in startSession(), startClient(), openChannel() and setupHandler().
private lateinit var session: ClientSession
private lateinit var client: SshClient
private lateinit var channel: ClientChannel
private lateinit var streamHandler: NetconfDeviceCommunicator

// Capabilities advertised to the device in the client hello message.
private var capabilities =
    ImmutableList.of(RpcMessageUtils.NETCONF_10_CAPABILITY, RpcMessageUtils.NETCONF_11_CAPABILITY)
/**
 * Connects to the device, logging the configured connect/reply/idle timeouts first and a
 * confirmation message afterwards.
 *
 * @throws NetconfException wrapping any [NetconfException] raised while connecting; the failure
 * message is logged at error level before it is rethrown.
 */
69 override fun connect() {
// NOTE(review): the `try {` opener and the call that actually establishes the connection are
// not visible in this view — presumably startConnection(); confirm.
71 log.info("$deviceInfo: Connecting to Netconf Device with timeouts C:${deviceInfo.connectTimeout}, " +
72 "R:${deviceInfo.replyTimeout}, I:${deviceInfo.idleTimeout}")
74 log.info("$deviceInfo: Connected to Netconf Device")
75 } catch (e: NetconfException) {
76 log.error("$deviceInfo: Netconf Device Connection Failed. ${e.message}")
77 throw NetconfException(e)
/**
 * Asks the RPC service to close the NETCONF session and, when the returned status equals
 * [RpcStatus.FAILURE] (compared case-insensitively), asks again with the flag set to `true`.
 *
 * NOTE(review): the flag presumably means "force" — confirm against NetconfRpcService. The
 * calls that close the SSH session/channel/client are not visible in this view.
 */
81 override fun disconnect() {
82 if (rpcService.closeSession(false).status.equals(
83 RpcStatus.FAILURE, true)) {
84 rpcService.closeSession(true)
88 // Closes the socket which should interrupt the streamHandler
// NOTE(review): the body of reconnect() is not visible in this view; presumably it calls
// disconnect() followed by connect() — confirm before relying on that.
93 override fun reconnect() {
/**
 * Sends an RPC request and blocks until its reply arrives.
 *
 * The request is formatted with [NetconfMessageUtils.formatRPCRequest], the connection is
 * checked (and re-opened if needed) via [checkAndReestablish], and the message is handed to the
 * stream handler. The call waits at most `replyTimeout` seconds for the reply.
 *
 * @param request the RPC payload to send
 * @param messageId the id used to format the request and to match the reply
 * @return the reply delivered by the stream handler
 * @throws NetconfException if the wait is interrupted, times out, or the send fails
 */
98 override fun syncRpc(request: String, messageId: String): String {
99 val formattedRequest = NetconfMessageUtils.formatRPCRequest(request, messageId, deviceCapabilities)
101 checkAndReestablish()
// Block for the reply, bounded by the configured reply timeout (seconds).
104 return streamHandler.sendMessage(formattedRequest, messageId).get(replyTimeout.toLong(), TimeUnit.SECONDS)
105 // replies.remove(messageId)
106 } catch (e: InterruptedException) {
// Restore the interrupt flag so callers further up the stack can still observe it.
107 Thread.currentThread().interrupt()
108 throw NetconfException("$deviceInfo: Interrupted while waiting for reply for request: $formattedRequest", e)
109 } catch (e: TimeoutException) {
110 throw NetconfException("$deviceInfo: Timed out while waiting for reply for request $formattedRequest after $replyTimeout sec.",
112 } catch (e: ExecutionException) {
// The send itself failed: the session is treated as broken and torn down.
// NOTE(review): the close calls guarded by the IOException handler below are not visible in this view.
113 log.warn("$deviceInfo: Closing session($sessionId) due to unexpected Error", e)
116 // Closes the socket which should interrupt the streamHandler
119 } catch (ioe: IOException) {
120 log.warn("$deviceInfo: Error closing session($sessionId) for host($deviceInfo)", ioe)
123 // NetconfReceivedEvent(NetconfReceivedEvent.Type.SESSION_CLOSED, "",
124 // "Closed due to unexpected error " + e.cause, "-1", deviceInfo)
125 errorReplies.clear() // move to cleanUp()?
128 throw NetconfException("$deviceInfo: Closing session $sessionId for request $formattedRequest", e)
/**
 * Sends an RPC request without blocking for the reply.
 *
 * The request is formatted and the connection checked exactly as in [syncRpc]; the future
 * returned by the stream handler is then chained with `handleAsync`.
 *
 * @param request the RPC payload to send
 * @param messageId the id used to format the request and to match the reply
 * @return a future for the reply; the handler throws a [NetconfException] built from
 * `messageId` and the failure cause
 */
132 override fun asyncRpc(request: String, messageId: String): CompletableFuture<String> {
133 val formattedRequest = NetconfMessageUtils.formatRPCRequest(request, messageId, deviceCapabilities)
135 checkAndReestablish()
137 return streamHandler.sendMessage(formattedRequest, messageId).handleAsync { reply, t ->
// NOTE(review): the condition guarding this throw and the lambda's result expression are not
// visible in this view — presumably `if (t != null)` and `reply`; confirm.
139 throw NetconfException(messageId, t)
/**
 * Checks, from the outermost layer inwards, whether the SSH client, the session or the channel
 * is closed, and logs which layer is about to be re-established.
 *
 * NOTE(review): the statements that perform the restart inside each branch are not visible in
 * this view.
 *
 * @throws NetconfException if re-opening fails with an [IOException] or [IllegalStateException]
 */
145 override fun checkAndReestablish() {
147 if (client.isClosed) {
148 log.info("Trying to restart the whole SSH connection with {}", deviceInfo)
151 } else if (session.isClosed) {
152 log.info("Trying to restart the session with {}", deviceInfo)
155 } else if (channel.isClosed) {
156 log.info("Trying to reopen the channel with {}", deviceInfo)
162 } catch (e: IOException) {
// NOTE(review): the "{}" placeholder is labelled "device" but is filled with the exception
// message, and the stack trace is not logged here (the cause is kept on the rethrown exception).
163 log.error("Can't reopen connection for device {}", e.message)
164 throw NetconfException(String.format("Cannot re-open the connection with device (%s)", deviceInfo), e)
165 } catch (e: IllegalStateException) {
166 log.error("Can't reopen connection for device {}", e.message)
167 throw NetconfException(String.format("Cannot re-open the connection with device (%s)", deviceInfo), e)
// NOTE(review): the body is not visible in this view; presumably it returns the `deviceInfo`
// passed to the constructor — confirm.
172 override fun getDeviceInfo(): DeviceInfo {
/**
 * Returns the current session id.
 *
 * @throws NullPointerException if no session id has been assigned yet.
 */
override fun getSessionId(): String = sessionId!!
/**
 * Returns an unmodifiable view of the device capability set held by this session.
 */
override fun getDeviceCapabilitiesSet(): Set<String> = Collections.unmodifiableSet(deviceCapabilities)
/**
 * Copies the connect, reply and idle timeouts from [deviceInfo] into this session and then
 * brings up the SSH connection.
 *
 * NOTE(review): the `try {` opener and the call it guards are not visible in this view —
 * presumably startClient(); confirm.
 *
 * @throws NetconfException wrapping any exception raised while establishing the SSH session
 */
184 private fun startConnection() {
185 connectionTimeout = deviceInfo.connectTimeout
186 replyTimeout = deviceInfo.replyTimeout
187 idleTimeout = deviceInfo.idleTimeout
190 } catch (e: Exception) {
191 throw NetconfException("$deviceInfo: Failed to establish SSH session", e)
/**
 * Creates the default SSH client and configures it before use.
 *
 * NOTE(review): the statements after the key-pair provider assignment (starting the client and
 * continuing the connection sequence) are not visible in this view.
 */
196 private fun startClient() {
197 client = SshClient.setUpDefaultClient()
// Both timeouts are only set when not already present; idleTimeout is in seconds and is
// converted to milliseconds. The read timeout is 15 seconds longer than the idle timeout.
198 client.properties.putIfAbsent(FactoryManager.IDLE_TIMEOUT, TimeUnit.SECONDS.toMillis(idleTimeout.toLong()))
199 client.properties.putIfAbsent(FactoryManager.NIO2_READ_TIMEOUT, TimeUnit.SECONDS.toMillis(idleTimeout + 15L))
200 client.keyPairProvider = SimpleGeneratorHostKeyProvider()
/**
 * Opens the SSH session to the device using the username, IP address and port from
 * [deviceInfo], waiting at most `connectionTimeout` seconds for the connection to be verified.
 *
 * NOTE(review): any statement following the final log line is not visible in this view.
 */
206 private fun startSession() {
207 log.info("$deviceInfo: Starting SSH session")
208 val connectFuture = client.connect(deviceInfo.username, deviceInfo.ipAddress, deviceInfo.port)
209 .verify(connectionTimeout, TimeUnit.SECONDS)
210 session = connectFuture.session
211 log.info("$deviceInfo: SSH session created")
/**
 * Authenticates the SSH session with the password from [deviceInfo], waiting at most
 * `connectionTimeout` seconds for the authentication to be verified.
 *
 * @throws NetconfException if the session does not report the AUTHED event afterwards
 */
216 private fun authSession() {
217 session.addPasswordIdentity(deviceInfo.password)
218 session.auth().verify(connectionTimeout, TimeUnit.SECONDS)
// NOTE(review): the timeout argument is 0 — presumably "wait without a time limit" in MINA SSHD;
// confirm against the ClientSession.waitFor documentation.
219 val event = session.waitFor(ImmutableSet.of(ClientSession.ClientSessionEvent.WAIT_AUTH,
220 ClientSession.ClientSessionEvent.CLOSED, ClientSession.ClientSessionEvent.AUTHED), 0)
221 if (!event.contains(ClientSession.ClientSessionEvent.AUTHED)) {
222 throw NetconfException("$deviceInfo: Failed to authenticate session.")
224 log.info("$deviceInfo: SSH session authenticated")
/**
 * Creates the "netconf" subsystem channel on the SSH session and opens it, waiting at most
 * `connectionTimeout` seconds.
 *
 * NOTE(review): the lines between the success log and the throw are not visible in this view;
 * the throw is presumably the `else` branch of the check — confirm.
 *
 * @throws NetconfException if the channel could not be opened
 */
229 private fun openChannel() {
230 channel = session.createSubsystemChannel("netconf")
231 val channelFuture = channel.open()
232 if (channelFuture.await(connectionTimeout, TimeUnit.SECONDS) && channelFuture.isOpened) {
233 log.info("$deviceInfo: SSH NETCONF subsystem channel opened")
236 throw NetconfException("$deviceInfo: Failed to open SSH subsystem channel")
/**
 * Creates the stream handler for the open channel and performs the hello exchange.
 *
 * The handler receives the channel's inverted output and input streams, the device info, a new
 * [NetconfSessionListenerImpl] and the shared `replies` map.
 *
 * NOTE(review): a statement between the handler construction and the hello exchange is not
 * visible in this view.
 */
240 private fun setupHandler() {
241 val sessionListener: NetconfSessionListener = NetconfSessionListenerImpl()
242 streamHandler = NetconfDeviceCommunicator(channel.invertedOut, channel.invertedIn, deviceInfo,
243 sessionListener, replies)
245 exchangeHelloMessage()
/**
 * Sends the client hello (built from `capabilities`) through [syncRpc] and parses the server's
 * hello response for the session id and the advertised capabilities.
 *
 * NOTE(review): the declaration of `messageId` is not visible in this view.
 *
 * @throws NetconfException if the server hello does not contain a session id
 */
248 private fun exchangeHelloMessage() {
252 val serverHelloResponse = syncRpc(NetconfMessageUtils.createHelloString(capabilities), messageId)
253 val sessionIDMatcher = NetconfMessageUtils.SESSION_ID_REGEX_PATTERN.matcher(serverHelloResponse)
// The session id is the first capture group of the session-id pattern.
255 if (sessionIDMatcher.find()) {
256 sessionId = sessionIDMatcher.group(1)
258 throw NetconfException("$deviceInfo: Missing sessionId in server hello message: $serverHelloResponse")
261 val capabilityMatcher = NetconfMessageUtils.CAPABILITY_REGEX_PATTERN.matcher(serverHelloResponse)
262 while (capabilityMatcher.find()) {
// NOTE(review): Set.plus returns a NEW set and the result is discarded here, so
// deviceCapabilities (declared with setOf()) never receives the parsed capabilities.
// Fixing this requires a mutable set (or a reassignable property) at the declaration as well.
263 deviceCapabilities.plus(capabilityMatcher.group(1))
/**
 * Listener handed to the stream handler; routes each received event to the enclosing session.
 *
 * - DEVICE_UNREGISTERED and SESSION_CLOSED disconnect the session.
 * - DEVICE_ERROR appends the message payload to the session's error replies.
 * - DEVICE_REPLY completes the pending future registered under the event's message id, if any.
 */
inner class NetconfSessionListenerImpl : NetconfSessionListener {
    override fun notify(event: NetconfReceivedEvent) {
        // Read the id up front, regardless of the event type.
        val replyKey = event.getMessageID()

        when (event.getType()) {
            NetconfReceivedEvent.Type.DEVICE_UNREGISTERED,
            NetconfReceivedEvent.Type.SESSION_CLOSED -> disconnect()
            NetconfReceivedEvent.Type.DEVICE_ERROR -> errorReplies.add(event.getMessagePayload())
            NetconfReceivedEvent.Type.DEVICE_REPLY -> replies[replyKey]?.complete(event.getMessagePayload())
        }
    }
}
280 fun sessionstatus(state:String): Boolean{
282 "Close" -> channel.isClosed
283 "Open" -> channel.isOpen