34c01813a9c1762a0fb7533b83feaf03e583591c
[ccsdk/cds.git] /
1 /*
2  * Copyright © 2017-2018 AT&T Intellectual Property.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *     http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 package org.onap.ccsdk.apps.blueprintsprocessor.functions.netconf.executor.core
18
19 import com.google.common.collect.ImmutableList
20 import com.google.common.collect.ImmutableSet
21 import org.apache.sshd.client.ClientBuilder
22 import org.apache.sshd.client.SshClient
23 import org.apache.sshd.client.channel.ClientChannel
24 import org.apache.sshd.client.session.ClientSession
25 import org.apache.sshd.client.simple.SimpleClient
26 import org.apache.sshd.common.FactoryManager
27 import org.apache.sshd.server.keyprovider.SimpleGeneratorHostKeyProvider
28 import org.onap.ccsdk.apps.blueprintsprocessor.functions.netconf.executor.NetconfException
29 import org.onap.ccsdk.apps.blueprintsprocessor.functions.netconf.executor.data.NetconfDeviceOutputEvent
30 import org.onap.ccsdk.apps.blueprintsprocessor.functions.netconf.executor.interfaces.DeviceInfo
31 import org.onap.ccsdk.apps.blueprintsprocessor.functions.netconf.executor.interfaces.NetconfSession
32 import org.onap.ccsdk.apps.blueprintsprocessor.functions.netconf.executor.interfaces.NetconfSessionDelegate
33 import org.onap.ccsdk.apps.blueprintsprocessor.functions.netconf.executor.utils.RpcConstants
34 import org.onap.ccsdk.apps.blueprintsprocessor.functions.netconf.executor.utils.RpcMessageUtils
35 import org.slf4j.LoggerFactory
36 import java.io.IOException
37 import java.util.*
38 import java.util.concurrent.*
39 import java.util.concurrent.atomic.AtomicInteger
40
41
/**
 * NETCONF session to a single device, carried over an Apache MINA SSHD client.
 *
 * Constructing an instance connects immediately: the `init` block starts the SSH client, authenticates
 * with the password taken from [deviceInfo], opens the "netconf" subsystem channel and exchanges the
 * hello message. Exceptions raised during that sequence are rethrown from the constructor wrapped in a
 * [NetconfException].
 *
 * Pending replies are tracked in [replies], keyed by message id; the map is shared with the
 * [NetconfStreamThread] created in [openChannel] and completed through [NetconfSessionDelegateImpl].
 */
class NetconfSessionImpl(private val deviceInfo: DeviceInfo) : NetconfSession {
    val log = LoggerFactory.getLogger(NetconfSessionImpl::class.java)

    // Timeouts copied from deviceInfo each time startConnection() runs.
    var connectTimeout: Long = 0
    var replyTimeout: Int = 0
    var idleTimeout: Int = 0

    // Set to "-1" while the hello exchange is in flight, then to the id parsed from the server hello.
    var sessionID: String? = null

    // Payloads of device events that arrived without a message id (see NetconfSessionDelegateImpl).
    var errorReplies: MutableList<String> = mutableListOf()
    var netconfCapabilities = ImmutableList.of("urn:ietf:params:netconf:base:1.0", "urn:ietf:params:netconf:base:1.1")

    // Futures awaiting a reply, keyed by message id.
    var replies: MutableMap<String, CompletableFuture<String>> = ConcurrentHashMap()

    // Capabilities parsed from the server hello; insertion order is kept.
    val deviceCapabilities = LinkedHashSet<String>()

    lateinit var session: ClientSession
    lateinit var client: SshClient
    lateinit var channel: ClientChannel
    var streamHandler: NetconfStreamThread? = null

    // Source of message ids for requests issued through sendRequest().
    val messageIdInteger = AtomicInteger(1)

    // Capabilities advertised in the client hello.
    private var onosCapabilities = ImmutableList.of<String>(RpcConstants.NETCONF_10_CAPABILITY, RpcConstants.NETCONF_11_CAPABILITY)

    // Must stay below every property declaration: startConnection() uses them.
    init {
        startConnection()
    }

    /**
     * Reads the timeouts from [deviceInfo] and brings up the SSH client, session, channel and hello.
     *
     * @throws NetconfException wrapping any exception raised while connecting.
     */
    private fun startConnection() {
        connectTimeout = deviceInfo.connectTimeoutSec
        replyTimeout = deviceInfo.replyTimeout
        idleTimeout = deviceInfo.idleTimeout
        log.info("Connecting to NETCONF Device {} with timeouts C:{}, R:{}, I:{}", deviceInfo, connectTimeout,
                replyTimeout, idleTimeout)
        try {
            startClient()
        } catch (e: IOException) {
            throw NetconfException("Failed to establish SSH with device ${deviceInfo.deviceId}", e)
        } catch (e: Exception) {
            throw NetconfException("Failed to establish SSH with device $deviceInfo", e)
        }
    }

    /**
     * Creates and starts a fresh SSH client, then opens the session on it.
     *
     * If the session cannot be established the freshly started client is closed again before the
     * exception is rethrown, so a failed attempt does not leave a running client behind.
     */
    private fun startClient() {
        log.info("in the startClient")
        client = SshClient.setUpDefaultClient()
        log.info("client {}>>", client)
        client.properties.putIfAbsent(FactoryManager.IDLE_TIMEOUT, TimeUnit.SECONDS.toMillis(idleTimeout.toLong()))
        client.properties.putIfAbsent(FactoryManager.NIO2_READ_TIMEOUT,
                TimeUnit.SECONDS.toMillis(idleTimeout + 15L))
        client.start()
        client.setKeyPairProvider(SimpleGeneratorHostKeyProvider())
        log.info("client {}>>", client.isOpen)
        try {
            startSession()
        } catch (e: Exception) {
            closeQuietly("client") { client.close() }
            throw e
        }
    }

    /**
     * Connects to the device, authenticates with the configured password and opens the channel.
     *
     * @throws NetconfException if the session does not reach the AUTHED state.
     */
    private fun startSession() {
        log.info("in the startSession")
        val connectFuture = client.connect(deviceInfo.name, deviceInfo.ipAddress, deviceInfo.port)
                .verify(connectTimeout, TimeUnit.SECONDS)
        log.info("connectFuture {}>>", connectFuture)
        session = connectFuture.session

        session.addPasswordIdentity(deviceInfo.pass)
        session.auth().verify(connectTimeout, TimeUnit.SECONDS)

        val event = session.waitFor(ImmutableSet.of(ClientSession.ClientSessionEvent.WAIT_AUTH,
                ClientSession.ClientSessionEvent.CLOSED, ClientSession.ClientSessionEvent.AUTHED), 0)

        if (!event.contains(ClientSession.ClientSessionEvent.AUTHED)) {
            log.debug("Session closed {} for event {}", session.isClosed, event)
            throw NetconfException(
                    "Failed to authenticate session with device ($deviceInfo) check the user/pwd or key")
        }
        openChannel()
    }

    /**
     * Opens the "netconf" subsystem channel, attaches a [NetconfStreamThread] to it and sends the hello.
     *
     * @throws NetconfException if the channel is not opened within [connectTimeout] seconds.
     */
    private fun openChannel() {
        log.info("in the open Channel")
        channel = session.createSubsystemChannel("netconf")
        val channelFuture = channel.open()

        if (channelFuture.await(connectTimeout, TimeUnit.SECONDS) && channelFuture.isOpened) {
            val netconfSessionDelegate: NetconfSessionDelegate = NetconfSessionDelegateImpl()
            streamHandler = NetconfStreamThread(channel.invertedOut, channel.invertedIn, deviceInfo,
                    netconfSessionDelegate, replies)
            sendHello()
        } else {
            throw NetconfException("Failed to open channel with device ($deviceInfo)")
        }
    }

    /**
     * Sends the client hello with message id "-1" and records the session id and the capabilities
     * found in the server's hello.
     *
     * @throws NetconfException if the server hello carries no session id.
     */
    private fun sendHello() {
        val helloMessageId = (-1).toString()
        sessionID = helloMessageId

        val serverHelloResponse = syncRpc(RpcMessageUtils.createHelloString(onosCapabilities), helloMessageId)
        val sessionIDMatcher = RpcMessageUtils.SESSION_ID_REGEX_PATTERN.matcher(serverHelloResponse)

        if (sessionIDMatcher.find()) {
            sessionID = sessionIDMatcher.group(1)
        } else {
            throw NetconfException("Missing SessionID in server hello reponse.")
        }

        val capabilityMatcher = RpcMessageUtils.CAPABILITY_REGEX_PATTERN.matcher(serverHelloResponse)
        while (capabilityMatcher.find()) {
            deviceCapabilities.add(capabilityMatcher.group(1))
        }
    }

    /**
     * Sends [request] under message id [msgId] without blocking for the reply.
     *
     * @return a future completed with the reply; it completes exceptionally with a [NetconfException]
     * (message = [msgId], cause = the underlying failure) if sending or receiving failed.
     */
    override fun asyncRpc(request: String, msgId: String): CompletableFuture<String> {
        val formattedRequest = RpcMessageUtils.formatRPCRequest(request, msgId, deviceCapabilities)

        // Make sure client, session and channel are alive before writing to the stream.
        checkAndReestablish()

        return requireStreamHandler().sendMessage(formattedRequest, msgId).handleAsync { reply, t ->
            // The exchange is over either way; drop the bookkeeping entry, as syncRpc does.
            replies.remove(msgId)
            if (t != null) {
                throw NetconfException(msgId, t)
            }
            reply
        }
    }

    /**
     * Asks the device to close this NETCONF session, escalating to a kill-session request if the
     * close-session request is not acknowledged.
     *
     * @return true if the device acknowledged either request.
     */
    override fun close(): Boolean {
        return close(false)
    }

    /**
     * Sends a close-session (or, when [force] is true, a kill-session) RPC.
     *
     * A rejected close-session is retried once as kill-session; a rejected kill-session is reported
     * as `false` instead of being retried again.
     */
    @Throws(NetconfException::class)
    private fun close(force: Boolean): Boolean {
        val rpc = buildString {
            append("<rpc xmlns=\"urn:ietf:params:xml:ns:netconf:base:1.0\">")
            append(if (force) "<kill-session/>" else "<close-session/>")
            append("</rpc>")
            append(RpcConstants.END_PATTERN)
        }
        if (RpcMessageUtils.checkReply(sendRequest(rpc))) {
            return true
        }
        return !force && close(true)
    }

    /** Returns the session id recorded by the hello exchange ("-1" while the hello is in flight). */
    override fun getSessionId(): String? {
        return this.sessionID
    }

    /** Returns a read-only view of the capabilities parsed from the server hello. */
    override fun getDeviceCapabilitiesSet(): Set<String> {
        return Collections.unmodifiableSet(deviceCapabilities)
    }

    /** Replaces the capabilities advertised in subsequent client hello messages. */
    fun setCapabilities(capabilities: ImmutableList<String>) {
        onosCapabilities = capabilities
    }

    /**
     * Re-opens whichever layer is closed: the whole SSH client, the session, or only the channel.
     * Pending [replies] are discarded before a layer is re-opened. Does nothing when all are open.
     *
     * @throws NetconfException if the connection cannot be re-opened.
     */
    override fun checkAndReestablish() {
        try {
            when {
                client.isClosed -> {
                    log.debug("Trying to restart the whole SSH connection with {}", deviceInfo.deviceId)
                    replies.clear()
                    startConnection()
                }
                session.isClosed -> {
                    log.debug("Trying to restart the session with {}", deviceInfo.deviceId)
                    replies.clear()
                    startSession()
                }
                channel.isClosed -> {
                    log.debug("Trying to reopen the channel with {}", deviceInfo.deviceId)
                    replies.clear()
                    openChannel()
                }
            }
        } catch (e: IOException) {
            log.error("Can't reopen connection for device {}: {}", deviceInfo.deviceId, e.message)
            throw NetconfException(String.format("Cannot re-open the connection with device (%s)", deviceInfo), e)
        } catch (e: IllegalStateException) {
            log.error("Can't reopen connection for device {}: {}", deviceInfo.deviceId, e.message)
            throw NetconfException(String.format("Cannot re-open the connection with device (%s)", deviceInfo), e)
        }
    }

    // NOTE(review): this only delegates to the interface default and does not touch onosCapabilities;
    // confirm whether it should behave like the ImmutableList overload above.
    override fun setCapabilities(capabilities: List<String>) {
        super.setCapabilities(capabilities)
    }

    /** Returns the device description this session was created with. */
    override fun getDeviceInfo(): DeviceInfo {
        return deviceInfo
    }

    /** Sends [request] synchronously under the next id from [messageIdInteger]. */
    @Throws(NetconfException::class)
    private fun sendRequest(request: String): String {
        return syncRpc(request, messageIdInteger.getAndIncrement().toString())
    }

    /**
     * Sends [request] under [messageId] and blocks up to [replyTimeout] seconds for the reply.
     *
     * On an [ExecutionException] the SSH session, channel and client are closed and all pending
     * replies are discarded before the failure is reported.
     *
     * @return the reply with leading and trailing whitespace/control characters removed.
     * @throws NetconfException on interruption, timeout, or a failed exchange.
     */
    @Throws(NetconfException::class)
    override fun syncRpc(request: String, messageId: String): String {
        val formattedRequest = RpcMessageUtils.formatRPCRequest(request, messageId, deviceCapabilities)

        // Make sure client, session and channel are alive before writing to the stream.
        checkAndReestablish()

        val response: String = try {
            requireStreamHandler().sendMessage(formattedRequest, messageId)
                    .get(replyTimeout.toLong(), TimeUnit.SECONDS)
        } catch (e: InterruptedException) {
            Thread.currentThread().interrupt()
            throw NetconfException("Interrupted waiting for reply for request$formattedRequest", e)
        } catch (e: TimeoutException) {
            throw NetconfException(
                    "Timed out waiting for reply for request $formattedRequest after $replyTimeout sec.", e)
        } catch (e: ExecutionException) {
            log.warn("Closing session {} for {} due to unexpected Error", sessionID, deviceInfo, e)
            // Close each layer on its own so that one failing close() does not skip the others.
            closeQuietly("session") { session.close() }
            closeQuietly("channel") { channel.close() } // closes the socket the stream thread reads from
            closeQuietly("client") { client.close() }

            // No event listener mechanism exists in this class, so no SESSION_CLOSED event is published.
            errorReplies.clear()
            replies.clear()

            throw NetconfException(
                    "Closing session $sessionID for $deviceInfo for request $formattedRequest", e)
        } finally {
            // The exchange is over whatever the outcome; never leave a stale future behind.
            replies.remove(messageId)
        }

        log.debug("Response from NETCONF Device: \n {} \n", response)
        return response.trim { it <= ' ' }
    }

    /** Returns the active stream handler, or fails with a [NetconfException] if no channel was opened. */
    private fun requireStreamHandler(): NetconfStreamThread =
            streamHandler ?: throw NetconfException("No open NETCONF channel to device ($deviceInfo)")

    /** Runs [closeAction], logging (instead of propagating) an [IOException] raised by it. */
    private fun closeQuietly(resource: String, closeAction: () -> Unit) {
        try {
            closeAction()
        } catch (ioe: IOException) {
            log.warn("Error closing {} of session {} on {}", resource, sessionID, deviceInfo, ioe)
        }
    }

    /**
     * Receives device output events and completes the matching pending reply.
     * Events without a message id are recorded in [errorReplies] instead.
     */
    inner class NetconfSessionDelegateImpl : NetconfSessionDelegate {
        override fun notify(event: NetconfDeviceOutputEvent) {
            val messageId = event.getMessageID()
            log.debug("messageID {}, waiting replies messageIDs {}", messageId, replies.keys)
            if (messageId.isNullOrBlank()) {
                errorReplies.add(event.getMessagePayload().toString())
                log.error("Device {} sent error reply {}", event.getDeviceInfo(), event.getMessagePayload())
                return
            }
            // The entry is left in the map here; the waiting caller removes it once it has the reply.
            val completedReply = replies[messageId]
            completedReply?.complete(event.getMessagePayload())
        }
    }
}