2 * Copyright © 2018-2019 AT&T Intellectual Property.
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
17 package org.onap.ccsdk.cds.blueprintsprocessor.nats.service
22 import io.nats.streaming.MessageHandler
23 import kotlinx.coroutines.delay
24 import kotlinx.coroutines.runBlocking
26 import org.onap.ccsdk.cds.blueprintsprocessor.nats.NatsConnectionProperties
27 import org.onap.ccsdk.cds.blueprintsprocessor.nats.NatsLibConstants
28 import org.onap.ccsdk.cds.blueprintsprocessor.nats.TokenAuthNatsConnectionProperties
29 import org.onap.ccsdk.cds.blueprintsprocessor.nats.strData
30 import org.onap.ccsdk.cds.controllerblueprints.core.jsonAsJsonType
31 import kotlin.test.assertNotNull
33 class BluePrintNatsServiceTest {
36 fun testTokenAuthNatService() {
37 val configuration = """{
38 "type" : "${NatsLibConstants.TYPE_TOKEN_AUTH}",
39 "host" : "nats://localhost:4222",
44 val bluePrintNatsLibPropertyService = BluePrintNatsLibPropertyService(mockk())
46 val spkBluePrintNatsLibPropertyService = spyk(bluePrintNatsLibPropertyService)
48 spkBluePrintNatsLibPropertyService
49 .bluePrintNatsService(any<NatsConnectionProperties>())
50 } returns TokenAuthNatsService(
54 val bluePrintNatsService =
55 spkBluePrintNatsLibPropertyService.bluePrintNatsService(configuration.jsonAsJsonType())
56 assertNotNull(bluePrintNatsService, "failed to get NATS Service")
60 fun testTLSAuthNatService() {
61 val configuration = """{
62 "type" : "${NatsLibConstants.TYPE_TLS_AUTH}",
63 "host" : "nats://localhost:4222"
67 val bluePrintNatsLibPropertyService = BluePrintNatsLibPropertyService(mockk())
69 val spkBluePrintNatsLibPropertyService = spyk(bluePrintNatsLibPropertyService)
71 spkBluePrintNatsLibPropertyService
72 .bluePrintNatsService(any<NatsConnectionProperties>())
73 } returns TLSAuthNatsService(
77 val bluePrintNatsService =
78 spkBluePrintNatsLibPropertyService.bluePrintNatsService(configuration.jsonAsJsonType())
79 assertNotNull(bluePrintNatsService, "failed to get NATS Service")
82 /** Enable to test only on local desktop. Don't enable in Build server
83 * Start the Server with : nats-streaming-server -cid cds-cluster --auth tokenAuth -m 8222 -V
86 fun localTntegrationTest() {
89 val connectionProperties = TokenAuthNatsConnectionProperties().apply {
90 host = "nats://localhost:4222,nats://localhost:4223"
94 val natsService = TokenAuthNatsService(connectionProperties)
95 val streamingConnection = natsService.connection()
96 assertNotNull(streamingConnection, "failed to create nats connection")
98 val connectionProperties2 = TokenAuthNatsConnectionProperties().apply {
99 host = "nats://localhost:4222,nats://localhost:4223"
100 clientId = "client-2"
103 val tlsAuthNatsService2 = TokenAuthNatsService(connectionProperties2)
104 val streamingConnection2 = tlsAuthNatsService2.connection()
105 assertNotNull(streamingConnection2, "failed to create nats connection 2")
107 testMultiPublish(natsService)
108 testLoadBalance(natsService)
109 testRequestReply(natsService)
110 testMultiRequestReply(natsService)
115 private fun testMultiPublish(natsService: BluePrintNatsService) {
117 /** Multiple Publish Message Test **/
118 val messageHandler1 =
119 MessageHandler { message -> println("Multi Publish Message Handler 1: ${message.strData()}") }
120 val messageHandler2 =
121 MessageHandler { message -> println("Multi Publish Message Handler 2: ${message.strData()}") }
123 natsService.subscribe("multi-publish", messageHandler1)
124 natsService.subscribe("multi-publish", messageHandler2)
127 natsService.publish("multi-publish", "multi publish message-$it".toByteArray())
132 private fun testLoadBalance(natsService: BluePrintNatsService) {
134 /** Load balance Publish Message Test **/
135 val lbMessageHandler1 =
136 MessageHandler { message -> println("LB Publish Message Handler 1: ${message.strData()}") }
137 val lbMessageHandler2 =
138 MessageHandler { message -> println("LB Publish Message Handler 2: ${message.strData()}") }
140 natsService.loadBalanceSubscribe("lb-publish", "lb-group", lbMessageHandler1)
141 natsService.loadBalanceSubscribe("lb-publish", "lb-group", lbMessageHandler2)
144 natsService.publish("lb-publish", "lb publish message-$it".toByteArray())
149 private fun testRequestReply(natsService: BluePrintNatsService) {
151 val lbMessageHandler1 = io.nats.client.MessageHandler { message ->
152 println("LB RR Request Handler 1: ${String(message.data)} will reply to(${message.replyTo})")
153 message.connection.publish(
155 "Notification ${String(message.data)} reply from 1".toByteArray()
159 val lbMessageHandler2 = io.nats.client.MessageHandler { message ->
160 println("LB RR Request Handler 2: ${String(message.data)} will reply to(${message.replyTo})")
161 message.connection.publish(
163 "Notification ${String(message.data)} reply from 2".toByteArray()
167 natsService.loadBalanceReplySubscribe("rr-request", "rr-group", lbMessageHandler1)
168 natsService.loadBalanceReplySubscribe("rr-request", "rr-group", lbMessageHandler2)
171 val message = natsService.requestAndGetOneReply(
173 "rr message-$it".toByteArray(),
176 println("Received : ${message.strData()}")
181 private fun testMultiRequestReply(natsService: BluePrintNatsService) {
183 /** Request Reply **/
184 val lbMessageHandler1 = io.nats.client.MessageHandler { message ->
185 println("LB RR Request Handler 1: ${String(message.data)} will reply to(${message.replyTo})")
186 message.connection.publish(
188 "Notification ${message.strData()} reply from 1".toByteArray()
190 message.connection.publish(
192 "Completion ${message.strData()} reply from 1".toByteArray()
195 val lbMessageHandler2 = io.nats.client.MessageHandler { message ->
196 println("LB RR Request Handler 2: ${message.strData()} will reply to(${message.replyTo})")
197 message.connection.publish(
199 "Notification ${message.strData()} reply from 2".toByteArray()
201 message.connection.publish(
203 "Completion ${message.strData()} reply from 2".toByteArray()
207 natsService.loadBalanceReplySubscribe("rr-request", "rr-group", lbMessageHandler1)
208 natsService.loadBalanceReplySubscribe("rr-request", "rr-group", lbMessageHandler2)
210 /** Should unsubscribe on completion message */
211 val rrReplyMessageHandler = io.nats.client.MessageHandler { message ->
212 val messageContent = message.strData()
213 println("RR Reply Handler : $messageContent")
214 if (messageContent.startsWith("Completion")) {
215 message.subscription.unsubscribe()
219 natsService.requestAndGetMultipleReplies(
222 "rr message-$it".toByteArray(),
223 rrReplyMessageHandler