8f8fc1463d55fbb2248a5ea263c5adb1786f4d3c
[ccsdk/cds.git] /
1 /*
2  * Copyright © 2017-2019 AT&T, Bell Canada
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *     http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 package org.onap.ccsdk.cds.blueprintsprocessor.functions.netconf.executor.core
18
19 import org.onap.ccsdk.cds.blueprintsprocessor.functions.netconf.executor.api.DeviceInfo
20 import org.onap.ccsdk.cds.blueprintsprocessor.functions.netconf.executor.api.NetconfReceivedEvent
21 import org.onap.ccsdk.cds.blueprintsprocessor.functions.netconf.executor.api.NetconfSessionListener
22 import org.onap.ccsdk.cds.blueprintsprocessor.functions.netconf.executor.utils.NetconfMessageUtils
23 import org.onap.ccsdk.cds.blueprintsprocessor.functions.netconf.executor.utils.RpcMessageUtils
24 import org.slf4j.LoggerFactory
25 import java.io.BufferedReader
26 import java.io.IOException
27 import java.io.InputStream
28 import java.io.InputStreamReader
29 import java.io.OutputStream
30 import java.io.OutputStreamWriter
31 import java.nio.charset.StandardCharsets
32 import java.util.concurrent.CancellationException
33 import java.util.concurrent.CompletableFuture
34 import java.util.concurrent.ExecutionException
35 import java.util.concurrent.TimeUnit
36 import java.util.concurrent.TimeoutException
37
/**
 * Reader thread and message writer for a single NETCONF device connection.
 *
 * The thread is started from the `init` block, so it begins consuming [inputStream] as soon as the
 * object has been constructed. Incoming characters are fed one by one through [NetconfMessageState];
 * when the state machine reports the end of a message (either the `]]>]]>` delimiter or the `\n##\n`
 * chunked terminator) the accumulated text is stripped of its framing and handed to [sessionListener]
 * as a [NetconfReceivedEvent].
 *
 * Outgoing requests are written with [sendMessage], which also registers the returned future in
 * [replies] under the request's message-id. This class never completes those futures on success;
 * it only completes them exceptionally when the write itself fails.
 *
 * @param inputStream stream the device replies are read from
 * @param out stream the requests are written to
 * @param deviceInfo device this communicator talks to; used in log lines and in emitted events
 * @param sessionListener receiver of every [NetconfReceivedEvent] produced by this communicator
 * @param replies map of message-id to pending reply future, populated by [sendMessage]
 */
class NetconfDeviceCommunicator(
    private val inputStream: InputStream,
    private val out: OutputStream,
    private val deviceInfo: DeviceInfo,
    private val sessionListener: NetconfSessionListener,
    private val replies: MutableMap<String, CompletableFuture<String>>
) : Thread() {

    private val log = LoggerFactory.getLogger(NetconfDeviceCommunicator::class.java)
    private var state = NetconfMessageState.NO_MATCHING_PATTERN

    // Compiled on first use and then reused for every chunked message.
    // NOTE: these must stay ABOVE the init block: init starts the reader thread, and only
    // properties initialised before start() are guaranteed to be visible to that thread.
    private val msgLenRegex by lazy { RpcMessageUtils.MSGLEN_REGEX_PATTERN.toRegex() }
    private val chunkedEndRegex by lazy { NetconfMessageUtils.CHUNKED_END_REGEX_PATTERN.toRegex() }

    init {
        start()
    }

    /**
     * Reads the device stream until it ends, a framing error occurs, the device sends a bare
     * end-of-message delimiter, or an [IOException] is raised.
     *
     * An [IOException] is reported to the listener as a `DEVICE_ERROR` event. A plain end of stream
     * is only logged; no event is emitted for it.
     */
    override fun run() {
        val reader = BufferedReader(InputStreamReader(inputStream, StandardCharsets.UTF_8))
        val replyBuffer = StringBuilder()

        try {
            var socketClosed = false
            while (!socketClosed) {
                val cInt = reader.read()
                if (cInt == -1) {
                    // End of stream: stop right here so the -1 sentinel is never turned into a
                    // character, pushed through the state machine or appended to the buffer.
                    log.debug("$deviceInfo: Received end of stream, closing socket.")
                    break
                }
                val c = cInt.toChar()
                state = state.evaluateChar(c)
                replyBuffer.append(c)
                when (state) {
                    NetconfMessageState.END_PATTERN -> {
                        socketClosed = handleEndPatternMessage(replyBuffer, reader)
                    }
                    NetconfMessageState.END_CHUNKED_PATTERN -> {
                        socketClosed = handleChunkedMessage(replyBuffer)
                    }
                    else -> Unit // still in the middle of a message
                }
            }
        } catch (e: IOException) {
            log.warn("$deviceInfo: Fail while reading from channel", e)
            notifyListener(NetconfReceivedEvent.Type.DEVICE_ERROR)
        }
    }

    /**
     * Handles a buffer that ends with the end-of-message delimiter.
     *
     * A buffer consisting of nothing but the delimiter closes [reader] and emits
     * `DEVICE_UNREGISTERED`. Otherwise the delimiter is removed, the message is dispatched and the
     * buffer is reset for the next message.
     *
     * @return `true` when the read loop must stop, `false` to keep reading
     * @throws IOException if closing [reader] fails
     */
    private fun handleEndPatternMessage(replyBuffer: StringBuilder, reader: BufferedReader): Boolean {
        val deviceReply = replyBuffer.toString()
        if (deviceReply == RpcMessageUtils.END_PATTERN) {
            reader.close()
            notifyListener(NetconfReceivedEvent.Type.DEVICE_UNREGISTERED)
            return true
        }
        receivedMessage(deviceReply.replace(RpcMessageUtils.END_PATTERN, ""))
        replyBuffer.setLength(0)
        return false
    }

    /**
     * Handles a buffer that ends with the chunked-framing terminator.
     *
     * A buffer rejected by [NetconfMessageUtils.validateChunkedFraming] emits `DEVICE_ERROR`.
     * Otherwise the chunk-length headers and the terminator are removed, the message is dispatched
     * and the buffer is reset for the next message.
     *
     * @return `true` when the read loop must stop, `false` to keep reading
     */
    private fun handleChunkedMessage(replyBuffer: StringBuilder): Boolean {
        val deviceReply = replyBuffer.toString()
        if (!NetconfMessageUtils.validateChunkedFraming(deviceReply)) {
            log.debug("$deviceInfo: Received badly framed message $deviceReply")
            notifyListener(NetconfReceivedEvent.Type.DEVICE_ERROR)
            return true
        }
        val payload = deviceReply
            .replace(msgLenRegex, "")
            .replace(chunkedEndRegex, "")
        receivedMessage(payload)
        replyBuffer.setLength(0)
        return false
    }

    /**
     * Sends a payload-free event of the given [type] for this device to the session listener.
     */
    private fun notifyListener(type: NetconfReceivedEvent.Type) {
        sessionListener.accept(NetconfReceivedEvent(type, deviceInfo = deviceInfo))
    }

    /**
     * State machine for the Netconf message parser.
     *
     * Each state names the part of a terminator that has just been read. Feeding the characters
     * `]]>]]>` one after another leads to [END_PATTERN]; feeding `\n##\n` leads to
     * [END_CHUNKED_PATTERN]. Any character that does not continue a terminator falls back to
     * [NO_MATCHING_PATTERN].
     */
    internal enum class NetconfMessageState {

        NO_MATCHING_PATTERN {
            override fun evaluateChar(c: Char): NetconfMessageState = when (c) {
                ']' -> FIRST_BRACKET
                '\n' -> FIRST_LF
                else -> this
            }
        },
        FIRST_BRACKET {
            override fun evaluateChar(c: Char): NetconfMessageState = when (c) {
                ']' -> SECOND_BRACKET
                else -> NO_MATCHING_PATTERN
            }
        },
        SECOND_BRACKET {
            override fun evaluateChar(c: Char): NetconfMessageState = when (c) {
                '>' -> FIRST_BIGGER
                else -> NO_MATCHING_PATTERN
            }
        },
        FIRST_BIGGER {
            override fun evaluateChar(c: Char): NetconfMessageState = when (c) {
                ']' -> THIRD_BRACKET
                else -> NO_MATCHING_PATTERN
            }
        },
        THIRD_BRACKET {
            override fun evaluateChar(c: Char): NetconfMessageState = when (c) {
                ']' -> ENDING_BIGGER
                else -> NO_MATCHING_PATTERN
            }
        },
        ENDING_BIGGER {
            override fun evaluateChar(c: Char): NetconfMessageState = when (c) {
                '>' -> END_PATTERN
                else -> NO_MATCHING_PATTERN
            }
        },
        FIRST_LF {
            override fun evaluateChar(c: Char): NetconfMessageState = when (c) {
                '#' -> FIRST_HASH
                ']' -> FIRST_BRACKET
                '\n' -> this
                else -> NO_MATCHING_PATTERN
            }
        },
        FIRST_HASH {
            override fun evaluateChar(c: Char): NetconfMessageState = when (c) {
                '#' -> SECOND_HASH
                else -> NO_MATCHING_PATTERN
            }
        },
        SECOND_HASH {
            override fun evaluateChar(c: Char): NetconfMessageState = when (c) {
                '\n' -> END_CHUNKED_PATTERN
                else -> NO_MATCHING_PATTERN
            }
        },
        END_CHUNKED_PATTERN {
            // Terminal state: whatever comes next, matching restarts from scratch.
            override fun evaluateChar(c: Char): NetconfMessageState = NO_MATCHING_PATTERN
        },
        END_PATTERN {
            // Terminal state: whatever comes next, matching restarts from scratch.
            override fun evaluateChar(c: Char): NetconfMessageState = NO_MATCHING_PATTERN
        };

        /**
         * Evaluate next transition state based on current state and the character read.
         *
         * @param c character read in
         * @return the [NetconfMessageState] to move to after reading [c]
         */
        internal abstract fun evaluateChar(c: Char): NetconfMessageState
    }

    /**
     * Writes [request] to the device and returns the future registered for its reply.
     *
     * The future is stored in [replies] under [messageId] before the write is attempted. If the
     * write or flush raises an [IOException], the error is logged and the future is completed
     * exceptionally with it; the entry is left in [replies].
     *
     * @param request the full message text to write
     * @param messageId key under which the reply future is registered
     * @return the future registered for this request
     */
    fun sendMessage(request: String, messageId: String): CompletableFuture<String> {
        log.info("$deviceInfo: Sending message with message-id: $messageId: message: \n $request")
        val future = CompletableFuture<String>()
        replies[messageId] = future
        val writer = OutputStreamWriter(out, StandardCharsets.UTF_8)
        // Serialise writers so that concurrently sent requests are not interleaved on the stream.
        synchronized(this) {
            try {
                writer.write(request)
                writer.flush()
            } catch (e: IOException) {
                log.error("$deviceInfo: Failed to send message : \n $request", e)
                future.completeExceptionally(e)
            }
        }
        return future
    }

    /**
     * Logs a de-framed device message and forwards it to the session listener as `DEVICE_REPLY`.
     *
     * Messages that contain none of the rpc-reply, rpc-error or hello markers are logged as
     * invalid but are still forwarded.
     *
     * @param deviceReply message text with the framing already removed
     */
    private fun receivedMessage(deviceReply: String) {
        val messageId = NetconfMessageUtils.getMsgId(deviceReply)
        val recognised = deviceReply.contains(RpcMessageUtils.RPC_REPLY) ||
            deviceReply.contains(RpcMessageUtils.RPC_ERROR) ||
            deviceReply.contains(RpcMessageUtils.HELLO)

        if (recognised) {
            // Everything is passed as an argument so that a "{}" inside the device text cannot
            // swallow the message-id placeholder.
            log.info("{}: Received message with messageId: {}  \n {}", deviceInfo, messageId, deviceReply)
        } else {
            log.error("$deviceInfo: Invalid message received: \n $deviceReply")
        }
        sessionListener.accept(
            NetconfReceivedEvent(
                NetconfReceivedEvent.Type.DEVICE_REPLY,
                deviceReply,
                messageId,
                deviceInfo
            )
        )
    }

    /**
     * Gets the value of the [CompletableFuture] returned by [sendMessage].
     * This function is used by NetconfSessionImpl. Needed to wrap exception testing in NetconfSessionImpl.
     *
     * @param fut [CompletableFuture] object
     * @param timeout the maximum time to wait
     * @param timeUnit the time unit of the timeout argument
     * @return the result value
     * @throws CancellationException if this future was cancelled
     * @throws ExecutionException if this future completed exceptionally
     * @throws InterruptedException if the current thread was interrupted while waiting
     * @throws TimeoutException if the wait timed out
     */
    internal fun getFutureFromSendMessage(
        fut: CompletableFuture<String>,
        timeout: Long,
        timeUnit: TimeUnit
    ): String = fut.get(timeout, timeUnit)
}