2 * Copyright © 2017-2019 AT&T, Bell Canada
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
17 package org.onap.ccsdk.cds.blueprintsprocessor.functions.netconf.executor.core
19 import org.onap.ccsdk.cds.blueprintsprocessor.functions.netconf.executor.api.DeviceInfo
20 import org.onap.ccsdk.cds.blueprintsprocessor.functions.netconf.executor.api.NetconfReceivedEvent
21 import org.onap.ccsdk.cds.blueprintsprocessor.functions.netconf.executor.api.NetconfSessionListener
22 import org.onap.ccsdk.cds.blueprintsprocessor.functions.netconf.executor.utils.NetconfMessageUtils
23 import org.onap.ccsdk.cds.blueprintsprocessor.functions.netconf.executor.utils.RpcMessageUtils
24 import org.slf4j.LoggerFactory
25 import java.io.BufferedReader
26 import java.io.IOException
27 import java.io.InputStream
28 import java.io.InputStreamReader
29 import java.io.OutputStream
30 import java.io.OutputStreamWriter
31 import java.nio.charset.StandardCharsets
32 import java.util.concurrent.CompletableFuture
33 import java.util.concurrent.TimeUnit
/**
 * Thread subclass that exchanges NETCONF messages with a single device.
 *
 * Characters read from [inputStream] are fed one at a time through [NetconfMessageState]
 * to detect the end of a message; complete messages, errors and disconnects are reported
 * to [sessionListener] as [NetconfReceivedEvent]s. Requests are written to [out] by
 * `sendMessage`.
 *
 * @param inputStream stream the device output is read from, decoded as UTF-8
 * @param out stream requests are written to, encoded as UTF-8
 * @param deviceInfo device descriptor; interpolated into every log line and attached to emitted events
 * @param sessionListener callback that receives every [NetconfReceivedEvent] produced by this class
 * @param replies map in which `sendMessage` stores the pending future of a request under its message id
 */
35 class NetconfDeviceCommunicator(private var inputStream: InputStream,
36 private var out: OutputStream,
37 private val deviceInfo: DeviceInfo,
38 private val sessionListener: NetconfSessionListener,
39 private var replies: MutableMap<String, CompletableFuture<String>>) : Thread() {
// NOTE(review): inputStream, out and replies are declared `var`, but no reassignment is
// visible in this excerpt — they look like candidates for `val`; confirm before changing.
41 private val log = LoggerFactory.getLogger(NetconfDeviceCommunicator::class.java)
// Current state of the end-of-message detector; replaced after every character read.
42 private var state = NetconfMessageState.NO_MATCHING_PATTERN
// Reader loop: pulls characters from inputStream, accumulates them into a buffer, and
// dispatches the buffer whenever the state machine reports a message terminator.
// NOTE(review): the enclosing function header is not part of this excerpt (presumably the
// Thread.run() override — confirm), and several structural lines (opening `try`, `else`
// branches, closing braces, the end-of-stream test) are missing as well. The comments
// below describe only what the visible statements do.

// NOTE(review): a constructor call never yields null, so this loop body runs exactly once;
// a plain `val` initialised with the BufferedReader would be equivalent.
49 var bufferReader: BufferedReader? = null
50 while (bufferReader == null) {
51 bufferReader = BufferedReader(InputStreamReader(inputStream, StandardCharsets.UTF_8))
// Loop flag and the accumulator for the characters of the message currently being read.
55 var socketClosed = false
56 val deviceReplyBuilder = StringBuilder()
57 while (!socketClosed) {
// BufferedReader.read() returns the next character as an Int, or -1 at end of stream.
58 val cInt = bufferReader.read()
// NOTE(review): the condition guarding this log line and whatever closes the socket are
// not visible here — presumably a `cInt == -1` check; confirm.
60 log.debug("$deviceInfo: Received end of stream, closing socket.")
// NOTE(review): `c` is not defined in the visible lines — presumably cInt converted to Char.
// Advance the terminator detector and keep the character in the buffer.
64 state = state.evaluateChar(c)
65 deviceReplyBuilder.append(c)
// The END_PATTERN state was reached: the buffer ends with RpcMessageUtils.END_PATTERN.
66 if (state === NetconfMessageState.END_PATTERN) {
67 var deviceReply = deviceReplyBuilder.toString()
// A buffer that is nothing but the terminator carries no payload; the listener is told
// the device has unregistered.
68 if (deviceReply == RpcMessageUtils.END_PATTERN) {
71 sessionListener.accept(NetconfReceivedEvent(
72 NetconfReceivedEvent.Type.DEVICE_UNREGISTERED,
73 deviceInfo = deviceInfo))
// Strip the terminator, hand the payload to receivedMessage, and empty the buffer for
// the next message (presumably the `else` path of the check above — confirm).
75 deviceReply = deviceReply.replace(RpcMessageUtils.END_PATTERN, "")
76 receivedMessage(deviceReply)
77 deviceReplyBuilder.setLength(0)
// The END_CHUNKED_PATTERN state was reached: the buffer holds a chunk-framed message.
79 } else if (state === NetconfMessageState.END_CHUNKED_PATTERN) {
80 var deviceReply = deviceReplyBuilder.toString()
// Framing that fails validation is logged and reported to the listener as DEVICE_ERROR.
81 if (!NetconfMessageUtils.validateChunkedFraming(deviceReply)) {
82 log.debug("$deviceInfo: Received badly framed message $deviceReply")
84 sessionListener.accept(NetconfReceivedEvent(
85 NetconfReceivedEvent.Type.DEVICE_ERROR,
86 deviceInfo = deviceInfo))
// Remove everything matching the message-length and chunk-end patterns, dispatch the
// remaining payload, and empty the buffer.
// NOTE(review): toRegex() is invoked for both patterns on every message; the two Regex
// instances could be created once and reused.
88 deviceReply = deviceReply.replace(RpcMessageUtils.MSGLEN_REGEX_PATTERN.toRegex(), "")
89 deviceReply = deviceReply.replace(NetconfMessageUtils.CHUNKED_END_REGEX_PATTERN.toRegex(), "")
90 receivedMessage(deviceReply)
91 deviceReplyBuilder.setLength(0)
// A read failure is logged with its stack trace and reported to the listener as DEVICE_ERROR.
96 } catch (e: IOException) {
97 log.warn("$deviceInfo: Fail while reading from channel", e)
98 sessionListener.accept(NetconfReceivedEvent(
99 NetconfReceivedEvent.Type.DEVICE_ERROR,
100 deviceInfo = deviceInfo))
106 * State machine for the Netconf message parser
// Each constant overrides evaluateChar to return the state reached after consuming one
// character. Every visible non-terminal branch falls back to NO_MATCHING_PATTERN when the
// character does not continue the terminator being matched.
// NOTE(review): most constant headers and match arms are missing from this excerpt; only
// the transitions annotated below can be confirmed from the visible lines.
108 internal enum class NetconfMessageState {
// Initial state, and the state the reader starts in.
109 NO_MATCHING_PATTERN {
110 override fun evaluateChar(c: Char): NetconfMessageState {
119 override fun evaluateChar(c: Char): NetconfMessageState {
// A ']' in this state moves to SECOND_BRACKET; anything else restarts matching.
121 ']' -> SECOND_BRACKET
122 else -> NO_MATCHING_PATTERN
127 override fun evaluateChar(c: Char): NetconfMessageState {
130 else -> NO_MATCHING_PATTERN
135 override fun evaluateChar(c: Char): NetconfMessageState {
138 else -> NO_MATCHING_PATTERN
143 override fun evaluateChar(c: Char): NetconfMessageState {
146 else -> NO_MATCHING_PATTERN
151 override fun evaluateChar(c: Char): NetconfMessageState {
154 else -> NO_MATCHING_PATTERN
159 override fun evaluateChar(c: Char): NetconfMessageState {
164 else -> NO_MATCHING_PATTERN
169 override fun evaluateChar(c: Char): NetconfMessageState {
172 else -> NO_MATCHING_PATTERN
177 override fun evaluateChar(c: Char): NetconfMessageState {
// A line feed in this state completes the chunked terminator.
179 '\n' -> END_CHUNKED_PATTERN
180 else -> NO_MATCHING_PATTERN
// Terminal state for chunk-framed messages: the next character always restarts matching.
184 END_CHUNKED_PATTERN {
185 override fun evaluateChar(c: Char): NetconfMessageState {
186 return NO_MATCHING_PATTERN
// Second unconditional reset — presumably the END_PATTERN constant, whose header is not
// visible in this excerpt; confirm.
190 override fun evaluateChar(c: Char): NetconfMessageState {
191 return NO_MATCHING_PATTERN
196 * Evaluate next transition state based on current state and the character read
197 * @param c character read in
198 * @return result of lookup of transition to the next {@link NetconfMessageState}
// Implemented by every constant above.
200 internal abstract fun evaluateChar(c: Char): NetconfMessageState
/**
 * Writes [request] to the device and registers a future for its reply.
 *
 * A new [CompletableFuture] is stored in `replies` under [messageId] before the request
 * is written to `out` as UTF-8 text. If the write throws an [IOException], the failure is
 * logged and the future is completed exceptionally with that exception.
 *
 * NOTE(review): the lines between the writer creation and the write, and everything after
 * the catch block (including the return statement), are not visible in this excerpt.
 *
 * @param request message text to send
 * @param messageId key under which the pending future is stored in `replies`
 * @return presumably the future registered for [messageId] — confirm against the full file
 */
203 fun sendMessage(request: String, messageId: String): CompletableFuture<String> {
204 log.info("$deviceInfo: Sending message: \n $request")
205 val future = CompletableFuture<String>()
// Registered before writing, so the entry exists by the time any reply can arrive.
206 replies.put(messageId, future)
// NOTE(review): despite its name this local is a Writer, and a new one is created per call.
207 val outputStream = OutputStreamWriter(out, StandardCharsets.UTF_8)
210 outputStream.write(request)
212 } catch (e: IOException) {
213 log.error("$deviceInfo: Failed to send message : \n $request", e)
// NOTE(review): no removal of the messageId entry from `replies` is visible on this
// failure path — confirm whether the stale entry is cleaned up elsewhere.
214 future.completeExceptionally(e)
/**
 * Logs a complete device message and forwards it to the session listener.
 *
 * A message containing the RPC_REPLY, RPC_ERROR or HELLO marker from [RpcMessageUtils] is
 * logged at info level together with its message id; the other visible log statement
 * reports an invalid message at error level. A DEVICE_REPLY event is then sent to the listener.
 *
 * NOTE(review): the `else` and closing braces are missing from this excerpt, so it cannot
 * be confirmed here whether the event is sent for invalid messages too; it appears to be.
 *
 * @param deviceReply message text with the framing already removed by the caller
 */
221 private fun receivedMessage(deviceReply: String) {
222 if (deviceReply.contains(RpcMessageUtils.RPC_REPLY) || deviceReply.contains(RpcMessageUtils.RPC_ERROR)
223 || deviceReply.contains(RpcMessageUtils.HELLO)) {
224 log.info("$deviceInfo: Received message with messageId: {} \n $deviceReply",
225 NetconfMessageUtils.getMsgId(deviceReply))
228 log.error("$deviceInfo: Invalid message received: \n $deviceReply")
// NOTE(review): getMsgId(deviceReply) is evaluated a second time here; the value could be
// computed once and shared with the log statement above. Some constructor arguments of
// the event are not visible in this excerpt.
230 sessionListener.accept(NetconfReceivedEvent(
231 NetconfReceivedEvent.Type.DEVICE_REPLY,
233 NetconfMessageUtils.getMsgId(deviceReply),
/**
 * Gets the value of the [CompletableFuture] returned by [NetconfDeviceCommunicator.sendMessage],
 * waiting at most the given time for it to complete.
 *
 * This function is used by NetconfSessionImpl. Needed to wrap exception testing in NetconfSessionImpl.
 *
 * @param fut [CompletableFuture] object
 * @param timeout the maximum time to wait
 * @param timeUnit the time unit of the timeout argument
 * @return the result value
 * @throws CancellationException if this future was cancelled
 * @throws ExecutionException if this future completed exceptionally
 * @throws InterruptedException if the current thread was interrupted while waiting
 * @throws TimeoutException if the wait timed out
 */
internal fun getFutureFromSendMessage(
    fut: CompletableFuture<String>,
    timeout: Long,
    timeUnit: TimeUnit
): String = fut.get(timeout, timeUnit) // plain delegation; every exception from get() propagates unchanged