2 * Copyright © 2017-2019 AT&T, Bell Canada
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
17 package org.onap.ccsdk.cds.blueprintsprocessor.functions.netconf.executor.core
19 import org.onap.ccsdk.cds.blueprintsprocessor.functions.netconf.executor.api.DeviceInfo
20 import org.onap.ccsdk.cds.blueprintsprocessor.functions.netconf.executor.api.NetconfReceivedEvent
21 import org.onap.ccsdk.cds.blueprintsprocessor.functions.netconf.executor.api.NetconfSessionListener
22 import org.onap.ccsdk.cds.blueprintsprocessor.functions.netconf.executor.utils.NetconfMessageUtils
23 import org.onap.ccsdk.cds.blueprintsprocessor.functions.netconf.executor.utils.RpcMessageUtils
24 import org.slf4j.LoggerFactory
25 import java.io.BufferedReader
26 import java.io.IOException
27 import java.io.InputStream
28 import java.io.InputStreamReader
29 import java.io.OutputStream
30 import java.io.OutputStreamWriter
31 import java.nio.charset.StandardCharsets
32 import java.util.concurrent.CompletableFuture
/**
 * [Thread] subclass that holds the streams, listener and pending-reply map used to talk to one device.
 *
 * @param inputStream stream the device output is read from; it is wrapped in a UTF-8 [BufferedReader]
 * @param out stream requests are written to; [sendMessage] wraps it in a UTF-8 [OutputStreamWriter]
 * @param deviceInfo device descriptor; interpolated into log messages and attached to listener events
 * @param sessionListener receives [NetconfReceivedEvent]s (DEVICE_REPLY, DEVICE_ERROR,
 * DEVICE_UNREGISTERED) through its `accept` method
 * @param replies map from message id to the future that [sendMessage] registers for that id
 */
34 class NetconfDeviceCommunicator(private var inputStream: InputStream,
35 private var out: OutputStream,
36 private val deviceInfo: DeviceInfo,
37 private val sessionListener: NetconfSessionListener,
38 private var replies: MutableMap<String, CompletableFuture<String>>) : Thread() {
40 private val log = LoggerFactory.getLogger(NetconfDeviceCommunicator::class.java)
// Current parser state; replaced by the result of evaluateChar for each character read.
41 private var state = NetconfMessageState.NO_MATCHING_PATTERN
// NOTE(review): the header of the enclosing function is not visible in this view; since the class
// extends Thread this is presumably the body of run() — confirm.
48 var bufferReader: BufferedReader? = null
// NOTE(review): a constructor call never yields null, so this loop body executes exactly once.
49 while (bufferReader == null) {
50 bufferReader = BufferedReader(InputStreamReader(inputStream, StandardCharsets.UTF_8))
// Loop flag: the read loop below runs until this becomes true.
54 var socketClosed = false
// Accumulates the characters of the message currently being received.
55 val deviceReplyBuilder = StringBuilder()
56 while (!socketClosed) {
// read() returns the next character as an Int, or -1 once the end of the stream is reached.
57 val cInt = bufferReader.read()
59 log.error("$deviceInfo: Received cInt = -1")
// NOTE(review): `c` is declared on a line not visible here — presumably cInt converted to a Char; confirm.
// Advance the state machine with the character and keep the character in the buffer.
63 state = state.evaluateChar(c)
64 deviceReplyBuilder.append(c)
// The state machine reached END_PATTERN.
65 if (state === NetconfMessageState.END_PATTERN) {
66 var deviceReply = deviceReplyBuilder.toString()
// The buffer contains nothing but the end pattern itself: report the device as unregistered.
67 if (deviceReply == RpcMessageUtils.END_PATTERN) {
70 sessionListener.accept(NetconfReceivedEvent(
71 NetconfReceivedEvent.Type.DEVICE_UNREGISTERED,
72 deviceInfo = deviceInfo))
// Remove the end pattern, dispatch the message, then empty the buffer for the next message.
74 deviceReply = deviceReply.replace(RpcMessageUtils.END_PATTERN, "")
75 receivedMessage(deviceReply)
76 deviceReplyBuilder.setLength(0)
// The state machine reached END_CHUNKED_PATTERN: the framing is validated before the message is used.
78 } else if (state === NetconfMessageState.END_CHUNKED_PATTERN) {
79 var deviceReply = deviceReplyBuilder.toString()
// Framing check failed: log the raw text at debug level and notify the listener of a device error.
80 if (!NetconfMessageUtils.validateChunkedFraming(deviceReply)) {
81 log.debug("$deviceInfo: Received badly framed message $deviceReply")
83 sessionListener.accept(NetconfReceivedEvent(
84 NetconfReceivedEvent.Type.DEVICE_ERROR,
85 deviceInfo = deviceInfo))
// Remove everything matching the message-length and chunked-end patterns, dispatch, then reset the buffer.
// NOTE(review): toRegex() builds a new Regex for every message; the two regexes could be hoisted.
87 deviceReply = deviceReply.replace(RpcMessageUtils.MSGLEN_REGEX_PATTERN.toRegex(), "")
88 deviceReply = deviceReply.replace(NetconfMessageUtils.CHUNKED_END_REGEX_PATTERN.toRegex(), "")
89 receivedMessage(deviceReply)
90 deviceReplyBuilder.setLength(0)
// An I/O failure is logged as a warning and reported to the listener as a device error.
95 } catch (e: IOException) {
96 log.warn("$deviceInfo: Fail while reading from channel", e)
97 sessionListener.accept(NetconfReceivedEvent(
98 NetconfReceivedEvent.Type.DEVICE_ERROR,
99 deviceInfo = deviceInfo))
105 * State machine for the Netconf message parser
107 internal enum class NetconfMessageState {
// Each constant overrides evaluateChar to choose the next state for a single input character.
// NOTE(review): several constant names and the `when` headers are on lines not visible in this view,
// so only the transitions that appear below are described.
// Initial state (the communicator's `state` property starts here).
108 NO_MATCHING_PATTERN {
109 override fun evaluateChar(c: Char): NetconfMessageState {
118 override fun evaluateChar(c: Char): NetconfMessageState {
// ']' moves to SECOND_BRACKET; the else branch falls back to NO_MATCHING_PATTERN.
120 ']' -> SECOND_BRACKET
121 else -> NO_MATCHING_PATTERN
126 override fun evaluateChar(c: Char): NetconfMessageState {
129 else -> NO_MATCHING_PATTERN
134 override fun evaluateChar(c: Char): NetconfMessageState {
137 else -> NO_MATCHING_PATTERN
142 override fun evaluateChar(c: Char): NetconfMessageState {
145 else -> NO_MATCHING_PATTERN
150 override fun evaluateChar(c: Char): NetconfMessageState {
153 else -> NO_MATCHING_PATTERN
158 override fun evaluateChar(c: Char): NetconfMessageState {
163 else -> NO_MATCHING_PATTERN
168 override fun evaluateChar(c: Char): NetconfMessageState {
171 else -> NO_MATCHING_PATTERN
176 override fun evaluateChar(c: Char): NetconfMessageState {
// '\n' in this state moves the machine to END_CHUNKED_PATTERN.
178 '\n' -> END_CHUNKED_PATTERN
179 else -> NO_MATCHING_PATTERN
// After END_CHUNKED_PATTERN, any character restarts matching from NO_MATCHING_PATTERN.
183 END_CHUNKED_PATTERN {
184 override fun evaluateChar(c: Char): NetconfMessageState {
185 return NO_MATCHING_PATTERN
// This constant (name not visible here) likewise returns to NO_MATCHING_PATTERN for any character.
189 override fun evaluateChar(c: Char): NetconfMessageState {
190 return NO_MATCHING_PATTERN
195 * Evaluate next transition state based on current state and the character read
196 * @param c character read in
197 * @return result of lookup of transition to the next [NetconfMessageState]
199 internal abstract fun evaluateChar(c: Char): NetconfMessageState
/**
 * Registers a reply future under [messageId] and writes [request] to the device output stream.
 *
 * @param request message text to send; written through a UTF-8 [OutputStreamWriter]
 * @param messageId key under which the future is stored in `replies`
 * @return a [CompletableFuture] for the reply. NOTE(review): the `return` statement is on a line not
 * visible here; presumably it returns the future registered below — confirm.
 */
202 fun sendMessage(request: String, messageId: String): CompletableFuture<String> {
203 log.info("$deviceInfo: Sending message: \n $request")
204 val future = CompletableFuture<String>()
// Stored before the write; put() replaces any future already stored under the same id.
205 replies.put(messageId, future)
206 val outputStream = OutputStreamWriter(out, StandardCharsets.UTF_8)
209 outputStream.write(request)
// A failed send is logged and surfaced to the caller through the future.
211 } catch (e: IOException) {
212 log.error("$deviceInfo: Failed to send message : \n $request", e)
213 future.completeExceptionally(e)
220 private fun receivedMessage(deviceReply: String) {
221 if (deviceReply.contains(RpcMessageUtils.RPC_REPLY) || deviceReply.contains(RpcMessageUtils.RPC_ERROR)
222 || deviceReply.contains(RpcMessageUtils.HELLO)) {
223 log.info("$deviceInfo: Received message with messageId: {} \n $deviceReply",
224 NetconfMessageUtils.getMsgId(deviceReply))
227 log.error("$deviceInfo: Invalid message received: \n $deviceReply")
229 sessionListener.accept(NetconfReceivedEvent(
230 NetconfReceivedEvent.Type.DEVICE_REPLY,
232 NetconfMessageUtils.getMsgId(deviceReply),