Cluster communication channels
[ccsdk/cds.git] / ms / blueprintsprocessor / modules / commons / nats-lib / src / test / kotlin / org / onap / ccsdk / cds / blueprintsprocessor / nats / service / BluePrintNatsServiceTest.kt
1 /*
2  * Copyright © 2018-2019 AT&T Intellectual Property.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *     http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 package org.onap.ccsdk.cds.blueprintsprocessor.nats.service
18
19 import io.mockk.every
20 import io.mockk.mockk
21 import io.mockk.spyk
22 import io.nats.streaming.MessageHandler
23 import kotlinx.coroutines.delay
24 import kotlinx.coroutines.runBlocking
25 import org.junit.Test
26 import org.onap.ccsdk.cds.blueprintsprocessor.nats.NatsConnectionProperties
27 import org.onap.ccsdk.cds.blueprintsprocessor.nats.NatsLibConstants
28 import org.onap.ccsdk.cds.blueprintsprocessor.nats.TokenAuthNatsConnectionProperties
29 import org.onap.ccsdk.cds.blueprintsprocessor.nats.strData
30 import org.onap.ccsdk.cds.controllerblueprints.core.jsonAsJsonType
31 import kotlin.test.assertNotNull
32
/**
 * Tests for the NATS service lookup through [BluePrintNatsLibPropertyService], plus a
 * manually-run integration scenario that exercises publish/subscribe and request/reply
 * against a locally started NATS streaming server.
 */
class BluePrintNatsServiceTest {

    /** Checks that a token-auth JSON configuration yields a non-null NATS service. */
    @Test
    fun testTokenAuthNatService() {
        val configuration = """{
                "type" : "${NatsLibConstants.TYPE_TOKEN_AUTH}",
                "host" : "nats://localhost:4222",
                "token" : "tokenAuth"
            }            
        """.trimIndent()

        val bluePrintNatsLibPropertyService = BluePrintNatsLibPropertyService(mockk())

        // Stub the overload taking NatsConnectionProperties so the service is built around a mock;
        // presumably the JSON overload called below delegates to it.
        val spkBluePrintNatsLibPropertyService = spyk(bluePrintNatsLibPropertyService)
        every {
            spkBluePrintNatsLibPropertyService
                .bluePrintNatsService(any<NatsConnectionProperties>())
        } returns TokenAuthNatsService(
            mockk()
        )

        val bluePrintNatsService =
            spkBluePrintNatsLibPropertyService.bluePrintNatsService(configuration.jsonAsJsonType())
        assertNotNull(bluePrintNatsService, "failed to get NATS Service")
    }

    /** Checks that a TLS-auth JSON configuration yields a non-null NATS service. */
    @Test
    fun testTLSAuthNatService() {
        val configuration = """{
                "type" : "${NatsLibConstants.TYPE_TLS_AUTH}",
                "host" : "nats://localhost:4222"
            }            
        """.trimIndent()

        val bluePrintNatsLibPropertyService = BluePrintNatsLibPropertyService(mockk())

        // Stub the overload taking NatsConnectionProperties so the service is built around a mock;
        // presumably the JSON overload called below delegates to it.
        val spkBluePrintNatsLibPropertyService = spyk(bluePrintNatsLibPropertyService)
        every {
            spkBluePrintNatsLibPropertyService
                .bluePrintNatsService(any<NatsConnectionProperties>())
        } returns TLSAuthNatsService(
            mockk()
        )

        val bluePrintNatsService =
            spkBluePrintNatsLibPropertyService.bluePrintNatsService(configuration.jsonAsJsonType())
        assertNotNull(bluePrintNatsService, "failed to get NATS Service")
    }

    /**
     * Manual integration scenario. Enable only on a local desktop; do not enable on the build server.
     *
     * Start the server with:
     * `nats-streaming-server -cid cds-cluster --auth tokenAuth -m 8222 -V`
     *
     * Opens two token-authenticated connections, then runs the publish, load-balance and
     * request/reply scenarios on the first one. The handlers only print what they receive;
     * message contents are not asserted.
     */
    // @Test
    fun localTntegrationTest() {
        runBlocking {

            val connectionProperties = TokenAuthNatsConnectionProperties().apply {
                host = "nats://localhost:4222,nats://localhost:4223"
                clientId = "client-1"
                token = "tokenAuth"
            }
            val natsService = TokenAuthNatsService(connectionProperties)
            val streamingConnection = natsService.connection()
            assertNotNull(streamingConnection, "failed to create nats connection")

            // Second client with its own client id, also token-authenticated.
            val connectionProperties2 = TokenAuthNatsConnectionProperties().apply {
                host = "nats://localhost:4222,nats://localhost:4223"
                clientId = "client-2"
                token = "tokenAuth"
            }
            val natsService2 = TokenAuthNatsService(connectionProperties2)
            val streamingConnection2 = natsService2.connection()
            assertNotNull(streamingConnection2, "failed to create nats connection 2")

            testMultiPublish(natsService)
            testLoadBalance(natsService)
            testRequestReply(natsService)
            testMultiRequestReply(natsService)
            // Leave time for asynchronously delivered messages to reach the printing handlers.
            delay(1000)
        }
    }

    /**
     * Registers two plain subscribers on the "multi-publish" subject and publishes five messages.
     *
     * Declared `suspend` so it runs in the caller's coroutine instead of nesting `runBlocking`.
     *
     * @param natsService service used to subscribe and publish.
     */
    private suspend fun testMultiPublish(natsService: BluePrintNatsService) {
        /** Multiple Publish Message Test **/
        val messageHandler1 =
            MessageHandler { message -> println("Multi Publish Message Handler 1: ${message.strData()}") }
        val messageHandler2 =
            MessageHandler { message -> println("Multi Publish Message Handler 2: ${message.strData()}") }

        natsService.subscribe("multi-publish", messageHandler1)
        natsService.subscribe("multi-publish", messageHandler2)

        repeat(5) {
            natsService.publish("multi-publish", "multi publish message-$it".toByteArray())
        }
    }

    /**
     * Registers two subscribers in the "lb-group" group on the "lb-publish" subject and
     * publishes five messages.
     *
     * Declared `suspend` so it runs in the caller's coroutine instead of nesting `runBlocking`.
     *
     * @param natsService service used to subscribe and publish.
     */
    private suspend fun testLoadBalance(natsService: BluePrintNatsService) {
        /** Load balance Publish Message Test **/
        val lbMessageHandler1 =
            MessageHandler { message -> println("LB Publish Message Handler 1: ${message.strData()}") }
        val lbMessageHandler2 =
            MessageHandler { message -> println("LB Publish Message Handler 2: ${message.strData()}") }

        natsService.loadBalanceSubscribe("lb-publish", "lb-group", lbMessageHandler1)
        natsService.loadBalanceSubscribe("lb-publish", "lb-group", lbMessageHandler2)

        repeat(5) {
            natsService.publish("lb-publish", "lb publish message-$it".toByteArray())
        }
    }

    /**
     * Registers two reply handlers in the "rr-group" group on "rr-request", then sends five
     * requests and prints the single reply obtained for each.
     *
     * Declared `suspend` so it runs in the caller's coroutine instead of nesting `runBlocking`.
     *
     * @param natsService service used to subscribe and to issue the requests.
     */
    private suspend fun testRequestReply(natsService: BluePrintNatsService) {
        val requestHandler1 = io.nats.client.MessageHandler { message ->
            println("LB RR Request Handler 1: ${String(message.data)} will reply to(${message.replyTo})")
            message.connection.publish(
                message.replyTo,
                "Notification ${String(message.data)} reply from 1".toByteArray()
            )
        }

        val requestHandler2 = io.nats.client.MessageHandler { message ->
            println("LB RR Request Handler 2: ${String(message.data)} will reply to(${message.replyTo})")
            message.connection.publish(
                message.replyTo,
                "Notification ${String(message.data)} reply from 2".toByteArray()
            )
        }

        natsService.loadBalanceReplySubscribe("rr-request", "rr-group", requestHandler1)
        natsService.loadBalanceReplySubscribe("rr-request", "rr-group", requestHandler2)

        repeat(5) {
            val message = natsService.requestAndGetOneReply(
                "rr-request",
                "rr message-$it".toByteArray(),
                1000
            )
            println("Received : ${message.strData()}")
        }
    }

    /**
     * Registers two reply handlers that each answer a request with a "Notification" followed by
     * a "Completion" message, then sends five requests whose replies go to a handler that
     * unsubscribes once it sees the completion message.
     *
     * Declared `suspend` so it runs in the caller's coroutine instead of nesting `runBlocking`.
     *
     * @param natsService service used to subscribe and to issue the requests.
     */
    private suspend fun testMultiRequestReply(natsService: BluePrintNatsService) {
        /** Request Reply **/
        val requestHandler1 = io.nats.client.MessageHandler { message ->
            println("LB RR Request Handler 1: ${String(message.data)} will reply to(${message.replyTo})")
            message.connection.publish(
                message.replyTo,
                "Notification ${message.strData()} reply from 1".toByteArray()
            )
            message.connection.publish(
                message.replyTo,
                "Completion ${message.strData()} reply from 1".toByteArray()
            )
        }
        val requestHandler2 = io.nats.client.MessageHandler { message ->
            println("LB RR Request Handler 2: ${message.strData()} will reply to(${message.replyTo})")
            message.connection.publish(
                message.replyTo,
                "Notification ${message.strData()} reply from 2".toByteArray()
            )
            message.connection.publish(
                message.replyTo,
                "Completion ${message.strData()} reply from 2".toByteArray()
            )
        }

        // Dedicated subject and group: the single-reply handlers registered by testRequestReply on
        // "rr-request"/"rr-group" never send a "Completion" message. Presumably members of one group
        // share delivery (NATS queue-group behaviour), so reusing those names could route a request
        // to a handler that never completes, leaving the reply subscription open.
        natsService.loadBalanceReplySubscribe("multi-rr-request", "multi-rr-group", requestHandler1)
        natsService.loadBalanceReplySubscribe("multi-rr-request", "multi-rr-group", requestHandler2)

        /** Should unsubscribe on completion message */
        val rrReplyMessageHandler = io.nats.client.MessageHandler { message ->
            val messageContent = message.strData()
            println("RR Reply Handler : $messageContent")
            if (messageContent.startsWith("Completion")) {
                message.subscription.unsubscribe()
            }
        }
        repeat(5) {
            natsService.requestAndGetMultipleReplies(
                "multi-rr-request",
                "rr-reply-$it",
                "rr message-$it".toByteArray(),
                rrReplyMessageHandler
            )
        }
    }
}