2 * Copyright © 2017-2018 AT&T Intellectual Property.
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
17 package org.onap.ccsdk.apps.blueprintsprocessor.functions.netconf.executor.core
19 import org.onap.ccsdk.apps.blueprintsprocessor.functions.netconf.executor.NetconfException
20 import org.onap.ccsdk.apps.blueprintsprocessor.functions.netconf.executor.data.NetconfDeviceOutputEvent
21 import org.onap.ccsdk.apps.blueprintsprocessor.functions.netconf.executor.interfaces.DeviceInfo
22 import org.onap.ccsdk.apps.blueprintsprocessor.functions.netconf.executor.interfaces.NetconfSessionDelegate
23 import org.onap.ccsdk.apps.blueprintsprocessor.functions.netconf.executor.utils.RpcConstants
24 import org.onap.ccsdk.apps.blueprintsprocessor.functions.netconf.executor.utils.RpcMessageUtils
25 import org.slf4j.LoggerFactory
27 import java.nio.charset.StandardCharsets
28 import java.util.concurrent.CompletableFuture
31 class NetconfStreamThread(private var inputStream: InputStream, private var out : OutputStream,
32 private val netconfDeviceInfo: DeviceInfo, private val netconfSessionDelegate: NetconfSessionDelegate,
33 private var replies :MutableMap<String, CompletableFuture<String>> ) : Thread() {
35 val log = LoggerFactory.getLogger(NetconfStreamThread::class.java)
36 lateinit var state : NetconfMessageState
37 // val outputStream = OutputStreamWriter(out, StandardCharsets.UTF_8)
38 private var outputStream: OutputStreamWriter? = null
41 var bufferReader: BufferedReader? = null
42 while (bufferReader == null) {
43 bufferReader = BufferedReader(InputStreamReader(inputStream, StandardCharsets.UTF_8))
47 var socketClosed = false
48 val deviceReplyBuilder = StringBuilder()
49 while (!socketClosed) {
50 val cInt = bufferReader!!.read()
52 log.debug("Netconf device {} sent error char in session will need to be reopend",
54 NetconfDeviceOutputEvent(NetconfDeviceOutputEvent.Type.SESSION_CLOSED, null!!, null!!,
55 null !!, netconfDeviceInfo)
57 log.debug("Netconf device {} ERROR cInt == -1 socketClosed = true", netconfDeviceInfo)
60 state = state.evaluateChar(c)
61 deviceReplyBuilder.append(c)
62 if (state === NetconfMessageState.END_PATTERN) {
63 var deviceReply = deviceReplyBuilder.toString()
64 if (deviceReply == RpcConstants.END_PATTERN) {
68 deviceReply = deviceReply.replace(RpcConstants.END_PATTERN, "")
69 dealWithReply(deviceReply)
70 deviceReplyBuilder.setLength(0)
72 } else if (state === NetconfMessageState.END_CHUNKED_PATTERN) {
73 var deviceReply = deviceReplyBuilder.toString()
74 if (!RpcMessageUtils.validateChunkedFraming(deviceReply)) {
75 log.debug("Netconf device {} send badly framed message {}", netconfDeviceInfo, deviceReply)
79 deviceReply = deviceReply.replace(RpcConstants.MSGLEN_REGEX_PATTERN.toRegex(), "")
80 deviceReply = deviceReply.replace(RpcMessageUtils.CHUNKED_END_REGEX_PATTERN.toRegex(), "")
81 dealWithReply(deviceReply)
82 deviceReplyBuilder.setLength(0)
86 } catch (e: IOException) {
87 log.warn("Error in reading from the session for device {} ", netconfDeviceInfo, e)
88 throw IllegalStateException(
89 NetconfException(message = "Error in reading from the session for device {}$netconfDeviceInfo"))
94 enum class NetconfMessageState {
97 override fun evaluateChar(c: Char): NetconfMessageState {
98 return if (c == ']') {
100 } else if (c == '\n') {
108 override fun evaluateChar(c: Char): NetconfMessageState {
109 return if (c == ']') {
117 override fun evaluateChar(c: Char): NetconfMessageState {
118 return if (c == '>') {
126 override fun evaluateChar(c: Char): NetconfMessageState {
127 return if (c == ']') {
135 override fun evaluateChar(c: Char): NetconfMessageState {
136 return if (c == ']') {
144 override fun evaluateChar(c: Char): NetconfMessageState {
145 return if (c == '>') {
153 override fun evaluateChar(c: Char): NetconfMessageState {
154 return if (c == '#') {
156 } else if (c == ']') {
158 } else if (c == '\n') {
166 override fun evaluateChar(c: Char): NetconfMessageState {
167 return if (c == '#') {
175 override fun evaluateChar(c: Char): NetconfMessageState {
176 return if (c == '\n') {
183 END_CHUNKED_PATTERN {
184 override fun evaluateChar(c: Char): NetconfMessageState {
185 return NO_MATCHING_PATTERN
189 override fun evaluateChar(c: Char): NetconfMessageState {
190 return NO_MATCHING_PATTERN
194 internal abstract fun evaluateChar(c: Char): NetconfMessageState
197 private fun close(deviceReply: String) {
198 log.debug("Netconf device {} socketClosed = true DEVICE_UNREGISTERED {}", netconfDeviceInfo, deviceReply)
199 NetconfDeviceOutputEvent(NetconfDeviceOutputEvent.Type.DEVICE_UNREGISTERED, null!!, null!!, null!!,
204 private fun dealWithReply(deviceReply: String) {
205 if (deviceReply.contains(RpcConstants.RPC_REPLY) || deviceReply.contains(RpcConstants.RPC_ERROR)
206 || deviceReply.contains(RpcConstants.HELLO)) {
207 log.info("From Netconf Device: {} \n for Message-ID: {} \n Device-Reply: \n {} \n ", netconfDeviceInfo,
208 RpcMessageUtils.getMsgId(deviceReply), deviceReply)
209 val event = NetconfDeviceOutputEvent(NetconfDeviceOutputEvent.Type.DEVICE_REPLY,
210 null!!, deviceReply, RpcMessageUtils.getMsgId(deviceReply), netconfDeviceInfo)
211 netconfSessionDelegate.notify(event)
213 log.debug("Error Reply: \n {} \n from Netconf Device: {}", deviceReply, netconfDeviceInfo)
217 @SuppressWarnings("squid:S3655")
219 fun sendMessage(request: String): CompletableFuture<String> {
220 val messageId = RpcMessageUtils.getMsgId(request)
221 return sendMessage(request, messageId.get())
224 fun sendMessage(request: String, messageId: String): CompletableFuture<String> {
225 log.info("Sending message: \n {} \n to NETCONF Device: {}", request, netconfDeviceInfo)
226 val cf = CompletableFuture<String>()
227 replies.put(messageId, cf)
228 // outputStream = OutputStreamWriter(out, StandardCharsets.UTF_8)
229 synchronized(OutputStreamWriter(out, StandardCharsets.UTF_8)) {
232 OutputStreamWriter(out, StandardCharsets.UTF_8).write(request)
233 OutputStreamWriter(out, StandardCharsets.UTF_8).flush()
234 } catch (e: IOException) {
235 log.error("Writing to NETCONF Device {} failed", netconfDeviceInfo, e)
236 cf.completeExceptionally(e)