2 * Copyright © 2017-2019 AT&T, Bell Canada
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
17 package org.onap.ccsdk.cds.blueprintsprocessor.functions.netconf.executor.core
19 import org.onap.ccsdk.cds.blueprintsprocessor.functions.netconf.executor.api.DeviceInfo
20 import org.onap.ccsdk.cds.blueprintsprocessor.functions.netconf.executor.api.NetconfReceivedEvent
21 import org.onap.ccsdk.cds.blueprintsprocessor.functions.netconf.executor.api.NetconfSessionListener
22 import org.onap.ccsdk.cds.blueprintsprocessor.functions.netconf.executor.utils.NetconfMessageUtils
23 import org.onap.ccsdk.cds.blueprintsprocessor.functions.netconf.executor.utils.RpcMessageUtils
24 import org.slf4j.LoggerFactory
25 import java.io.BufferedReader
26 import java.io.IOException
27 import java.io.InputStream
28 import java.io.InputStreamReader
29 import java.io.OutputStream
30 import java.io.OutputStreamWriter
31 import java.nio.charset.StandardCharsets
32 import java.util.concurrent.CancellationException
33 import java.util.concurrent.CompletableFuture
34 import java.util.concurrent.ExecutionException
35 import java.util.concurrent.TimeUnit
36 import java.util.concurrent.TimeoutException
38 class NetconfDeviceCommunicator(
39 private var inputStream: InputStream,
40 private var out: OutputStream,
41 private val deviceInfo: DeviceInfo,
42 private val sessionListener: NetconfSessionListener,
43 private var replies: MutableMap<String, CompletableFuture<String>>
46 private val log = LoggerFactory.getLogger(NetconfDeviceCommunicator::class.java)
47 private var state = NetconfMessageState.NO_MATCHING_PATTERN
54 var bufferReader: BufferedReader? = null
55 while (bufferReader == null) {
56 bufferReader = BufferedReader(InputStreamReader(inputStream, StandardCharsets.UTF_8))
60 var socketClosed = false
61 val deviceReplyBuilder = StringBuilder()
62 while (!socketClosed) {
63 val cInt = bufferReader.read()
65 log.debug("$deviceInfo: Received end of stream, closing socket.")
69 state = state.evaluateChar(c)
70 deviceReplyBuilder.append(c)
71 if (state === NetconfMessageState.END_PATTERN) {
72 var deviceReply = deviceReplyBuilder.toString()
73 if (deviceReply == RpcMessageUtils.END_PATTERN) {
76 sessionListener.accept(
78 NetconfReceivedEvent.Type.DEVICE_UNREGISTERED,
79 deviceInfo = deviceInfo
83 deviceReply = deviceReply.replace(RpcMessageUtils.END_PATTERN, "")
84 receivedMessage(deviceReply)
85 deviceReplyBuilder.setLength(0)
87 } else if (state === NetconfMessageState.END_CHUNKED_PATTERN) {
88 var deviceReply = deviceReplyBuilder.toString()
89 if (!NetconfMessageUtils.validateChunkedFraming(deviceReply)) {
90 log.debug("$deviceInfo: Received badly framed message $deviceReply")
92 sessionListener.accept(
94 NetconfReceivedEvent.Type.DEVICE_ERROR,
95 deviceInfo = deviceInfo
99 deviceReply = deviceReply.replace(RpcMessageUtils.MSGLEN_REGEX_PATTERN.toRegex(), "")
100 deviceReply = deviceReply.replace(NetconfMessageUtils.CHUNKED_END_REGEX_PATTERN.toRegex(), "")
101 receivedMessage(deviceReply)
102 deviceReplyBuilder.setLength(0)
106 } catch (e: IOException) {
107 log.warn("$deviceInfo: Fail while reading from channel", e)
108 sessionListener.accept(
109 NetconfReceivedEvent(
110 NetconfReceivedEvent.Type.DEVICE_ERROR,
111 deviceInfo = deviceInfo
118 * State machine for the Netconf message parser
120 internal enum class NetconfMessageState {
122 NO_MATCHING_PATTERN {
123 override fun evaluateChar(c: Char): NetconfMessageState {
132 override fun evaluateChar(c: Char): NetconfMessageState {
134 ']' -> SECOND_BRACKET
135 else -> NO_MATCHING_PATTERN
140 override fun evaluateChar(c: Char): NetconfMessageState {
143 else -> NO_MATCHING_PATTERN
148 override fun evaluateChar(c: Char): NetconfMessageState {
151 else -> NO_MATCHING_PATTERN
156 override fun evaluateChar(c: Char): NetconfMessageState {
159 else -> NO_MATCHING_PATTERN
164 override fun evaluateChar(c: Char): NetconfMessageState {
167 else -> NO_MATCHING_PATTERN
172 override fun evaluateChar(c: Char): NetconfMessageState {
177 else -> NO_MATCHING_PATTERN
182 override fun evaluateChar(c: Char): NetconfMessageState {
185 else -> NO_MATCHING_PATTERN
190 override fun evaluateChar(c: Char): NetconfMessageState {
192 '\n' -> END_CHUNKED_PATTERN
193 else -> NO_MATCHING_PATTERN
197 END_CHUNKED_PATTERN {
198 override fun evaluateChar(c: Char): NetconfMessageState {
199 return NO_MATCHING_PATTERN
203 override fun evaluateChar(c: Char): NetconfMessageState {
204 return NO_MATCHING_PATTERN
209 * Evaluate next transition state based on current state and the character read
210 * @param c character read in
211 * @return result of lookup of transition to the next [NetconfMessageState]
213 internal abstract fun evaluateChar(c: Char): NetconfMessageState
216 fun sendMessage(request: String, messageId: String): CompletableFuture<String> {
217 log.info("$deviceInfo: Sending message with message-id: $messageId: message: \n $request")
218 val future = CompletableFuture<String>()
219 replies.put(messageId, future)
220 val outputStream = OutputStreamWriter(out, StandardCharsets.UTF_8)
223 outputStream.write(request)
225 } catch (e: IOException) {
226 log.error("$deviceInfo: Failed to send message : \n $request", e)
227 future.completeExceptionally(e)
233 private fun receivedMessage(deviceReply: String) {
234 if (deviceReply.contains(RpcMessageUtils.RPC_REPLY) || deviceReply.contains(RpcMessageUtils.RPC_ERROR) ||
235 deviceReply.contains(RpcMessageUtils.HELLO)
238 "$deviceInfo: Received message with messageId: {} \n $deviceReply",
239 NetconfMessageUtils.getMsgId(deviceReply)
242 log.error("$deviceInfo: Invalid message received: \n $deviceReply")
244 sessionListener.accept(
245 NetconfReceivedEvent(
246 NetconfReceivedEvent.Type.DEVICE_REPLY,
248 NetconfMessageUtils.getMsgId(deviceReply),
255 * Gets the value of the [CompletableFuture] from [NetconfDeviceCommunicator.sendMessage]
256 * This function is used by NetconfSessionImpl. Needed to wrap exception testing in NetconfSessionImpl.
257 * @param fut [CompletableFuture] object
258 * @param timeout the maximum time to wait
259 * @param timeUnit the time unit of the timeout argument
260 * @return the result value
261 * @throws CancellationException if this future was cancelled
262 * @throws ExecutionException if this future completed exceptionally
263 * @throws InterruptedException if the current thread was interrupted while waiting
264 * @throws TimeoutException if the wait timed out
266 internal fun getFutureFromSendMessage(
267 fut: CompletableFuture<String>,
271 return fut.get(timeout, timeUnit)