2 * Copyright © 2017-2018 AT&T Intellectual Property.
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
17 package org.onap.ccsdk.apps.blueprintsprocessor.functions.netconf.executor.core
19 import com.google.common.collect.ImmutableList
20 import com.google.common.collect.ImmutableSet
21 import org.apache.sshd.client.ClientBuilder
22 import org.apache.sshd.client.SshClient
23 import org.apache.sshd.client.channel.ClientChannel
24 import org.apache.sshd.client.session.ClientSession
25 import org.apache.sshd.client.simple.SimpleClient
26 import org.apache.sshd.common.FactoryManager
27 import org.apache.sshd.server.keyprovider.SimpleGeneratorHostKeyProvider
28 import org.onap.ccsdk.apps.blueprintsprocessor.functions.netconf.executor.NetconfException
29 import org.onap.ccsdk.apps.blueprintsprocessor.functions.netconf.executor.data.NetconfDeviceOutputEvent
30 import org.onap.ccsdk.apps.blueprintsprocessor.functions.netconf.executor.interfaces.DeviceInfo
31 import org.onap.ccsdk.apps.blueprintsprocessor.functions.netconf.executor.interfaces.NetconfSession
32 import org.onap.ccsdk.apps.blueprintsprocessor.functions.netconf.executor.interfaces.NetconfSessionDelegate
33 import org.onap.ccsdk.apps.blueprintsprocessor.functions.netconf.executor.utils.RpcConstants
34 import org.onap.ccsdk.apps.blueprintsprocessor.functions.netconf.executor.utils.RpcMessageUtils
35 import org.slf4j.LoggerFactory
36 import java.io.IOException
38 import java.util.concurrent.*
39 import java.util.concurrent.atomic.AtomicInteger
42 class NetconfSessionImpl(private val deviceInfo: DeviceInfo ): NetconfSession {
43 val log = LoggerFactory.getLogger(NetconfSessionImpl::class.java)
44 var connectTimeout: Long = 0
45 var replyTimeout: Int = 0
46 var idleTimeout: Int = 0
47 var sessionID: String? = null
48 var errorReplies: MutableList<String> = mutableListOf()
49 var netconfCapabilities = ImmutableList.of("urn:ietf:params:netconf:base:1.0", "urn:ietf:params:netconf:base:1.1")
51 // var replies: MutableMap<String, CompletableFuture<String>> = mutableListOf<String,CompletableFuture<String>()>()
52 var replies: MutableMap<String, CompletableFuture<String>> = ConcurrentHashMap()
53 val deviceCapabilities = LinkedHashSet<String>()
55 lateinit var session: ClientSession
56 lateinit var client: SshClient
57 lateinit var channel: ClientChannel
58 var streamHandler: NetconfStreamThread? = null
60 val messageIdInteger = AtomicInteger(1)
61 private var onosCapabilities = ImmutableList.of<String>(RpcConstants.NETCONF_10_CAPABILITY, RpcConstants.NETCONF_11_CAPABILITY)
68 private fun startConnection() {
69 connectTimeout = deviceInfo.connectTimeoutSec
70 replyTimeout = deviceInfo.replyTimeout
71 idleTimeout = deviceInfo.idleTimeout
72 log.info("Connecting to NETCONF Device {} with timeouts C:{}, R:{}, I:{}", deviceInfo, connectTimeout,
73 replyTimeout, idleTimeout)
76 } catch (e: IOException) {
77 throw NetconfException("Failed to establish SSH with device ${deviceInfo.deviceId}",e)
78 } catch (e:Exception){
79 throw NetconfException("Failed to establish SSH with device $deviceInfo",e)
84 private fun startClient() {
85 log.info("in the startClient")
86 // client = SshClient.setUpDefaultClient().toInt()
87 client = SshClient.setUpDefaultClient()
89 client = ClientBuilder.builder().build() as SshClient
90 log.info("client {}>>",client)
91 client.getProperties().putIfAbsent(FactoryManager.IDLE_TIMEOUT, TimeUnit.SECONDS.toMillis(idleTimeout.toLong()))
92 client.getProperties().putIfAbsent(FactoryManager.NIO2_READ_TIMEOUT,
93 TimeUnit.SECONDS.toMillis(idleTimeout + 15L))
95 client.setKeyPairProvider(SimpleGeneratorHostKeyProvider())
96 log.info("client {}>>",client.isOpen)
100 private fun startSession() {
101 log.info("in the startSession")
102 val connectFuture = client.connect(deviceInfo.name, deviceInfo.ipAddress, deviceInfo.port)
103 .verify(connectTimeout, TimeUnit.SECONDS)
104 log.info("connectFuture {}>>"+connectFuture)
105 session = connectFuture.session
107 session.addPasswordIdentity(deviceInfo.pass)
108 session.auth().verify(connectTimeout, TimeUnit.SECONDS)
110 val event = session.waitFor(ImmutableSet.of(ClientSession.ClientSessionEvent.WAIT_AUTH,
111 ClientSession.ClientSessionEvent.CLOSED, ClientSession.ClientSessionEvent.AUTHED), 0)
113 if (!event.contains(ClientSession.ClientSessionEvent.AUTHED)) {
114 log.debug("Session closed {} for event {}", session.isClosed(), event)
115 throw NetconfException(String
116 .format("Failed to authenticate session with device (%s) check the user/pwd or key", deviceInfo))
121 private fun openChannel() {
122 log.info("in the open Channel")
123 channel = session.createSubsystemChannel("netconf")
124 val channeuture = channel.open()
126 if (channeuture!!.await(connectTimeout, TimeUnit.SECONDS) && channeuture.isOpened) {
127 val netconfSessionDelegate:NetconfSessionDelegate = NetconfSessionDelegateImpl()
128 streamHandler = NetconfStreamThread(channel.getInvertedOut(), channel.getInvertedIn(), deviceInfo,
129 netconfSessionDelegate, replies)
132 throw NetconfException(String.format("Failed to open channel with device (%s) $deviceInfo", deviceInfo))
136 private fun sendHello() {
137 sessionID = (-1).toString()
139 val serverHelloResponse = syncRpc(RpcMessageUtils.createHelloString(onosCapabilities), (-1).toString())
140 val sessionIDMatcher = RpcMessageUtils.SESSION_ID_REGEX_PATTERN.matcher(serverHelloResponse)
142 if (sessionIDMatcher.find()) {
143 sessionID = sessionIDMatcher.group(1)
145 throw NetconfException("Missing SessionID in server hello reponse.")
148 val capabilityMatcher = RpcMessageUtils.CAPABILITY_REGEX_PATTERN.matcher(serverHelloResponse)
149 while (capabilityMatcher.find()) {
150 deviceCapabilities.add(capabilityMatcher.group(1))
155 override fun asyncRpc( request: String, msgId: String): CompletableFuture<String> {
156 //return close(false);
157 var request = RpcMessageUtils.formatRPCRequest(request, msgId, deviceCapabilities)
159 * Checking Liveliness of the Session
161 checkAndReestablish()
163 return streamHandler!!.sendMessage(request, msgId).handleAsync { reply, t ->
165 //throw NetconfTransportException(t)
166 throw NetconfException(msgId)
172 override fun close(): Boolean {
175 @Throws(NetconfException::class)
176 private fun close(force: Boolean): Boolean {
177 val rpc = StringBuilder()
178 rpc.append("<rpc xmlns=\"urn:ietf:params:xml:ns:netconf:base:1.0\">")
180 rpc.append("<kill-session/>")
182 rpc.append("<close-session/>")
185 rpc.append(RpcConstants.END_PATTERN)
186 return RpcMessageUtils.checkReply(sendRequest(rpc.toString())) || close(true)
191 override fun getSessionId(): String? {
192 return this.sessionID
195 override fun getDeviceCapabilitiesSet(): Set<String> {
196 return Collections.unmodifiableSet(deviceCapabilities);
199 fun setCapabilities(capabilities: ImmutableList<String>) {
200 onosCapabilities = capabilities
203 override fun checkAndReestablish() {
205 if (client.isClosed) {
206 log.debug("Trying to restart the whole SSH connection with {}", deviceInfo.deviceId)
209 } else if (session.isClosed) {
210 log.debug("Trying to restart the session with {}", deviceInfo.deviceId)
213 } else if (channel.isClosed) {
214 log.debug("Trying to reopen the channel with {}", deviceInfo.deviceId)
220 } catch (e: IOException) {
221 log.error("Can't reopen connection for device {}", e.message)
222 throw NetconfException(String.format("Cannot re-open the connection with device (%s)", deviceInfo), e)
223 } catch (e: IllegalStateException) {
224 log.error("Can't reopen connection for device {}", e.message)
225 throw NetconfException(String.format("Cannot re-open the connection with device (%s)", deviceInfo), e)
230 override fun setCapabilities(capabilities: List<String>) {
231 super.setCapabilities(capabilities)
234 override fun getDeviceInfo(): DeviceInfo {
238 @Throws(NetconfException::class)
239 private fun sendRequest(request: String): String {
240 return syncRpc(request, messageIdInteger.getAndIncrement().toString())
243 @Throws(NetconfException::class)
244 override fun syncRpc(request: String, messageId: String): String {
245 var request = request
246 request = RpcMessageUtils.formatRPCRequest(request, messageId, deviceCapabilities)
249 * Checking Liveliness of the Session
251 checkAndReestablish()
255 response = streamHandler!!.sendMessage(request, messageId).get(replyTimeout.toLong(), TimeUnit.SECONDS)
256 replies.remove(messageId) // Why here???
257 } catch (e: InterruptedException) {
258 Thread.currentThread().interrupt()
259 throw NetconfException("Interrupted waiting for reply for request$request",e)
260 } catch (e: TimeoutException) {
261 throw NetconfException(
262 "Timed out waiting for reply for request $request after $replyTimeout sec.",e)
263 } catch (e: ExecutionException) {
264 log.warn("Closing session {} for {} due to unexpected Error", sessionID, deviceInfo, e)
267 channel.close() // Closes the socket which should interrupt NetconfStreamThread
269 } catch (ioe: IOException) {
270 log.warn("Error closing session {} on {}", sessionID, deviceInfo, ioe)
273 NetconfDeviceOutputEvent(NetconfDeviceOutputEvent.Type.SESSION_CLOSED, null!!,
274 "Closed due to unexpected error " + e.cause, Optional.of("-1"), deviceInfo)
275 errorReplies.clear() // move to cleanUp()?
278 throw NetconfException(
279 "Closing session $sessionID for $deviceInfo for request $request",e)
282 log.debug("Response from NETCONF Device: \n {} \n", response)
283 return response.trim { it <= ' ' }
286 inner class NetconfSessionDelegateImpl : NetconfSessionDelegate {
287 override fun notify(event: NetconfDeviceOutputEvent) {
288 val messageId = event.getMessageID()
289 log.debug("messageID {}, waiting replies messageIDs {}", messageId, replies.keys)
290 if (messageId.isNullOrBlank()) {
291 errorReplies.add(event.getMessagePayload().toString())
292 log.error("Device {} sent error reply {}", event.getDeviceInfo(), event.getMessagePayload())
295 val completedReply = replies[messageId] // remove(..)?
296 completedReply?.complete(event.getMessagePayload())