aa156e2a873e6cdd3958c2bf186ab25350848832
[ccsdk/cds.git] /
1 /*
2  * Copyright © 2017-2019 AT&T, Bell Canada
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *     http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 package org.onap.ccsdk.cds.blueprintsprocessor.functions.netconf.executor.core
18
19 import org.onap.ccsdk.cds.blueprintsprocessor.functions.netconf.executor.api.DeviceInfo
20 import org.onap.ccsdk.cds.blueprintsprocessor.functions.netconf.executor.api.NetconfReceivedEvent
21 import org.onap.ccsdk.cds.blueprintsprocessor.functions.netconf.executor.api.NetconfSessionListener
22 import org.onap.ccsdk.cds.blueprintsprocessor.functions.netconf.executor.utils.NetconfMessageUtils
23 import org.onap.ccsdk.cds.blueprintsprocessor.functions.netconf.executor.utils.RpcMessageUtils
24 import org.slf4j.LoggerFactory
25 import java.io.BufferedReader
26 import java.io.IOException
27 import java.io.InputStream
28 import java.io.InputStreamReader
29 import java.io.OutputStream
30 import java.io.OutputStreamWriter
31 import java.nio.charset.StandardCharsets
32 import java.util.concurrent.CompletableFuture
33 import java.util.concurrent.TimeUnit
34
/**
 * Reader/writer thread for one NETCONF device connection.
 *
 * The thread starts itself on construction. It reads [inputStream] one character at a time, feeds
 * every character through [NetconfMessageState] to detect the end of a message, and publishes the
 * outcome to [sessionListener] as a [NetconfReceivedEvent]. Requests are written to [out] through
 * [sendMessage].
 *
 * @param inputStream stream carrying the device's replies
 * @param out stream the requests are written to
 * @param deviceInfo identifies the device in log lines and in emitted events
 * @param sessionListener receives every [NetconfReceivedEvent] produced by this thread
 * @param replies pending futures keyed by message id; this class only registers futures here and
 * never completes them on success - presumably the session listener does (confirm against caller)
 */
class NetconfDeviceCommunicator(private val inputStream: InputStream,
                                private val out: OutputStream,
                                private val deviceInfo: DeviceInfo,
                                private val sessionListener: NetconfSessionListener,
                                private val replies: MutableMap<String, CompletableFuture<String>>) : Thread() {

    private val log = LoggerFactory.getLogger(NetconfDeviceCommunicator::class.java)
    private var state = NetconfMessageState.NO_MATCHING_PATTERN

    // Compiled once rather than per message. These must stay above the init block: the reader
    // thread is started there and uses them as soon as a chunked message arrives.
    private val chunkLengthRegex = RpcMessageUtils.MSGLEN_REGEX_PATTERN.toRegex()
    private val chunkEndRegex = NetconfMessageUtils.CHUNKED_END_REGEX_PATTERN.toRegex()

    init {
        start()
    }

    /**
     * Read loop. Runs until the stream ends, the device sends a bare end-of-message delimiter,
     * a badly framed chunked message is received, or reading fails with an [IOException]
     * (reported to the listener as `DEVICE_ERROR`). A plain end of stream is only logged; no event
     * is sent for it.
     */
    override fun run() {
        val reader = BufferedReader(InputStreamReader(inputStream, StandardCharsets.UTF_8))
        val deviceReplyBuilder = StringBuilder()

        try {
            var socketClosed = false
            while (!socketClosed) {
                val cInt = reader.read()
                if (cInt == -1) {
                    log.debug("$deviceInfo: Received end of stream, closing socket.")
                    // Stop here: -1 is a sentinel, not a character, so it must not reach the
                    // state machine or the reply buffer.
                    break
                }
                val c = cInt.toChar()
                state = state.evaluateChar(c)
                deviceReplyBuilder.append(c)
                socketClosed = when (state) {
                    NetconfMessageState.END_PATTERN -> onEndPattern(reader, deviceReplyBuilder)
                    NetconfMessageState.END_CHUNKED_PATTERN -> onChunkedEnd(deviceReplyBuilder)
                    else -> false
                }
            }
        } catch (e: IOException) {
            log.warn("$deviceInfo: Fail while reading from channel", e)
            sessionListener.accept(NetconfReceivedEvent(
                NetconfReceivedEvent.Type.DEVICE_ERROR,
                deviceInfo = deviceInfo))
        }
    }

    /**
     * Handles a buffer that ends with the `]]>]]>` delimiter.
     *
     * If the buffer is exactly [RpcMessageUtils.END_PATTERN] the reader is closed and the device
     * is reported as unregistered; otherwise the delimiter is stripped, the message is delivered
     * and the buffer is reset.
     *
     * @return `true` when the read loop must stop
     */
    private fun onEndPattern(reader: BufferedReader, buffer: StringBuilder): Boolean {
        val deviceReply = buffer.toString()
        if (deviceReply == RpcMessageUtils.END_PATTERN) {
            reader.close()
            sessionListener.accept(NetconfReceivedEvent(
                NetconfReceivedEvent.Type.DEVICE_UNREGISTERED,
                deviceInfo = deviceInfo))
            return true
        }
        receivedMessage(deviceReply.replace(RpcMessageUtils.END_PATTERN, ""))
        buffer.setLength(0)
        return false
    }

    /**
     * Handles a buffer that ends with the chunked terminator (line feed, `##`, line feed).
     *
     * A message that fails [NetconfMessageUtils.validateChunkedFraming] is reported as
     * `DEVICE_ERROR`; otherwise the chunk-length markers and the terminator are removed, the
     * message is delivered and the buffer is reset.
     *
     * @return `true` when the read loop must stop
     */
    private fun onChunkedEnd(buffer: StringBuilder): Boolean {
        val deviceReply = buffer.toString()
        if (!NetconfMessageUtils.validateChunkedFraming(deviceReply)) {
            log.debug("$deviceInfo: Received badly framed message $deviceReply")
            sessionListener.accept(NetconfReceivedEvent(
                NetconfReceivedEvent.Type.DEVICE_ERROR,
                deviceInfo = deviceInfo))
            return true
        }
        val payload = deviceReply
            .replace(chunkLengthRegex, "")
            .replace(chunkEndRegex, "")
        receivedMessage(payload)
        buffer.setLength(0)
        return false
    }

    /**
     * State machine for the Netconf message parser.
     *
     * Recognises two terminators one character at a time: `]]>]]>` (ending in [END_PATTERN]) and
     * line feed, `##`, line feed (ending in [END_CHUNKED_PATTERN]). Both terminal states return to
     * [NO_MATCHING_PATTERN] on the next character, whatever that character is.
     */
    internal enum class NetconfMessageState {
        /** Nothing matched yet; waits for `]` or a line feed. */
        NO_MATCHING_PATTERN {
            override fun evaluateChar(c: Char): NetconfMessageState = when (c) {
                ']' -> FIRST_BRACKET
                '\n' -> FIRST_LF
                else -> this
            }
        },
        /** Seen `]`. */
        FIRST_BRACKET {
            override fun evaluateChar(c: Char): NetconfMessageState = when (c) {
                ']' -> SECOND_BRACKET
                else -> NO_MATCHING_PATTERN
            }
        },
        /** Seen `]]`. */
        SECOND_BRACKET {
            override fun evaluateChar(c: Char): NetconfMessageState = when (c) {
                '>' -> FIRST_BIGGER
                else -> NO_MATCHING_PATTERN
            }
        },
        /** Seen `]]>`. */
        FIRST_BIGGER {
            override fun evaluateChar(c: Char): NetconfMessageState = when (c) {
                ']' -> THIRD_BRACKET
                else -> NO_MATCHING_PATTERN
            }
        },
        /** Seen `]]>]`. */
        THIRD_BRACKET {
            override fun evaluateChar(c: Char): NetconfMessageState = when (c) {
                ']' -> ENDING_BIGGER
                else -> NO_MATCHING_PATTERN
            }
        },
        /** Seen `]]>]]`; a `>` completes the delimiter. */
        ENDING_BIGGER {
            override fun evaluateChar(c: Char): NetconfMessageState = when (c) {
                '>' -> END_PATTERN
                else -> NO_MATCHING_PATTERN
            }
        },
        /** Seen a line feed; may start the chunked terminator or the bracket delimiter. */
        FIRST_LF {
            override fun evaluateChar(c: Char): NetconfMessageState = when (c) {
                '#' -> FIRST_HASH
                ']' -> FIRST_BRACKET
                '\n' -> this
                else -> NO_MATCHING_PATTERN
            }
        },
        /** Seen line feed followed by `#`. */
        FIRST_HASH {
            override fun evaluateChar(c: Char): NetconfMessageState = when (c) {
                '#' -> SECOND_HASH
                else -> NO_MATCHING_PATTERN
            }
        },
        /** Seen line feed followed by `##`; a line feed completes the chunked terminator. */
        SECOND_HASH {
            override fun evaluateChar(c: Char): NetconfMessageState = when (c) {
                '\n' -> END_CHUNKED_PATTERN
                else -> NO_MATCHING_PATTERN
            }
        },
        /** Terminal state: chunked terminator fully matched. */
        END_CHUNKED_PATTERN {
            override fun evaluateChar(c: Char): NetconfMessageState = NO_MATCHING_PATTERN
        },
        /** Terminal state: `]]>]]>` fully matched. */
        END_PATTERN {
            override fun evaluateChar(c: Char): NetconfMessageState = NO_MATCHING_PATTERN
        };

        /**
         * Evaluate next transition state based on current state and the character read.
         * @param c character read in
         * @return the next [NetconfMessageState]
         */
        internal abstract fun evaluateChar(c: Char): NetconfMessageState
    }

    /**
     * Registers a future for [messageId] in the replies map and writes [request] to the device.
     *
     * The write and flush are done while holding this object's monitor. If writing fails the
     * error is logged and the returned future is completed exceptionally with the [IOException];
     * the future stays registered in the replies map in that case.
     *
     * @param request text to send to the device
     * @param messageId key under which the reply future is stored
     * @return the future registered for [messageId]
     */
    fun sendMessage(request: String, messageId: String): CompletableFuture<String> {
        log.info("$deviceInfo: Sending message: \n $request")
        val future = CompletableFuture<String>()
        replies[messageId] = future
        val writer = OutputStreamWriter(out, StandardCharsets.UTF_8)
        synchronized(this) {
            try {
                writer.write(request)
                writer.flush()
            } catch (e: IOException) {
                log.error("$deviceInfo: Failed to send message : \n $request", e)
                future.completeExceptionally(e)
            }
        }
        return future
    }

    /**
     * Logs a complete device message and forwards it to the session listener as `DEVICE_REPLY`.
     *
     * Messages that contain none of the rpc-reply, rpc-error or hello markers are logged as
     * invalid but are still forwarded.
     */
    private fun receivedMessage(deviceReply: String) {
        val recognised = deviceReply.contains(RpcMessageUtils.RPC_REPLY) ||
            deviceReply.contains(RpcMessageUtils.RPC_ERROR) ||
            deviceReply.contains(RpcMessageUtils.HELLO)
        if (!recognised) {
            log.error("$deviceInfo: Invalid message received: \n $deviceReply")
        }
        // Looked up once and reused for both the log line and the event; kept after the
        // invalid-message log so that log is still written before the lookup runs.
        val messageId = NetconfMessageUtils.getMsgId(deviceReply)
        if (recognised) {
            log.info("$deviceInfo: Received message with messageId: {}  \n $deviceReply", messageId)
        }
        sessionListener.accept(NetconfReceivedEvent(
            NetconfReceivedEvent.Type.DEVICE_REPLY,
            deviceReply,
            messageId,
            deviceInfo))
    }

    /**
     * Gets the value of the [CompletableFuture] returned by [sendMessage].
     * This function is used by NetconfSessionImpl. Needed to wrap exception testing in NetconfSessionImpl.
     * @param fut [CompletableFuture] object
     * @param timeout the maximum time to wait
     * @param timeUnit the time unit of the timeout argument
     * @return the result value
     * @throws java.util.concurrent.CancellationException if this future was cancelled
     * @throws java.util.concurrent.ExecutionException if this future completed exceptionally
     * @throws InterruptedException if the current thread was interrupted while waiting
     * @throws java.util.concurrent.TimeoutException if the wait timed out
     */
    internal fun getFutureFromSendMessage(
        fut: CompletableFuture<String>, timeout: Long, timeUnit: TimeUnit): String =
        fut.get(timeout, timeUnit)
}