12e3b83da471f10c88504183884e4f5bd519721a
[ccsdk/cds.git] /
1 /*
2  * Copyright © 2017-2019 AT&T, Bell Canada
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *     http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 package org.onap.ccsdk.cds.blueprintsprocessor.functions.netconf.executor.core
18
19 import org.onap.ccsdk.cds.blueprintsprocessor.functions.netconf.executor.api.DeviceInfo
20 import org.onap.ccsdk.cds.blueprintsprocessor.functions.netconf.executor.api.NetconfReceivedEvent
21 import org.onap.ccsdk.cds.blueprintsprocessor.functions.netconf.executor.api.NetconfSessionListener
22 import org.onap.ccsdk.cds.blueprintsprocessor.functions.netconf.executor.utils.NetconfMessageUtils
23 import org.onap.ccsdk.cds.blueprintsprocessor.functions.netconf.executor.utils.RpcMessageUtils
24 import org.slf4j.LoggerFactory
25 import java.io.BufferedReader
26 import java.io.IOException
27 import java.io.InputStream
28 import java.io.InputStreamReader
29 import java.io.OutputStream
30 import java.io.OutputStreamWriter
31 import java.nio.charset.StandardCharsets
32 import java.util.concurrent.CompletableFuture
33
/**
 * Thread that reads a NETCONF device's input stream character by character, splits it into
 * framed messages and hands each one to [sessionListener]. It also writes outgoing requests
 * to the device through [sendMessage].
 *
 * Message boundaries are detected by [NetconfMessageState], which recognises the character
 * sequence `]]>]]>` and the chunked-framing terminator `\n##\n`.
 *
 * The thread starts itself from the constructor, so an instance is already reading when the
 * constructor returns.
 *
 * @param inputStream stream carrying data coming from the device
 * @param out stream used to send requests to the device
 * @param deviceInfo device description, used in log messages and in emitted events
 * @param sessionListener receiver of [NetconfReceivedEvent]s produced by this communicator
 * @param replies map in which [sendMessage] registers the pending future of every request,
 * keyed by message id
 */
class NetconfDeviceCommunicator(
    private val inputStream: InputStream,
    private val out: OutputStream,
    private val deviceInfo: DeviceInfo,
    private val sessionListener: NetconfSessionListener,
    private val replies: MutableMap<String, CompletableFuture<String>>
) : Thread() {

    private val log = LoggerFactory.getLogger(NetconfDeviceCommunicator::class.java)
    private var state = NetconfMessageState.NO_MATCHING_PATTERN

    // Compiled once instead of once per received chunked message. These properties MUST stay
    // above the init block: start() runs there, and the reader thread uses them.
    private val msgLenRegex = RpcMessageUtils.MSGLEN_REGEX_PATTERN.toRegex()
    private val chunkedEndRegex = NetconfMessageUtils.CHUNKED_END_REGEX_PATTERN.toRegex()

    init {
        start()
    }

    /**
     * Reads the device stream until it ends, a terminating condition is detected, or an
     * [IOException] occurs. An [IOException] is logged and reported to [sessionListener] as a
     * [NetconfReceivedEvent.Type.DEVICE_ERROR] event.
     */
    override fun run() {
        val reader = BufferedReader(InputStreamReader(inputStream, StandardCharsets.UTF_8))
        val deviceReplyBuilder = StringBuilder()

        try {
            var socketClosed = false
            while (!socketClosed) {
                val cInt = reader.read()
                if (cInt == -1) {
                    // End of stream. -1 is not a character: converting it would yield U+FFFF,
                    // which must not reach the parser or the reply buffer.
                    log.error("$deviceInfo: Received cInt = -1")
                    break
                }
                val c = cInt.toChar()
                state = state.evaluateChar(c)
                deviceReplyBuilder.append(c)

                socketClosed = when (state) {
                    NetconfMessageState.END_PATTERN -> handleEndPattern(reader, deviceReplyBuilder)
                    NetconfMessageState.END_CHUNKED_PATTERN -> handleChunkedEnd(deviceReplyBuilder)
                    else -> false
                }
            }
        } catch (e: IOException) {
            log.warn("$deviceInfo: Fail while reading from channel", e)
            sessionListener.accept(NetconfReceivedEvent(
                NetconfReceivedEvent.Type.DEVICE_ERROR,
                deviceInfo = deviceInfo))
        }
    }

    /**
     * Handles a buffer that ends with the end-of-message pattern.
     *
     * A buffer equal to [RpcMessageUtils.END_PATTERN] alone closes [reader] and reports
     * [NetconfReceivedEvent.Type.DEVICE_UNREGISTERED]. Any other buffer is delivered with the
     * pattern removed, and the buffer is cleared for the next message.
     *
     * @return `true` when the read loop must stop
     * @throws IOException if closing [reader] fails
     */
    private fun handleEndPattern(reader: BufferedReader, buffer: StringBuilder): Boolean {
        val framed = buffer.toString()
        if (framed == RpcMessageUtils.END_PATTERN) {
            reader.close()
            sessionListener.accept(NetconfReceivedEvent(
                NetconfReceivedEvent.Type.DEVICE_UNREGISTERED,
                deviceInfo = deviceInfo))
            return true
        }
        receivedMessage(framed.replace(RpcMessageUtils.END_PATTERN, ""))
        buffer.setLength(0)
        return false
    }

    /**
     * Handles a buffer that ends with the chunked-framing terminator.
     *
     * A buffer rejected by [NetconfMessageUtils.validateChunkedFraming] is reported as
     * [NetconfReceivedEvent.Type.DEVICE_ERROR]. Otherwise the text matched by the message-length
     * and chunk-end patterns is removed, the remainder is delivered, and the buffer is cleared.
     *
     * @return `true` when the read loop must stop
     */
    private fun handleChunkedEnd(buffer: StringBuilder): Boolean {
        val framed = buffer.toString()
        if (!NetconfMessageUtils.validateChunkedFraming(framed)) {
            log.debug("$deviceInfo: Received badly framed message $framed")
            sessionListener.accept(NetconfReceivedEvent(
                NetconfReceivedEvent.Type.DEVICE_ERROR,
                deviceInfo = deviceInfo))
            return true
        }
        val payload = framed
            .replace(msgLenRegex, "")
            .replace(chunkedEndRegex, "")
        receivedMessage(payload)
        buffer.setLength(0)
        return false
    }

    /**
     * State machine for the Netconf message parser.
     *
     * It is fed one character at a time and reaches [END_PATTERN] after `]]>]]>` and
     * [END_CHUNKED_PATTERN] after a line feed followed by `##` and another line feed.
     */
    internal enum class NetconfMessageState {
        NO_MATCHING_PATTERN {
            override fun evaluateChar(c: Char): NetconfMessageState = when (c) {
                ']' -> FIRST_BRACKET
                '\n' -> FIRST_LF
                else -> this
            }
        },
        FIRST_BRACKET {
            override fun evaluateChar(c: Char): NetconfMessageState = when (c) {
                ']' -> SECOND_BRACKET
                else -> NO_MATCHING_PATTERN
            }
        },
        SECOND_BRACKET {
            override fun evaluateChar(c: Char): NetconfMessageState = when (c) {
                '>' -> FIRST_BIGGER
                else -> NO_MATCHING_PATTERN
            }
        },
        FIRST_BIGGER {
            override fun evaluateChar(c: Char): NetconfMessageState = when (c) {
                ']' -> THIRD_BRACKET
                else -> NO_MATCHING_PATTERN
            }
        },
        THIRD_BRACKET {
            override fun evaluateChar(c: Char): NetconfMessageState = when (c) {
                ']' -> ENDING_BIGGER
                else -> NO_MATCHING_PATTERN
            }
        },
        ENDING_BIGGER {
            override fun evaluateChar(c: Char): NetconfMessageState = when (c) {
                '>' -> END_PATTERN
                else -> NO_MATCHING_PATTERN
            }
        },
        FIRST_LF {
            override fun evaluateChar(c: Char): NetconfMessageState = when (c) {
                '#' -> FIRST_HASH
                ']' -> FIRST_BRACKET
                // Consecutive line feeds keep the parser waiting for a '#'.
                '\n' -> this
                else -> NO_MATCHING_PATTERN
            }
        },
        FIRST_HASH {
            override fun evaluateChar(c: Char): NetconfMessageState = when (c) {
                '#' -> SECOND_HASH
                else -> NO_MATCHING_PATTERN
            }
        },
        SECOND_HASH {
            override fun evaluateChar(c: Char): NetconfMessageState = when (c) {
                '\n' -> END_CHUNKED_PATTERN
                else -> NO_MATCHING_PATTERN
            }
        },
        END_CHUNKED_PATTERN {
            // Terminal state: the next character always restarts matching.
            override fun evaluateChar(c: Char): NetconfMessageState = NO_MATCHING_PATTERN
        },
        END_PATTERN {
            // Terminal state: the next character always restarts matching.
            override fun evaluateChar(c: Char): NetconfMessageState = NO_MATCHING_PATTERN
        };

        /**
         * Evaluate next transition state based on current state and the character read.
         *
         * @param c character read in
         * @return the [NetconfMessageState] to transition to
         */
        internal abstract fun evaluateChar(c: Char): NetconfMessageState
    }

    /**
     * Writes [request] to the device and registers a future for its reply.
     *
     * The future is stored in [replies] under [messageId] before the write is attempted. This
     * class only completes it exceptionally, when the write fails.
     * NOTE(review): normal completion presumably happens in whoever consumes [replies] -
     * confirm against the session implementation.
     *
     * @param request text to send, written as UTF-8
     * @param messageId key under which the pending future is registered
     * @return the future registered for this request
     */
    fun sendMessage(request: String, messageId: String): CompletableFuture<String> {
        log.info("$deviceInfo: Sending message: \n $request")
        val future = CompletableFuture<String>()
        replies[messageId] = future
        val writer = OutputStreamWriter(out, StandardCharsets.UTF_8)
        // Write and flush under one lock so concurrent requests are not interleaved on the wire.
        synchronized(this) {
            try {
                writer.write(request)
                writer.flush()
            } catch (e: IOException) {
                log.error("$deviceInfo: Failed to send message : \n $request", e)
                future.completeExceptionally(e)
            }
        }
        return future
    }

    /**
     * Logs a complete device message and forwards it to [sessionListener] as a
     * [NetconfReceivedEvent.Type.DEVICE_REPLY] event.
     *
     * Messages containing none of the rpc-reply, rpc-error or hello markers are logged as
     * invalid but are still forwarded.
     *
     * @param deviceReply message text with its framing already removed
     */
    private fun receivedMessage(deviceReply: String) {
        val recognised = deviceReply.contains(RpcMessageUtils.RPC_REPLY) ||
            deviceReply.contains(RpcMessageUtils.RPC_ERROR) ||
            deviceReply.contains(RpcMessageUtils.HELLO)

        // Log the invalid case before extracting the id, so the message is recorded even if
        // extraction fails.
        if (!recognised) {
            log.error("$deviceInfo: Invalid message received: \n $deviceReply")
        }

        // Extracted once and shared by the log line and the event.
        val messageId = NetconfMessageUtils.getMsgId(deviceReply)
        if (recognised) {
            log.info("$deviceInfo: Received message with messageId: {}  \n $deviceReply", messageId)
        }

        sessionListener.accept(NetconfReceivedEvent(
            NetconfReceivedEvent.Type.DEVICE_REPLY,
            deviceReply,
            messageId,
            deviceInfo))
    }
}