06a71cad3235279124272ac8561d48e54df70efd
[ccsdk/cds.git] /
1 /*
2  * Copyright © 2017-2019 AT&T, Bell Canada
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *     http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 package org.onap.ccsdk.cds.blueprintsprocessor.functions.netconf.executor.core
18
19 import org.onap.ccsdk.cds.blueprintsprocessor.functions.netconf.executor.api.DeviceInfo
20 import org.onap.ccsdk.cds.blueprintsprocessor.functions.netconf.executor.api.NetconfReceivedEvent
21 import org.onap.ccsdk.cds.blueprintsprocessor.functions.netconf.executor.api.NetconfSessionListener
22 import org.onap.ccsdk.cds.blueprintsprocessor.functions.netconf.executor.utils.NetconfMessageUtils
23 import org.onap.ccsdk.cds.blueprintsprocessor.functions.netconf.executor.utils.RpcMessageUtils
24 import org.slf4j.LoggerFactory
25 import java.io.*
26 import java.nio.charset.*
27 import java.util.concurrent.*
28
29 class NetconfDeviceCommunicator(private var inputStream: InputStream,
30                                 private var out: OutputStream,
31                                 private val deviceInfo: DeviceInfo,
32                                 private val sessionListener: NetconfSessionListener,
33                                 private var replies: MutableMap<String, CompletableFuture<String>>) : Thread() {
34
35     private val log = LoggerFactory.getLogger(NetconfDeviceCommunicator::class.java)
36     private var state = NetconfMessageState.NO_MATCHING_PATTERN
37
38     init {
39         start()
40     }
41
42     override fun run() {
43         var bufferReader: BufferedReader? = null
44         while (bufferReader == null) {
45             bufferReader = BufferedReader(InputStreamReader(inputStream, StandardCharsets.UTF_8))
46         }
47
48         try {
49             var socketClosed = false
50             val deviceReplyBuilder = StringBuilder()
51             while (!socketClosed) {
52                 val cInt = bufferReader.read()
53                 if (cInt == -1) {
54                     log.debug("$deviceInfo: Received end of stream, closing socket.")
55                     socketClosed = true
56                 }
57                 val c = cInt.toChar()
58                 state = state.evaluateChar(c)
59                 deviceReplyBuilder.append(c)
60                 if (state === NetconfMessageState.END_PATTERN) {
61                     var deviceReply = deviceReplyBuilder.toString()
62                     if (deviceReply == RpcMessageUtils.END_PATTERN) {
63                         socketClosed = true
64                         bufferReader.close()
65                         sessionListener.accept(NetconfReceivedEvent(
66                             NetconfReceivedEvent.Type.DEVICE_UNREGISTERED,
67                             deviceInfo = deviceInfo))
68                     } else {
69                         deviceReply = deviceReply.replace(RpcMessageUtils.END_PATTERN, "")
70                         receivedMessage(deviceReply)
71                         deviceReplyBuilder.setLength(0)
72                     }
73                 } else if (state === NetconfMessageState.END_CHUNKED_PATTERN) {
74                     var deviceReply = deviceReplyBuilder.toString()
75                     if (!NetconfMessageUtils.validateChunkedFraming(deviceReply)) {
76                         log.debug("$deviceInfo: Received badly framed message $deviceReply")
77                         socketClosed = true
78                         sessionListener.accept(NetconfReceivedEvent(
79                             NetconfReceivedEvent.Type.DEVICE_ERROR,
80                             deviceInfo = deviceInfo))
81                     } else {
82                         deviceReply = deviceReply.replace(RpcMessageUtils.MSGLEN_REGEX_PATTERN.toRegex(), "")
83                         deviceReply = deviceReply.replace(NetconfMessageUtils.CHUNKED_END_REGEX_PATTERN.toRegex(), "")
84                         receivedMessage(deviceReply)
85                         deviceReplyBuilder.setLength(0)
86                     }
87                 }
88             }
89
90         } catch (e: IOException) {
91             log.warn("$deviceInfo: Fail while reading from channel", e)
92             sessionListener.accept(NetconfReceivedEvent(
93                 NetconfReceivedEvent.Type.DEVICE_ERROR,
94                 deviceInfo = deviceInfo))
95         }
96
97     }
98
99     /**
100      * State machine for the Netconf message parser
101      */
102     internal enum class NetconfMessageState {
103         NO_MATCHING_PATTERN {
104             override fun evaluateChar(c: Char): NetconfMessageState {
105                 return when (c) {
106                     ']' -> FIRST_BRACKET
107                     '\n' -> FIRST_LF
108                     else -> this
109                 }
110             }
111         },
112         FIRST_BRACKET {
113             override fun evaluateChar(c: Char): NetconfMessageState {
114                 return when (c) {
115                     ']' -> SECOND_BRACKET
116                     else -> NO_MATCHING_PATTERN
117                 }
118             }
119         },
120         SECOND_BRACKET {
121             override fun evaluateChar(c: Char): NetconfMessageState {
122                 return when (c) {
123                     '>' -> FIRST_BIGGER
124                     else -> NO_MATCHING_PATTERN
125                 }
126             }
127         },
128         FIRST_BIGGER {
129             override fun evaluateChar(c: Char): NetconfMessageState {
130                 return when (c) {
131                     ']' -> THIRD_BRACKET
132                     else -> NO_MATCHING_PATTERN
133                 }
134             }
135         },
136         THIRD_BRACKET {
137             override fun evaluateChar(c: Char): NetconfMessageState {
138                 return when (c) {
139                     ']' -> ENDING_BIGGER
140                     else -> NO_MATCHING_PATTERN
141                 }
142             }
143         },
144         ENDING_BIGGER {
145             override fun evaluateChar(c: Char): NetconfMessageState {
146                 return when (c) {
147                     '>' -> END_PATTERN
148                     else -> NO_MATCHING_PATTERN
149                 }
150             }
151         },
152         FIRST_LF {
153             override fun evaluateChar(c: Char): NetconfMessageState {
154                 return when (c) {
155                     '#' -> FIRST_HASH
156                     ']' -> FIRST_BRACKET
157                     '\n' -> this
158                     else -> NO_MATCHING_PATTERN
159                 }
160             }
161         },
162         FIRST_HASH {
163             override fun evaluateChar(c: Char): NetconfMessageState {
164                 return when (c) {
165                     '#' -> SECOND_HASH
166                     else -> NO_MATCHING_PATTERN
167                 }
168             }
169         },
170         SECOND_HASH {
171             override fun evaluateChar(c: Char): NetconfMessageState {
172                 return when (c) {
173                     '\n' -> END_CHUNKED_PATTERN
174                     else -> NO_MATCHING_PATTERN
175                 }
176             }
177         },
178         END_CHUNKED_PATTERN {
179             override fun evaluateChar(c: Char): NetconfMessageState {
180                 return NO_MATCHING_PATTERN
181             }
182         },
183         END_PATTERN {
184             override fun evaluateChar(c: Char): NetconfMessageState {
185                 return NO_MATCHING_PATTERN
186             }
187         };
188
189         /**
190          * Evaluate next transition state based on current state and the character read
191          * @param c character read in
192          * @return result of lookup of transition to the next {@link NetconfMessageState}
193          */
194         internal abstract fun evaluateChar(c: Char): NetconfMessageState
195     }
196
197     fun sendMessage(request: String, messageId: String): CompletableFuture<String> {
198         log.info("$deviceInfo: Sending message with message-id: $messageId: message: \n $request")
199         val future = CompletableFuture<String>()
200         replies.put(messageId, future)
201         val outputStream = OutputStreamWriter(out, StandardCharsets.UTF_8)
202         synchronized(this) {
203             try {
204                 outputStream.write(request)
205                 outputStream.flush()
206             } catch (e: IOException) {
207                 log.error("$deviceInfo: Failed to send message : \n $request", e)
208                 future.completeExceptionally(e)
209             }
210
211         }
212         return future
213     }
214
215     private fun receivedMessage(deviceReply: String) {
216         if (deviceReply.contains(RpcMessageUtils.RPC_REPLY) || deviceReply.contains(RpcMessageUtils.RPC_ERROR)
217             || deviceReply.contains(RpcMessageUtils.HELLO)) {
218             log.info("$deviceInfo: Received message with messageId: {}  \n $deviceReply",
219                 NetconfMessageUtils.getMsgId(deviceReply))
220
221         } else {
222             log.error("$deviceInfo: Invalid message received: \n $deviceReply")
223         }
224         sessionListener.accept(NetconfReceivedEvent(
225             NetconfReceivedEvent.Type.DEVICE_REPLY,
226             deviceReply,
227             NetconfMessageUtils.getMsgId(deviceReply),
228             deviceInfo))
229     }
230
231     /**
232      * Gets the value of the {@link CompletableFuture} from {@link NetconfDeviceCommunicator#sendMessage}
233      * This function is used by NetconfSessionImpl. Needed to wrap exception testing in NetconfSessionImpl.
234      * @param fut {@link CompletableFuture} object
235      * @param timeout the maximum time to wait
236      * @param timeUnit the time unit of the timeout argument
237      * @return the result value
238      * @throws CancellationException if this future was cancelled
239      * @throws ExecutionException if this future completed exceptionally
240      * @throws InterruptedException if the current thread was interrupted while waiting
241      * @throws TimeoutException if the wait timed outStream
242      */
243     internal fun getFutureFromSendMessage(
244         fut: CompletableFuture<String>, timeout: Long, timeUnit: TimeUnit): String {
245         return fut.get(timeout, timeUnit)
246     }
247 }