Revert "Renaming Files having BluePrint to have Blueprint"
[ccsdk/cds.git] / ms / blueprintsprocessor / modules / commons / nats-lib / src / test / kotlin / org / onap / ccsdk / cds / blueprintsprocessor / nats / service / BluePrintNatsServiceTest.kt
1 /*
2  * Copyright © 2018-2019 AT&T Intellectual Property.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *     http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 package org.onap.ccsdk.cds.blueprintsprocessor.nats.service
18
19 import io.mockk.every
20 import io.mockk.mockk
21 import io.mockk.spyk
22 import io.nats.streaming.MessageHandler
23 import kotlinx.coroutines.delay
24 import kotlinx.coroutines.runBlocking
25 import org.junit.Test
26 import org.onap.ccsdk.cds.blueprintsprocessor.nats.NatsConnectionProperties
27 import org.onap.ccsdk.cds.blueprintsprocessor.nats.NatsLibConstants
28 import org.onap.ccsdk.cds.blueprintsprocessor.nats.TokenAuthNatsConnectionProperties
29 import org.onap.ccsdk.cds.blueprintsprocessor.nats.strData
30 import org.onap.ccsdk.cds.blueprintsprocessor.nats.utils.SubscriptionOptionsUtils
31 import org.onap.ccsdk.cds.controllerblueprints.core.jsonAsJsonType
32 import kotlin.test.assertNotNull
33
/**
 * Tests for obtaining NATS services through [BluePrintNatsLibPropertyService] and, when enabled
 * manually, for exercising publish/subscribe and request/reply calls against a locally running
 * NATS streaming server.
 */
class BluePrintNatsServiceTest {

    /** A token-auth JSON configuration resolved through the property service yields a non-null service. */
    @Test
    fun testTokenAuthNatService() {
        val configuration = """{
                "type" : "${NatsLibConstants.TYPE_TOKEN_AUTH}",
                "host" : "nats://localhost:4222",
                "token" : "tokenAuth"
            }            
        """.trimIndent()

        // Only the NatsConnectionProperties overload is stubbed on the spy.
        // NOTE(review): presumably the JSON overload called below delegates to it - confirm.
        val spkBluePrintNatsLibPropertyService = spyk(BluePrintNatsLibPropertyService(mockk()))
        every {
            spkBluePrintNatsLibPropertyService
                .bluePrintNatsService(any<NatsConnectionProperties>())
        } returns TokenAuthNatsService(mockk())

        val bluePrintNatsService =
            spkBluePrintNatsLibPropertyService.bluePrintNatsService(configuration.jsonAsJsonType())
        assertNotNull(bluePrintNatsService, "failed to get NATS Service")
    }

    /** A TLS-auth JSON configuration resolved through the property service yields a non-null service. */
    @Test
    fun testTLSAuthNatService() {
        val configuration = """{
                "type" : "${NatsLibConstants.TYPE_TLS_AUTH}",
                "host" : "nats://localhost:4222"
            }            
        """.trimIndent()

        // Only the NatsConnectionProperties overload is stubbed on the spy.
        // NOTE(review): presumably the JSON overload called below delegates to it - confirm.
        val spkBluePrintNatsLibPropertyService = spyk(BluePrintNatsLibPropertyService(mockk()))
        every {
            spkBluePrintNatsLibPropertyService
                .bluePrintNatsService(any<NatsConnectionProperties>())
        } returns TLSAuthNatsService(mockk())

        val bluePrintNatsService =
            spkBluePrintNatsLibPropertyService.bluePrintNatsService(configuration.jsonAsJsonType())
        assertNotNull(bluePrintNatsService, "failed to get NATS Service")
    }

    /**
     * Enable to test only on a local desktop. Don't enable on the build server.
     * Start the server with : nats-streaming-server -cid cds-cluster --auth tokenAuth -m 8222 -V
     */
    // @Test
    fun localIntegrationTest() {
        // Single runBlocking entry point; the scenario helpers below are suspend functions so
        // they run inside this coroutine instead of nesting their own runBlocking calls.
        runBlocking {
            val natsService = TokenAuthNatsService(tokenAuthConnectionProperties("client-1"))
            val streamingConnection = natsService.connection()
            assertNotNull(streamingConnection, "failed to create nats connection")

            // Second client with a distinct client id, created only to check that it connects.
            val natsService2 = TokenAuthNatsService(tokenAuthConnectionProperties("client-2"))
            val streamingConnection2 = natsService2.connection()
            assertNotNull(streamingConnection2, "failed to create nats connection 2")
            // NOTE(review): neither connection is closed by this test - confirm whether the
            // object returned by connection() should be closed once the scenarios finish.

            testMultiPublish(natsService)
            testLoadBalance(natsService)
            testLimitSubscription(natsService)
            testRequestReply(natsService)
            testMultiRequestReply(natsService)
            // Pause before returning; presumably gives in-flight handler callbacks time to run.
            delay(1000)
        }
    }

    /** Builds token-auth connection properties for the local server pair using [clientName] as client id. */
    private fun tokenAuthConnectionProperties(clientName: String) =
        TokenAuthNatsConnectionProperties().apply {
            host = "nats://localhost:4222,nats://localhost:4223"
            clientId = clientName
            token = "tokenAuth"
        }

    /** Creates a streaming message handler that prints each message prefixed with [label]. */
    private fun printingHandler(label: String) =
        MessageHandler { message -> println("$label: ${message.strData()}") }

    /** Creates a streaming message handler that prints each message prefixed with [label] and then acks it. */
    private fun ackingHandler(label: String) =
        MessageHandler { message ->
            println("$label: ${message.strData()}")
            message.ack()
        }

    /** Multiple publish test: two plain subscribers on "multi-publish", then five published messages. */
    private suspend fun testMultiPublish(natsService: BluePrintNatsService) {
        natsService.subscribe("multi-publish", printingHandler("Multi Publish Message Handler 1"))
        natsService.subscribe("multi-publish", printingHandler("Multi Publish Message Handler 2"))

        repeat(5) {
            natsService.publish("multi-publish", "multi publish message-$it".toByteArray())
        }
    }

    /** Load balance publish test: two subscribers in group "lb-group", five messages, then unsubscribe. */
    private suspend fun testLoadBalance(natsService: BluePrintNatsService) {
        val sub1 = natsService.loadBalanceSubscribe(
            "lb-publish", "lb-group", printingHandler("LB Publish Message Handler 1")
        )
        val sub2 = natsService.loadBalanceSubscribe(
            "lb-publish", "lb-group", printingHandler("LB Publish Message Handler 2")
        )

        repeat(5) {
            natsService.publish("lb-publish", "lb publish message-$it".toByteArray())
        }
        sub1.unsubscribe()
        sub2.unsubscribe()
    }

    /**
     * Limited subscription test: two load-balanced subscribers created with the options from
     * `SubscriptionOptionsUtils.manualAckWithRateLimit(1)`, whose handlers ack each message
     * explicitly; ten messages are published before both subscriptions are removed.
     */
    private suspend fun testLimitSubscription(natsService: BluePrintNatsService) {
        val sub1 = natsService.loadBalanceSubscribe(
            "lb-publish", "lb-group", ackingHandler("LB Publish Message Handler 1"),
            SubscriptionOptionsUtils.manualAckWithRateLimit(1)
        )
        val sub2 = natsService.loadBalanceSubscribe(
            "lb-publish", "lb-group", ackingHandler("LB Publish Message Handler 2"),
            SubscriptionOptionsUtils.manualAckWithRateLimit(1)
        )

        repeat(10) {
            natsService.publish("lb-publish", "lb limit message-$it".toByteArray())
        }
        sub1.unsubscribe()
        sub2.unsubscribe()
    }

    /**
     * Request/reply test: two reply subscribers in group "rr-group" answer each request on its
     * reply subject; five requests are sent and the single reply to each is printed.
     */
    private suspend fun testRequestReply(natsService: BluePrintNatsService) {
        val lbMessageHandler1 = io.nats.client.MessageHandler { message ->
            println("LB RR Request Handler 1: ${String(message.data)} will reply to(${message.replyTo})")
            message.connection.publish(
                message.replyTo,
                "Notification ${String(message.data)} reply from 1".toByteArray()
            )
        }

        val lbMessageHandler2 = io.nats.client.MessageHandler { message ->
            println("LB RR Request Handler 2: ${String(message.data)} will reply to(${message.replyTo})")
            message.connection.publish(
                message.replyTo,
                "Notification ${String(message.data)} reply from 2".toByteArray()
            )
        }

        // NOTE(review): these reply subscriptions are never released. testMultiRequestReply
        // subscribes to the same subject and group, so its requests may presumably also be
        // served by the handlers above - confirm and unsubscribe here if so.
        natsService.loadBalanceReplySubscribe("rr-request", "rr-group", lbMessageHandler1)
        natsService.loadBalanceReplySubscribe("rr-request", "rr-group", lbMessageHandler2)

        repeat(5) {
            val message = natsService.requestAndGetOneReply(
                "rr-request",
                "rr message-$it".toByteArray(),
                1000
            )
            println("Received : ${message.strData()}")
        }
    }

    /**
     * Multiple reply test: each reply subscriber publishes a "Notification" and then a
     * "Completion" message per request; the reply handler unsubscribes once it sees a
     * message starting with "Completion".
     */
    private suspend fun testMultiRequestReply(natsService: BluePrintNatsService) {
        /** Request Reply **/
        val lbMessageHandler1 = io.nats.client.MessageHandler { message ->
            println("LB RR Request Handler 1: ${String(message.data)} will reply to(${message.replyTo})")
            message.connection.publish(
                message.replyTo,
                "Notification ${message.strData()} reply from 1".toByteArray()
            )
            message.connection.publish(
                message.replyTo,
                "Completion ${message.strData()} reply from 1".toByteArray()
            )
        }
        val lbMessageHandler2 = io.nats.client.MessageHandler { message ->
            println("LB RR Request Handler 2: ${message.strData()} will reply to(${message.replyTo})")
            message.connection.publish(
                message.replyTo,
                "Notification ${message.strData()} reply from 2".toByteArray()
            )
            message.connection.publish(
                message.replyTo,
                "Completion ${message.strData()} reply from 2".toByteArray()
            )
        }

        natsService.loadBalanceReplySubscribe("rr-request", "rr-group", lbMessageHandler1)
        natsService.loadBalanceReplySubscribe("rr-request", "rr-group", lbMessageHandler2)

        /** Should unsubscribe on completion message */
        val rrReplyMessageHandler = io.nats.client.MessageHandler { message ->
            val messageContent = message.strData()
            println("RR Reply Handler : $messageContent")
            if (messageContent.startsWith("Completion")) {
                message.subscription.unsubscribe()
            }
        }
        repeat(5) {
            natsService.requestAndGetMultipleReplies(
                "rr-request",
                "rr-reply-$it",
                "rr message-$it".toByteArray(),
                rrReplyMessageHandler
            )
        }
    }
}