2 * Copyright © 2017-2019 AT&T, Bell Canada
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
17 package org.onap.ccsdk.cds.blueprintsprocessor.functions.netconf.executor.core
19 import org.onap.ccsdk.cds.blueprintsprocessor.functions.netconf.executor.api.DeviceInfo
20 import org.onap.ccsdk.cds.blueprintsprocessor.functions.netconf.executor.api.NetconfReceivedEvent
21 import org.onap.ccsdk.cds.blueprintsprocessor.functions.netconf.executor.api.NetconfSessionListener
22 import org.onap.ccsdk.cds.blueprintsprocessor.functions.netconf.executor.utils.NetconfMessageUtils
23 import org.onap.ccsdk.cds.blueprintsprocessor.functions.netconf.executor.utils.RpcMessageUtils
24 import org.slf4j.LoggerFactory
26 import java.nio.charset.*
27 import java.util.concurrent.*
// Thread subclass that reads from and writes to the streams of one device session.
// Constructor arguments, as used by the code visible in this file:
//   inputStream     - wrapped in a UTF-8 BufferedReader and consumed via read()
//   out             - wrapped in a UTF-8 OutputStreamWriter by sendMessage
//   deviceInfo      - interpolated into log messages and attached to NetconfReceivedEvent instances
//   sessionListener - receives NetconfReceivedEvent notifications
//                     (DEVICE_REPLY, DEVICE_ERROR, DEVICE_UNREGISTERED)
//   replies         - sendMessage stores a CompletableFuture here under each message-id
// NOTE(review): the map is only written to in this file; presumably the futures are
// completed by the session listener side - confirm against NetconfSessionListener.
29 class NetconfDeviceCommunicator(private var inputStream: InputStream,
30 private var out: OutputStream,
31 private val deviceInfo: DeviceInfo,
32 private val sessionListener: NetconfSessionListener,
33 private var replies: MutableMap<String, CompletableFuture<String>>) : Thread() {
35 private val log = LoggerFactory.getLogger(NetconfDeviceCommunicator::class.java)
// Current state of the message-framing state machine; initial value is NO_MATCHING_PATTERN.
36 private var state = NetconfMessageState.NO_MATCHING_PATTERN
// Reader loop: pulls data from the device stream, feeds each character to the
// NetconfMessageState state machine, accumulates it in a buffer and dispatches the
// buffer once an end-of-message state is reached.
// NOTE(review): the enclosing function header (presumably Thread.run), the `try`
// opener, the declaration of `c`, several branch separators and all closing braces
// are absent from this view. The comments below describe only the lines that are present.
43 var bufferReader: BufferedReader? = null
// The reader is assigned on the first pass, so this loop body executes exactly once.
44 while (bufferReader == null) {
45 bufferReader = BufferedReader(InputStreamReader(inputStream, StandardCharsets.UTF_8))
49 var socketClosed = false
// Holds the characters of the message currently being received.
50 val deviceReplyBuilder = StringBuilder()
51 while (!socketClosed) {
52 val cInt = bufferReader.read()
// NOTE(review): the condition guarding this log line is not visible; presumably it
// is the end-of-stream check on cInt - confirm.
54 log.debug("$deviceInfo: Received end of stream, closing socket.")
// NOTE(review): `c` is not declared in the visible lines; presumably it is cInt
// converted to a Char - confirm.
58 state = state.evaluateChar(c)
59 deviceReplyBuilder.append(c)
// End-of-message marker matched: the buffer holds one complete message plus the marker.
60 if (state === NetconfMessageState.END_PATTERN) {
61 var deviceReply = deviceReplyBuilder.toString()
// The buffer contains nothing but the marker itself: report the device as unregistered.
62 if (deviceReply == RpcMessageUtils.END_PATTERN) {
65 sessionListener.accept(NetconfReceivedEvent(
66 NetconfReceivedEvent.Type.DEVICE_UNREGISTERED,
67 deviceInfo = deviceInfo))
// Strip the marker, hand the payload to receivedMessage and reset the buffer.
// NOTE(review): presumably this is the `else` branch of the check above; the
// separator line is not visible.
69 deviceReply = deviceReply.replace(RpcMessageUtils.END_PATTERN, "")
70 receivedMessage(deviceReply)
71 deviceReplyBuilder.setLength(0)
// Chunked-framing end marker matched.
73 } else if (state === NetconfMessageState.END_CHUNKED_PATTERN) {
74 var deviceReply = deviceReplyBuilder.toString()
// Framing validation failed: log the raw message and report a device error.
75 if (!NetconfMessageUtils.validateChunkedFraming(deviceReply)) {
76 log.debug("$deviceInfo: Received badly framed message $deviceReply")
78 sessionListener.accept(NetconfReceivedEvent(
79 NetconfReceivedEvent.Type.DEVICE_ERROR,
80 deviceInfo = deviceInfo))
// Remove everything matching the length-prefix and chunk-end patterns, then dispatch
// the remaining payload and reset the buffer.
// NOTE(review): presumably the `else` branch of the validation check; separator not visible.
// NOTE(review): toRegex() builds a new Regex for every chunked message; the two
// Regex instances could be created once and reused.
82 deviceReply = deviceReply.replace(RpcMessageUtils.MSGLEN_REGEX_PATTERN.toRegex(), "")
83 deviceReply = deviceReply.replace(NetconfMessageUtils.CHUNKED_END_REGEX_PATTERN.toRegex(), "")
84 receivedMessage(deviceReply)
85 deviceReplyBuilder.setLength(0)
// A read failure is logged and surfaced to the listener as a device error.
90 } catch (e: IOException) {
91 log.warn("$deviceInfo: Fail while reading from channel", e)
92 sessionListener.accept(NetconfReceivedEvent(
93 NetconfReceivedEvent.Type.DEVICE_ERROR,
94 deviceInfo = deviceInfo))
100 * State machine for the Netconf message parser
// Each enum constant overrides evaluateChar to return the next state for one input
// character. In every transition visible here, a character that does not continue
// the current marker falls back to NO_MATCHING_PATTERN.
// NOTE(review): most constant names and `when` arms are missing from this view, so
// the full transition table cannot be documented from here.
102 internal enum class NetconfMessageState {
// Start state; also the fallback target of every other state.
103 NO_MATCHING_PATTERN {
104 override fun evaluateChar(c: Char): NetconfMessageState {
113 override fun evaluateChar(c: Char): NetconfMessageState {
115 ']' -> SECOND_BRACKET
116 else -> NO_MATCHING_PATTERN
121 override fun evaluateChar(c: Char): NetconfMessageState {
124 else -> NO_MATCHING_PATTERN
129 override fun evaluateChar(c: Char): NetconfMessageState {
132 else -> NO_MATCHING_PATTERN
137 override fun evaluateChar(c: Char): NetconfMessageState {
140 else -> NO_MATCHING_PATTERN
145 override fun evaluateChar(c: Char): NetconfMessageState {
148 else -> NO_MATCHING_PATTERN
153 override fun evaluateChar(c: Char): NetconfMessageState {
158 else -> NO_MATCHING_PATTERN
163 override fun evaluateChar(c: Char): NetconfMessageState {
166 else -> NO_MATCHING_PATTERN
171 override fun evaluateChar(c: Char): NetconfMessageState {
// A line feed in this state completes the chunked-framing end marker.
173 '\n' -> END_CHUNKED_PATTERN
174 else -> NO_MATCHING_PATTERN
// End state for chunked framing: whatever character follows, matching starts over.
178 END_CHUNKED_PATTERN {
179 override fun evaluateChar(c: Char): NetconfMessageState {
180 return NO_MATCHING_PATTERN
// NOTE(review): the constant owning this override is not visible; presumably
// END_PATTERN, which the reader loop compares against - confirm. It also restarts matching.
184 override fun evaluateChar(c: Char): NetconfMessageState {
185 return NO_MATCHING_PATTERN
190 * Evaluate next transition state based on current state and the character read
191 * @param c character read in
192 * @return result of lookup of transition to the next {@link NetconfMessageState}
194 internal abstract fun evaluateChar(c: Char): NetconfMessageState
// Sends one request to the device.
//   request   - message text written to the device output stream, encoded as UTF-8
//   messageId - key under which the pending future is stored in `replies`
// A new CompletableFuture is stored in `replies` before the request is written.
// If the write throws IOException, the failure is logged and the future is completed
// exceptionally with that exception.
// NOTE(review): the `try` opener, any flush call and the return statement are not
// visible here; presumably the function returns `future` - confirm.
197 fun sendMessage(request: String, messageId: String): CompletableFuture<String> {
198 log.info("$deviceInfo: Sending message with message-id: $messageId: message: \n $request")
199 val future = CompletableFuture<String>()
200 replies.put(messageId, future)
// A new writer is wrapped around `out` on every call.
201 val outputStream = OutputStreamWriter(out, StandardCharsets.UTF_8)
204 outputStream.write(request)
206 } catch (e: IOException) {
207 log.error("$deviceInfo: Failed to send message : \n $request", e)
208 future.completeExceptionally(e)
// Handles one complete message whose framing has already been stripped by the reader loop.
//   deviceReply - message text received from the device
// A message containing the RPC_REPLY, RPC_ERROR or HELLO marker is logged at info
// level together with the id extracted by NetconfMessageUtils.getMsgId. The listener
// is notified with a DEVICE_REPLY event that carries the extracted message id.
// NOTE(review): the branch structure is not visible here. Presumably the error log
// is the `else` of the marker check and the listener is notified only for accepted
// messages - confirm. The remaining arguments of the event constructor are also not visible.
215 private fun receivedMessage(deviceReply: String) {
216 if (deviceReply.contains(RpcMessageUtils.RPC_REPLY) || deviceReply.contains(RpcMessageUtils.RPC_ERROR)
217 || deviceReply.contains(RpcMessageUtils.HELLO)) {
218 log.info("$deviceInfo: Received message with messageId: {} \n $deviceReply",
219 NetconfMessageUtils.getMsgId(deviceReply))
222 log.error("$deviceInfo: Invalid message received: \n $deviceReply")
224 sessionListener.accept(NetconfReceivedEvent(
225 NetconfReceivedEvent.Type.DEVICE_REPLY,
227 NetconfMessageUtils.getMsgId(deviceReply),
232 * Gets the value of the {@link CompletableFuture} from {@link NetconfDeviceCommunicator#sendMessage}
233 * This function is used by NetconfSessionImpl. Needed to wrap exception testing in NetconfSessionImpl.
234 * @param fut {@link CompletableFuture} object
235 * @param timeout the maximum time to wait
236 * @param timeUnit the time unit of the timeout argument
237 * @return the result value
238 * @throws CancellationException if this future was cancelled
239 * @throws ExecutionException if this future completed exceptionally
240 * @throws InterruptedException if the current thread was interrupted while waiting
241 * @throws TimeoutException if the wait timed out
// Thin wrapper around CompletableFuture.get(timeout, unit): waits up to the given
// timeout for the future to complete and returns its value; the exceptions thrown by
// get() are not caught here.
243 internal fun getFutureFromSendMessage(
244 fut: CompletableFuture<String>, timeout: Long, timeUnit: TimeUnit): String {
245 return fut.get(timeout, timeUnit)