cfcf24bb1db55f3334c401fbde134b2c178cc601
[ccsdk/cds.git] /
1 /*
2  * Copyright © 2017-2018 AT&T Intellectual Property.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *     http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 package org.onap.ccsdk.apps.blueprintsprocessor.functions.netconf.executor.core
18
19 import org.onap.ccsdk.apps.blueprintsprocessor.functions.netconf.executor.NetconfException
20 import org.onap.ccsdk.apps.blueprintsprocessor.functions.netconf.executor.data.NetconfDeviceOutputEvent
21 import org.onap.ccsdk.apps.blueprintsprocessor.functions.netconf.executor.interfaces.DeviceInfo
22 import org.onap.ccsdk.apps.blueprintsprocessor.functions.netconf.executor.interfaces.NetconfSessionDelegate
23 import org.onap.ccsdk.apps.blueprintsprocessor.functions.netconf.executor.utils.RpcConstants
24 import org.onap.ccsdk.apps.blueprintsprocessor.functions.netconf.executor.utils.RpcMessageUtils
25 import org.slf4j.LoggerFactory
26 import java.io.*
27 import java.nio.charset.StandardCharsets
28 import java.util.concurrent.CompletableFuture
29
30
/**
 * Reader/writer thread for one NETCONF session.
 *
 * [run] reads the device's input stream one character at a time, feeds each character to the
 * [NetconfMessageState] state machine to detect the end of a message, and forwards every complete reply to
 * [netconfSessionDelegate]. [sendMessage] writes a request to the device and returns a future that is stored
 * in [replies] under the request's message-id; this class never completes that future successfully itself
 * (presumably the session delegate does - confirm against the caller).
 *
 * @param inputStream stream the device replies are read from (decoded as UTF-8)
 * @param out stream requests are written to (encoded as UTF-8)
 * @param netconfDeviceInfo device this session talks to; used for logging and attached to events
 * @param netconfSessionDelegate receiver of the events created for device replies
 * @param replies map of message-id to the pending future of the corresponding request
 */
class NetconfStreamThread(
    private val inputStream: InputStream,
    private val out: OutputStream,
    private val netconfDeviceInfo: DeviceInfo,
    private val netconfSessionDelegate: NetconfSessionDelegate,
    private val replies: MutableMap<String, CompletableFuture<String>>
) : Thread() {

    val log = LoggerFactory.getLogger(NetconfStreamThread::class.java)

    /**
     * Current position of the framing state machine.
     *
     * Starts in [NetconfMessageState.NO_MATCHING_PATTERN] so that [run] works even when no caller assigns it
     * (it used to be an uninitialised `lateinit` property, which made the first read throw).
     */
    var state: NetconfMessageState = NetconfMessageState.NO_MATCHING_PATTERN

    /**
     * Single writer shared by every [sendMessage] call. It doubles as the lock that serialises writes, and
     * because write and flush hit the same instance the encoder's buffered bytes really reach [out].
     */
    private val outputStream = OutputStreamWriter(out, StandardCharsets.UTF_8)

    /**
     * Reads characters from the device until the stream ends, the device sends a bare end-of-message marker,
     * or a chunked message fails framing validation.
     *
     * @throws IllegalStateException wrapping a [NetconfException] when reading from the stream fails
     */
    override fun run() {
        val reader = BufferedReader(InputStreamReader(inputStream, StandardCharsets.UTF_8))
        val deviceReplyBuilder = StringBuilder()

        try {
            var socketClosed = false
            while (!socketClosed) {
                val cInt = reader.read()
                if (cInt == -1) {
                    log.debug("Netconf device {} sent error char in session will need to be reopend",
                            netconfDeviceInfo)
                    log.debug("Netconf device {} ERROR cInt == -1 socketClosed = true", netconfDeviceInfo)
                    // NOTE(review): a SESSION_CLOSED NetconfDeviceOutputEvent used to be built here with
                    // `null!!` arguments. `null!!` always throws a NullPointerException, so the event was
                    // never created and never handed to the delegate. The dead construction is removed; if
                    // the delegate must learn about the closure, build the event with real arguments and
                    // notify it (the event constructor is not visible from this file).
                    // Stop right away: -1 is not a character and must not reach the state machine or buffer.
                    break
                }

                val c = cInt.toChar()
                state = state.evaluateChar(c)
                deviceReplyBuilder.append(c)

                socketClosed = when (state) {
                    NetconfMessageState.END_PATTERN -> handleEndPattern(deviceReplyBuilder)
                    NetconfMessageState.END_CHUNKED_PATTERN -> handleEndChunkedPattern(deviceReplyBuilder)
                    else -> false
                }
            }
        } catch (e: IOException) {
            log.warn("Error in reading from the session for device {} ", netconfDeviceInfo, e)
            throw IllegalStateException(
                    NetconfException(message = "Error in reading from the session for device $netconfDeviceInfo"))
        }
    }

    /**
     * Handles a buffer that ends with the `]]>]]>` marker.
     *
     * @return `true` when the read loop must stop (the buffer held nothing but the marker)
     */
    private fun handleEndPattern(buffer: StringBuilder): Boolean {
        val deviceReply = buffer.toString()
        if (deviceReply == RpcConstants.END_PATTERN) {
            close(deviceReply)
            return true
        }
        dealWithReply(deviceReply.replace(RpcConstants.END_PATTERN, ""))
        buffer.setLength(0)
        return false
    }

    /**
     * Handles a buffer that ends with the `\n##\n` marker.
     *
     * @return `true` when the read loop must stop (the message failed chunked-framing validation)
     */
    private fun handleEndChunkedPattern(buffer: StringBuilder): Boolean {
        val deviceReply = buffer.toString()
        if (!RpcMessageUtils.validateChunkedFraming(deviceReply)) {
            log.debug("Netconf device {} send badly framed message {}", netconfDeviceInfo, deviceReply)
            close(deviceReply)
            return true
        }
        // Strip the chunk-length headers and the end-of-chunks marker, leaving only the payload.
        val payload = deviceReply
                .replace(RpcConstants.MSGLEN_REGEX_PATTERN.toRegex(), "")
                .replace(RpcMessageUtils.CHUNKED_END_REGEX_PATTERN.toRegex(), "")
        dealWithReply(payload)
        buffer.setLength(0)
        return false
    }

    /**
     * State machine that recognises the two end-of-message sequences handled by [run]:
     * `]]>]]>` (ending in [END_PATTERN]) and `\n##\n` (ending in [END_CHUNKED_PATTERN]).
     *
     * Each state consumes one character and returns the state to continue from.
     */
    enum class NetconfMessageState {

        /** Nothing matched yet; `]` or a line feed starts one of the two sequences. */
        NO_MATCHING_PATTERN {
            override fun evaluateChar(c: Char): NetconfMessageState = when (c) {
                ']' -> FIRST_BRACKET
                '\n' -> FIRST_LF
                else -> this
            }
        },

        /** Seen `]`. */
        FIRST_BRACKET {
            override fun evaluateChar(c: Char): NetconfMessageState =
                    if (c == ']') SECOND_BRACKET else NO_MATCHING_PATTERN
        },

        /** Seen `]]`. */
        SECOND_BRACKET {
            override fun evaluateChar(c: Char): NetconfMessageState =
                    if (c == '>') FIRST_BIGGER else NO_MATCHING_PATTERN
        },

        /** Seen `]]>`. */
        FIRST_BIGGER {
            override fun evaluateChar(c: Char): NetconfMessageState =
                    if (c == ']') THIRD_BRACKET else NO_MATCHING_PATTERN
        },

        /** Seen `]]>]`. */
        THIRD_BRACKET {
            override fun evaluateChar(c: Char): NetconfMessageState =
                    if (c == ']') ENDING_BIGGER else NO_MATCHING_PATTERN
        },

        /** Seen `]]>]]`. */
        ENDING_BIGGER {
            override fun evaluateChar(c: Char): NetconfMessageState =
                    if (c == '>') END_PATTERN else NO_MATCHING_PATTERN
        },

        /** Seen a line feed; `#` continues the chunked marker, `]` starts the other sequence. */
        FIRST_LF {
            override fun evaluateChar(c: Char): NetconfMessageState = when (c) {
                '#' -> FIRST_HASH
                ']' -> FIRST_BRACKET
                '\n' -> this
                else -> NO_MATCHING_PATTERN
            }
        },

        /** Seen `\n#`. */
        FIRST_HASH {
            override fun evaluateChar(c: Char): NetconfMessageState =
                    if (c == '#') SECOND_HASH else NO_MATCHING_PATTERN
        },

        /** Seen `\n##`. */
        SECOND_HASH {
            override fun evaluateChar(c: Char): NetconfMessageState =
                    if (c == '\n') END_CHUNKED_PATTERN else NO_MATCHING_PATTERN
        },

        /**
         * Complete `\n##\n` seen. The next character belongs to a new message, so it is evaluated from the
         * initial state instead of being discarded.
         */
        END_CHUNKED_PATTERN {
            override fun evaluateChar(c: Char): NetconfMessageState = NO_MATCHING_PATTERN.evaluateChar(c)
        },

        /**
         * Complete `]]>]]>` seen. The next character belongs to a new message, so it is evaluated from the
         * initial state; otherwise a marker that directly follows a message would lose its first `]`.
         */
        END_PATTERN {
            override fun evaluateChar(c: Char): NetconfMessageState = NO_MATCHING_PATTERN.evaluateChar(c)
        };

        /**
         * Advances the state machine by one character.
         *
         * @param c character just read from the device
         * @return the state to continue from
         */
        internal abstract fun evaluateChar(c: Char): NetconfMessageState
    }

    /**
     * Logs that the session is being closed and interrupts this thread.
     *
     * @param deviceReply buffer content that triggered the close; only logged
     */
    private fun close(deviceReply: String) {
        log.debug("Netconf device {} socketClosed = true DEVICE_UNREGISTERED {}", netconfDeviceInfo, deviceReply)
        // NOTE(review): a DEVICE_UNREGISTERED NetconfDeviceOutputEvent used to be built here with `null!!`
        // arguments, which always threw a NullPointerException before the constructor ran; the event was
        // also never passed to the delegate. The dead construction is removed so closing no longer crashes.
        this.interrupt()
    }

    /**
     * Forwards a complete device reply to the session delegate.
     *
     * Replies containing none of [RpcConstants.RPC_REPLY], [RpcConstants.RPC_ERROR] or [RpcConstants.HELLO]
     * are only logged.
     *
     * @param deviceReply reply text with the framing already removed
     */
    private fun dealWithReply(deviceReply: String) {
        val isKnownReply = deviceReply.contains(RpcConstants.RPC_REPLY) ||
                deviceReply.contains(RpcConstants.RPC_ERROR) ||
                deviceReply.contains(RpcConstants.HELLO)
        if (!isKnownReply) {
            log.debug("Error Reply: \n {} \n from Netconf Device: {}", deviceReply, netconfDeviceInfo)
            return
        }

        val messageId = RpcMessageUtils.getMsgId(deviceReply)
        log.info("From Netconf Device: {} \n for Message-ID: {} \n Device-Reply: \n {} \n ", netconfDeviceInfo,
                messageId, deviceReply)
        // The second argument used to be `null!!`, which threw a NullPointerException for every valid reply.
        // NOTE(review): the event constructor is not visible from this file; the device description is
        // passed as a non-null placeholder for that argument - confirm the intended value and type.
        val event = NetconfDeviceOutputEvent(NetconfDeviceOutputEvent.Type.DEVICE_REPLY,
                netconfDeviceInfo.toString(), deviceReply, messageId, netconfDeviceInfo)
        netconfSessionDelegate.notify(event)
    }

    /**
     * Sends a request whose message-id is extracted from the request text itself.
     *
     * @param request request text to send
     * @return future registered for the request's message-id
     * @throws NoSuchElementException when no message-id can be extracted from [request]
     */
    @SuppressWarnings("squid:S3655")
    fun sendMessage(request: String): CompletableFuture<String> {
        val messageId = RpcMessageUtils.getMsgId(request)
        return sendMessage(request, messageId.get())
    }

    /**
     * Registers a pending reply for [messageId] and writes [request] to the device.
     *
     * Writes are serialised on the shared writer, so concurrent callers cannot interleave their requests.
     *
     * @param request request text to send
     * @param messageId key under which the returned future is stored in the replies map
     * @return the registered future; completed exceptionally right away when the write fails
     */
    fun sendMessage(request: String, messageId: String): CompletableFuture<String> {
        log.info("Sending message: \n {} \n to NETCONF Device: {}", request, netconfDeviceInfo)
        val cf = CompletableFuture<String>()
        replies[messageId] = cf
        synchronized(outputStream) {
            try {
                outputStream.write(request)
                outputStream.flush()
            } catch (e: IOException) {
                log.error("Writing to NETCONF Device {} failed", netconfDeviceInfo, e)
                cf.completeExceptionally(e)
            }
        }
        return cf
    }

}