Migrate "ms/controllerblueprints" from ccsdk/apps
[ccsdk/cds.git] / ms / blueprintsprocessor / functions / netconf-executor / src / main / kotlin / org / onap / ccsdk / cds / blueprintsprocessor / functions / netconf / executor / core / NetconfDeviceCommunicator.kt
1 /*
2  * Copyright © 2017-2019 AT&T, Bell Canada
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *     http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 package org.onap.ccsdk.cds.blueprintsprocessor.functions.netconf.executor.core
18
19 import org.onap.ccsdk.cds.blueprintsprocessor.functions.netconf.executor.api.DeviceInfo
20 import org.onap.ccsdk.cds.blueprintsprocessor.functions.netconf.executor.api.NetconfReceivedEvent
21 import org.onap.ccsdk.cds.blueprintsprocessor.functions.netconf.executor.api.NetconfSessionListener
22 import org.onap.ccsdk.cds.blueprintsprocessor.functions.netconf.executor.utils.NetconfMessageUtils
23 import org.onap.ccsdk.cds.blueprintsprocessor.functions.netconf.executor.utils.RpcMessageUtils
24 import org.slf4j.LoggerFactory
25 import java.io.BufferedReader
26 import java.io.IOException
27 import java.io.InputStream
28 import java.io.InputStreamReader
29 import java.io.OutputStream
30 import java.io.OutputStreamWriter
31 import java.nio.charset.StandardCharsets
32 import java.util.concurrent.CompletableFuture
33
/**
 * Reader thread and writer for one NETCONF device channel.
 *
 * The thread is started from the constructor. It reads [inputStream] character by character,
 * recognises the two message terminators handled by [NetconfMessageState] (`]]>]]>` and
 * `\n##\n`), and forwards each complete message to [sessionListener] as a
 * [NetconfReceivedEvent]. [sendMessage] writes requests to [out] and registers the future for
 * the reply in [replies].
 *
 * @param inputStream stream the device replies are read from (decoded as UTF-8)
 * @param out stream the requests are written to (encoded as UTF-8)
 * @param deviceInfo device description, used in log messages and attached to every event
 * @param sessionListener receiver of all events produced by the reader thread
 * @param replies map from message id to the future registered by [sendMessage]
 */
class NetconfDeviceCommunicator(private val inputStream: InputStream,
                                private val out: OutputStream,
                                private val deviceInfo: DeviceInfo,
                                private val sessionListener: NetconfSessionListener,
                                private val replies: MutableMap<String, CompletableFuture<String>>) : Thread() {

    // NOTE: every property must be declared above the init block, because init starts the
    // reader thread and run() uses these properties immediately.
    private val log = LoggerFactory.getLogger(NetconfDeviceCommunicator::class.java)

    // Current position in the terminator state machine; only read and written by run().
    private var state = NetconfMessageState.NO_MATCHING_PATTERN

    // Compiled once instead of once per received chunked message.
    private val msgLenRegex = RpcMessageUtils.MSGLEN_REGEX_PATTERN.toRegex()
    private val chunkedEndRegex = NetconfMessageUtils.CHUNKED_END_REGEX_PATTERN.toRegex()

    init {
        start()
    }

    /**
     * Reads the device stream until it ends, the device unregisters, or a badly framed chunked
     * message arrives. An [IOException] while reading is reported to the listener as
     * `DEVICE_ERROR`.
     */
    override fun run() {
        val reader = BufferedReader(InputStreamReader(inputStream, StandardCharsets.UTF_8))
        val deviceReplyBuilder = StringBuilder()

        try {
            var socketClosed = false
            while (!socketClosed) {
                val cInt = reader.read()
                if (cInt == -1) {
                    // End of stream: stop right away so that -1 is never turned into a
                    // character and fed to the state machine / reply buffer.
                    // No event is sent to the listener and the reader is not closed here
                    // (both were deliberately disabled in the original code).
                    log.error("$deviceInfo: Received cInt = -1")
                    break
                }
                val c = cInt.toChar()
                state = state.evaluateChar(c)
                deviceReplyBuilder.append(c)
                socketClosed = when (state) {
                    NetconfMessageState.END_PATTERN -> handleEndPattern(reader, deviceReplyBuilder)
                    NetconfMessageState.END_CHUNKED_PATTERN -> handleEndChunkedPattern(deviceReplyBuilder)
                    else -> false
                }
            }
        } catch (e: IOException) {
            log.warn("$deviceInfo: Fail while reading from channel", e)
            sessionListener.notify(NetconfReceivedEvent(
                NetconfReceivedEvent.Type.DEVICE_ERROR,
                deviceInfo = deviceInfo))
        }
    }

    /**
     * Handles a buffer that ends with `RpcMessageUtils.END_PATTERN`.
     *
     * A buffer consisting of the terminator alone closes the reader and reports
     * `DEVICE_UNREGISTERED`; anything else is dispatched as a message and the buffer is reset.
     *
     * @return true when the read loop must stop
     */
    private fun handleEndPattern(reader: BufferedReader, deviceReplyBuilder: StringBuilder): Boolean {
        val deviceReply = deviceReplyBuilder.toString()
        if (deviceReply == RpcMessageUtils.END_PATTERN) {
            // close() may throw IOException; run() reports that as DEVICE_ERROR.
            reader.close()
            sessionListener.notify(NetconfReceivedEvent(
                NetconfReceivedEvent.Type.DEVICE_UNREGISTERED,
                deviceInfo = deviceInfo))
            return true
        }
        receivedMessage(deviceReply.replace(RpcMessageUtils.END_PATTERN, ""))
        deviceReplyBuilder.setLength(0)
        return false
    }

    /**
     * Handles a buffer that ends with the chunked terminator.
     *
     * A buffer rejected by `NetconfMessageUtils.validateChunkedFraming` is reported as
     * `DEVICE_ERROR`; otherwise the framing markers are stripped, the message is dispatched
     * and the buffer is reset.
     *
     * @return true when the read loop must stop
     */
    private fun handleEndChunkedPattern(deviceReplyBuilder: StringBuilder): Boolean {
        val deviceReply = deviceReplyBuilder.toString()
        if (!NetconfMessageUtils.validateChunkedFraming(deviceReply)) {
            log.debug("$deviceInfo: Received badly framed message $deviceReply")
            sessionListener.notify(NetconfReceivedEvent(
                NetconfReceivedEvent.Type.DEVICE_ERROR,
                deviceInfo = deviceInfo))
            return true
        }
        receivedMessage(deviceReply.replace(msgLenRegex, "").replace(chunkedEndRegex, ""))
        deviceReplyBuilder.setLength(0)
        return false
    }

    /**
     * Character-driven state machine that recognises the terminators `]]>]]>`
     * ([END_PATTERN]) and `\n##\n` ([END_CHUNKED_PATTERN]).
     *
     * Each state names the part of a terminator matched so far. When a character does not
     * continue the current partial match, it is re-evaluated as the possible start of a new
     * terminator instead of being dropped, so a terminator is found even when it directly
     * follows a previous terminator or a partial match.
     */
    private enum class NetconfMessageState {
        /** Nothing matched yet. */
        NO_MATCHING_PATTERN {
            override fun evaluateChar(c: Char): NetconfMessageState = when (c) {
                ']' -> FIRST_BRACKET
                '\n' -> FIRST_LF
                else -> this
            }
        },
        /** Matched `]`. */
        FIRST_BRACKET {
            override fun evaluateChar(c: Char): NetconfMessageState =
                if (c == ']') SECOND_BRACKET else restartWith(c)
        },
        /** Matched `]]`. */
        SECOND_BRACKET {
            override fun evaluateChar(c: Char): NetconfMessageState = when (c) {
                '>' -> FIRST_BIGGER
                // "]]]" still ends with "]]", so the partial match is kept.
                ']' -> this
                else -> restartWith(c)
            }
        },
        /** Matched `]]>`. */
        FIRST_BIGGER {
            override fun evaluateChar(c: Char): NetconfMessageState =
                if (c == ']') THIRD_BRACKET else restartWith(c)
        },
        /** Matched `]]>]`. */
        THIRD_BRACKET {
            override fun evaluateChar(c: Char): NetconfMessageState =
                if (c == ']') ENDING_BIGGER else restartWith(c)
        },
        /** Matched `]]>]]`. */
        ENDING_BIGGER {
            override fun evaluateChar(c: Char): NetconfMessageState = when (c) {
                '>' -> END_PATTERN
                // "]]>]]]" ends with "]]", which is the first two characters of the terminator.
                ']' -> SECOND_BRACKET
                else -> restartWith(c)
            }
        },
        /** Matched a line feed. */
        FIRST_LF {
            override fun evaluateChar(c: Char): NetconfMessageState =
                if (c == '#') FIRST_HASH else restartWith(c)
        },
        /** Matched line feed followed by `#`. */
        FIRST_HASH {
            override fun evaluateChar(c: Char): NetconfMessageState =
                if (c == '#') SECOND_HASH else restartWith(c)
        },
        /** Matched line feed followed by `##`. */
        SECOND_HASH {
            override fun evaluateChar(c: Char): NetconfMessageState =
                if (c == '\n') END_CHUNKED_PATTERN else restartWith(c)
        },
        /** Complete chunked terminator seen; the next character starts a new search. */
        END_CHUNKED_PATTERN {
            override fun evaluateChar(c: Char): NetconfMessageState = restartWith(c)
        },
        /** Complete `]]>]]>` terminator seen; the next character starts a new search. */
        END_PATTERN {
            override fun evaluateChar(c: Char): NetconfMessageState = restartWith(c)
        };

        /** Returns the state reached after consuming [c] in this state. */
        internal abstract fun evaluateChar(c: Char): NetconfMessageState

        /** Evaluates [c] as the first character of a fresh terminator search. */
        internal fun restartWith(c: Char): NetconfMessageState = NO_MATCHING_PATTERN.evaluateChar(c)
    }

    /**
     * Registers a reply future under [messageId] and writes [request] to the device.
     *
     * @param request text written to the output stream as UTF-8
     * @param messageId key under which the returned future is stored in the replies map
     * @return the registered future; it is completed exceptionally here if the write fails.
     *         Normal completion is not done by this class.
     */
    fun sendMessage(request: String, messageId: String): CompletableFuture<String> {
        log.info("$deviceInfo: Sending message: \n $request")
        val future = CompletableFuture<String>()
        replies[messageId] = future
        val writer = OutputStreamWriter(out, StandardCharsets.UTF_8)
        // Write and flush under this object's monitor so concurrent senders do not interleave.
        synchronized(this) {
            try {
                writer.write(request)
                writer.flush()
            } catch (e: IOException) {
                log.error("$deviceInfo: Failed to send message : \n $request", e)
                future.completeExceptionally(e)
            }
        }
        return future
    }

    /**
     * Logs a complete device message and forwards it to the listener as `DEVICE_REPLY`.
     *
     * Messages that contain none of the rpc-reply, rpc-error or hello markers are logged as
     * invalid but are still forwarded.
     */
    private fun receivedMessage(deviceReply: String) {
        // Extracted once and reused for logging and for the event.
        // NOTE(review): assumes getMsgId has no side effects - confirm.
        val messageId = NetconfMessageUtils.getMsgId(deviceReply)
        val recognised = deviceReply.contains(RpcMessageUtils.RPC_REPLY) ||
            deviceReply.contains(RpcMessageUtils.RPC_ERROR) ||
            deviceReply.contains(RpcMessageUtils.HELLO)
        if (recognised) {
            log.info("$deviceInfo: Received message with messageId: {}  \n $deviceReply", messageId)
        } else {
            log.error("$deviceInfo: Invalid message received: \n $deviceReply")
        }
        sessionListener.notify(NetconfReceivedEvent(
            NetconfReceivedEvent.Type.DEVICE_REPLY,
            deviceReply,
            messageId,
            deviceInfo))
    }
}