2 * Copyright © 2017-2019 AT&T, Bell Canada
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
17 package org.onap.ccsdk.cds.blueprintsprocessor.functions.netconf.executor.core
19 import org.onap.ccsdk.cds.blueprintsprocessor.functions.netconf.executor.api.DeviceInfo
20 import org.onap.ccsdk.cds.blueprintsprocessor.functions.netconf.executor.api.NetconfReceivedEvent
21 import org.onap.ccsdk.cds.blueprintsprocessor.functions.netconf.executor.api.NetconfSessionListener
22 import org.onap.ccsdk.cds.blueprintsprocessor.functions.netconf.executor.utils.NetconfMessageUtils
23 import org.onap.ccsdk.cds.blueprintsprocessor.functions.netconf.executor.utils.RpcMessageUtils
24 import org.slf4j.LoggerFactory
25 import java.io.BufferedReader
26 import java.io.IOException
27 import java.io.InputStream
28 import java.io.InputStreamReader
29 import java.io.OutputStream
30 import java.io.OutputStreamWriter
31 import java.nio.charset.StandardCharsets
32 import java.util.concurrent.CompletableFuture
/**
 * Thread subclass that exchanges text messages with a single device over a pair of streams.
 *
 * In the code visible in this file, characters are read from [inputStream], requests are
 * written to [out] by [sendMessage], and events are reported through [sessionListener].
 *
 * @param inputStream stream the device output is read from (decoded as UTF-8).
 * @param out stream requests are written to (encoded as UTF-8).
 * @param deviceInfo device descriptor; interpolated into log messages and attached to events.
 * @param sessionListener receiver of [NetconfReceivedEvent] notifications.
 * @param replies map from message id to the future that [sendMessage] registers for it.
 *
 * NOTE(review): `inputStream`, `out` and `replies` are declared `var` but are not reassigned
 * in the code visible here -- confirm against the full file and consider `val`.
 */
34 class NetconfDeviceCommunicator(private var inputStream: InputStream,
35 private var out: OutputStream,
36 private val deviceInfo: DeviceInfo,
37 private val sessionListener: NetconfSessionListener,
38 private var replies: MutableMap<String, CompletableFuture<String>>) : Thread() {
40 private val log = LoggerFactory.getLogger(NetconfDeviceCommunicator::class.java)
// Current state of the end-of-message detector; replaced with the result of evaluateChar
// for every character read from the device.
41 private var state = NetconfMessageState.NO_MATCHING_PATTERN
// Device read loop. The enclosing function header is not visible in this excerpt
// (presumably the Thread.run override -- TODO confirm).
// Reads the stream one character at a time, advances the framing state machine with each
// character, and hands a complete message to receivedMessage when an end state is reached.
48 var bufferReader: BufferedReader? = null
// NOTE(review): the BufferedReader constructor never returns null, so this loop body runs
// exactly once; a plain `val` initialisation would be equivalent.
49 while (bufferReader == null) {
50 bufferReader = BufferedReader(InputStreamReader(inputStream, StandardCharsets.UTF_8))
54 var socketClosed = false
// Accumulates the characters of the message currently being received.
55 val deviceReplyBuilder = StringBuilder()
56 while (!socketClosed) {
// Reader.read() returns the next character as an Int, or -1 at end of stream.
57 val cInt = bufferReader.read()
// The condition guarding this log call is not visible here; the message text indicates it
// is the end-of-stream (-1) case.
// NOTE(review): verify that the -1 sentinel is not converted to a Char and appended to the
// buffer after this point -- the relevant lines are missing from this excerpt.
59 log.error("$deviceInfo: Received cInt = -1")
60 // bufferReader.close()
62 // sessionListener.notify(NetconfReceivedEvent(
63 // NetconfReceivedEvent.Type.SESSION_CLOSED,
64 // deviceInfo = deviceInfo))
// `c` is assigned on a line not visible in this excerpt (presumably cInt as a Char).
// Advance the framing state machine and buffer the character.
67 state = state.evaluateChar(c)
68 deviceReplyBuilder.append(c)
// END_PATTERN state: the buffer holds a message terminated by RpcMessageUtils.END_PATTERN.
69 if (state === NetconfMessageState.END_PATTERN) {
70 var deviceReply = deviceReplyBuilder.toString()
// A buffer consisting of nothing but the end pattern is reported as DEVICE_UNREGISTERED.
71 if (deviceReply == RpcMessageUtils.END_PATTERN) {
74 sessionListener.notify(NetconfReceivedEvent(
75 NetconfReceivedEvent.Type.DEVICE_UNREGISTERED,
76 deviceInfo = deviceInfo))
// Presumably the else-branch (its opening line is not visible): strip the end pattern,
// dispatch the message, and clear the buffer for the next one.
78 deviceReply = deviceReply.replace(RpcMessageUtils.END_PATTERN, "")
79 receivedMessage(deviceReply)
80 deviceReplyBuilder.setLength(0)
// END_CHUNKED_PATTERN state: the buffer holds a message that uses chunked framing.
82 } else if (state === NetconfMessageState.END_CHUNKED_PATTERN) {
83 var deviceReply = deviceReplyBuilder.toString()
// Badly framed input is logged at debug level and reported as DEVICE_ERROR.
84 if (!NetconfMessageUtils.validateChunkedFraming(deviceReply)) {
85 log.debug("$deviceInfo: Received badly framed message $deviceReply")
87 sessionListener.notify(NetconfReceivedEvent(
88 NetconfReceivedEvent.Type.DEVICE_ERROR,
89 deviceInfo = deviceInfo))
// Presumably the else-branch (its opening line is not visible): remove everything matching
// the message-length and chunked-end patterns, dispatch the rest, and clear the buffer.
// NOTE(review): toRegex() is invoked for every message; if these constants are Strings the
// compiled Regex could be hoisted -- confirm their declared types.
91 deviceReply = deviceReply.replace(RpcMessageUtils.MSGLEN_REGEX_PATTERN.toRegex(), "")
92 deviceReply = deviceReply.replace(NetconfMessageUtils.CHUNKED_END_REGEX_PATTERN.toRegex(), "")
93 receivedMessage(deviceReply)
94 deviceReplyBuilder.setLength(0)
// An I/O failure while reading is logged as a warning and reported as DEVICE_ERROR.
99 } catch (e: IOException) {
100 log.warn("$deviceInfo: Fail while reading from channel", e)
101 sessionListener.notify(NetconfReceivedEvent(
102 NetconfReceivedEvent.Type.DEVICE_ERROR,
103 deviceInfo = deviceInfo))
/**
 * Character-driven state machine used by the read loop to detect the end of a device message.
 *
 * Every constant overrides [evaluateChar] and returns the state to move to for the character
 * it is given. The read loop reacts to the END_PATTERN and END_CHUNKED_PATTERN states.
 *
 * NOTE(review): most constant names and all branch results are missing from this excerpt, so
 * only the tested characters are documented below. The visible tests (']', ']', '>', ']',
 * ']', '>' and '\n', '#', '#', '\n') look consistent with the "]]>]]>" end-of-message
 * delimiter and the "\n##\n" end-of-chunks marker -- confirm against the full file.
 */
108 private enum class NetconfMessageState {
// Initial state: reacts to ']' and to '\n'.
109 NO_MATCHING_PATTERN {
110 override fun evaluateChar(c: Char): NetconfMessageState {
111 return if (c == ']') {
113 } else if (c == '\n') {
// Constant name not visible: tests for ']'.
121 override fun evaluateChar(c: Char): NetconfMessageState {
122 return if (c == ']') {
// Constant name not visible: tests for '>'.
130 override fun evaluateChar(c: Char): NetconfMessageState {
131 return if (c == '>') {
// Constant name not visible: tests for ']'.
139 override fun evaluateChar(c: Char): NetconfMessageState {
140 return if (c == ']') {
// Constant name not visible: tests for ']'.
148 override fun evaluateChar(c: Char): NetconfMessageState {
149 return if (c == ']') {
// Constant name not visible: tests for '>'.
157 override fun evaluateChar(c: Char): NetconfMessageState {
158 return if (c == '>') {
// Constant name not visible: tests for '#', then ']', then '\n'.
166 override fun evaluateChar(c: Char): NetconfMessageState {
167 return if (c == '#') {
169 } else if (c == ']') {
171 } else if (c == '\n') {
// Constant name not visible: tests for '#'.
179 override fun evaluateChar(c: Char): NetconfMessageState {
180 return if (c == '#') {
// Constant name not visible: tests for '\n'.
188 override fun evaluateChar(c: Char): NetconfMessageState {
189 return if (c == '\n') {
// End state for chunked framing: any following character restarts matching.
196 END_CHUNKED_PATTERN {
197 override fun evaluateChar(c: Char): NetconfMessageState {
198 return NO_MATCHING_PATTERN
// Constant name not visible (presumably END_PATTERN -- TODO confirm): any following
// character restarts matching.
202 override fun evaluateChar(c: Char): NetconfMessageState {
203 return NO_MATCHING_PATTERN
// Returns the state to move to after reading character c while in this state.
207 internal abstract fun evaluateChar(c: Char): NetconfMessageState
/**
 * Registers a reply future for [messageId] and writes [request] to the device output stream.
 *
 * If writing throws an [IOException], the failure is logged and the future is completed
 * exceptionally with that exception.
 *
 * @param request text to send; written as UTF-8 and logged in full at info level.
 * @param messageId key under which the reply future is stored in `replies`.
 * @return the declared result is a [CompletableFuture] of the reply text; the return
 *   statement is not visible in this excerpt -- presumably the future created here
 *   (TODO confirm).
 */
210 fun sendMessage(request: String, messageId: String): CompletableFuture<String> {
211 log.info("$deviceInfo: Sending message: \n $request")
212 val future = CompletableFuture<String>()
// Register the future before writing; an existing entry for the same id is replaced.
213 replies.put(messageId, future)
// NOTE(review): a new writer is created for every call, and no flush is visible in this
// excerpt -- confirm the buffered characters are flushed on a line not shown here.
214 val outputStream = OutputStreamWriter(out, StandardCharsets.UTF_8)
217 outputStream.write(request)
219 } catch (e: IOException) {
220 log.error("$deviceInfo: Failed to send message : \n $request", e)
// Propagate the write failure to whoever waits on the future.
221 future.completeExceptionally(e)
228 private fun receivedMessage(deviceReply: String) {
229 if (deviceReply.contains(RpcMessageUtils.RPC_REPLY) || deviceReply.contains(RpcMessageUtils.RPC_ERROR)
230 || deviceReply.contains(RpcMessageUtils.HELLO)) {
231 log.info("$deviceInfo: Received message with messageId: {} \n $deviceReply",
232 NetconfMessageUtils.getMsgId(deviceReply))
235 log.error("$deviceInfo: Invalid message received: \n $deviceReply")
237 sessionListener.notify(NetconfReceivedEvent(
238 NetconfReceivedEvent.Type.DEVICE_REPLY,
240 NetconfMessageUtils.getMsgId(deviceReply),