2 * Copyright © 2018-2019 AT&T Intellectual Property.
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
17 package org.onap.ccsdk.cds.blueprintsprocessor.nats.service
22 import io.nats.streaming.MessageHandler
23 import kotlinx.coroutines.delay
24 import kotlinx.coroutines.runBlocking
26 import org.onap.ccsdk.cds.blueprintsprocessor.nats.NatsConnectionProperties
27 import org.onap.ccsdk.cds.blueprintsprocessor.nats.NatsLibConstants
28 import org.onap.ccsdk.cds.blueprintsprocessor.nats.TokenAuthNatsConnectionProperties
29 import org.onap.ccsdk.cds.blueprintsprocessor.nats.strData
30 import org.onap.ccsdk.cds.blueprintsprocessor.nats.utils.SubscriptionOptionsUtils
31 import org.onap.ccsdk.cds.controllerblueprints.core.jsonAsJsonType
32 import kotlin.test.assertNotNull
34 class BluePrintNatsServiceTest {
/**
 * Checks that a NATS service can be obtained from a JSON configuration whose
 * "type" is [NatsLibConstants.TYPE_TOKEN_AUTH] and whose "host" is nats://localhost:4222.
 *
 * A [BluePrintNatsLibPropertyService] built with a mockk dependency is wrapped in a spy,
 * and the spy's `bluePrintNatsService(any<NatsConnectionProperties>())` call is set up to
 * return a [TokenAuthNatsService]. The test then resolves the service from the parsed
 * JSON configuration and asserts that the result is not null.
 */
37 fun testTokenAuthNatService() {
38 val configuration = """{
39 "type" : "${NatsLibConstants.TYPE_TOKEN_AUTH}",
40 "host" : "nats://localhost:4222",
45 val bluePrintNatsLibPropertyService = BluePrintNatsLibPropertyService(mockk())
47 val spkBluePrintNatsLibPropertyService = spyk(bluePrintNatsLibPropertyService)
49 spkBluePrintNatsLibPropertyService
50 .bluePrintNatsService(any<NatsConnectionProperties>())
51 } returns TokenAuthNatsService(
55 val bluePrintNatsService =
56 spkBluePrintNatsLibPropertyService.bluePrintNatsService(configuration.jsonAsJsonType())
57 assertNotNull(bluePrintNatsService, "failed to get NATS Service")
/**
 * Checks that a NATS service can be obtained from a JSON configuration whose
 * "type" is [NatsLibConstants.TYPE_TLS_AUTH] and whose "host" is nats://localhost:4222.
 *
 * A [BluePrintNatsLibPropertyService] built with a mockk dependency is wrapped in a spy,
 * and the spy's `bluePrintNatsService(any<NatsConnectionProperties>())` call is set up to
 * return a [TLSAuthNatsService]. The test then resolves the service from the parsed
 * JSON configuration and asserts that the result is not null.
 */
61 fun testTLSAuthNatService() {
62 val configuration = """{
63 "type" : "${NatsLibConstants.TYPE_TLS_AUTH}",
64 "host" : "nats://localhost:4222"
68 val bluePrintNatsLibPropertyService = BluePrintNatsLibPropertyService(mockk())
70 val spkBluePrintNatsLibPropertyService = spyk(bluePrintNatsLibPropertyService)
72 spkBluePrintNatsLibPropertyService
73 .bluePrintNatsService(any<NatsConnectionProperties>())
74 } returns TLSAuthNatsService(
78 val bluePrintNatsService =
79 spkBluePrintNatsLibPropertyService.bluePrintNatsService(configuration.jsonAsJsonType())
80 assertNotNull(bluePrintNatsService, "failed to get NATS Service")
83 /** Enable this test only on a local desktop; do not enable it on the build server.
84 * Start the server with: nats-streaming-server -cid cds-cluster --auth tokenAuth -m 8222 -V
 *
 * Creates two [TokenAuthNatsService] clients whose host is set to
 * "nats://localhost:4222,nats://localhost:4223" (the second one with clientId "client-2"),
 * asserts that each of them yields a connection, and then runs the multi-publish,
 * load-balance, limited-subscription, request/reply and multi-reply helper scenarios
 * using the first client.
87 fun localIntegrationTest() {
90 val connectionProperties = TokenAuthNatsConnectionProperties().apply {
91 host = "nats://localhost:4222,nats://localhost:4223"
95 val natsService = TokenAuthNatsService(connectionProperties)
96 val streamingConnection = natsService.connection()
97 assertNotNull(streamingConnection, "failed to create nats connection")
99 val connectionProperties2 = TokenAuthNatsConnectionProperties().apply {
100 host = "nats://localhost:4222,nats://localhost:4223"
101 clientId = "client-2"
// NOTE(review): despite the "tls" in its name, this is a second TokenAuthNatsService.
104 val tlsAuthNatsService2 = TokenAuthNatsService(connectionProperties2)
105 val streamingConnection2 = tlsAuthNatsService2.connection()
106 assertNotNull(streamingConnection2, "failed to create nats connection 2")
// All helper scenarios below run against the first client only.
108 testMultiPublish(natsService)
109 testLoadBalance(natsService)
110 testLimitSubscription(natsService)
111 testRequestReply(natsService)
112 testMultiRequestReply(natsService)
/**
 * Registers two independent subscriptions on the "multi-publish" subject, each with a
 * handler that prints the received payload, and publishes messages of the form
 * "multi publish message-<n>" to that subject.
 *
 * NOTE(review): presumably both handlers are expected to receive every message,
 * since no group is passed to subscribe — confirm against the subscribe implementation.
 *
 * @param natsService service used both to subscribe and to publish.
 */
117 private fun testMultiPublish(natsService: BluePrintNatsService) {
119 /** Multiple Publish Message Test **/
120 val messageHandler1 =
121 MessageHandler { message -> println("Multi Publish Message Handler 1: ${message.strData()}") }
122 val messageHandler2 =
123 MessageHandler { message -> println("Multi Publish Message Handler 2: ${message.strData()}") }
125 natsService.subscribe("multi-publish", messageHandler1)
126 natsService.subscribe("multi-publish", messageHandler2)
129 natsService.publish("multi-publish", "multi publish message-$it".toByteArray())
/**
 * Registers two load-balanced subscriptions on the "lb-publish" subject, both passing
 * "lb-group" as the second argument (presumably the queue group name — confirm), each
 * with a handler that prints the received payload, and publishes messages of the form
 * "lb publish message-<n>" to that subject.
 *
 * @param natsService service used both to subscribe and to publish.
 */
134 private fun testLoadBalance(natsService: BluePrintNatsService) {
136 /** Load balance Publish Message Test **/
137 val lbMessageHandler1 =
138 MessageHandler { message -> println("LB Publish Message Handler 1: ${message.strData()}") }
139 val lbMessageHandler2 =
140 MessageHandler { message -> println("LB Publish Message Handler 2: ${message.strData()}") }
142 val sub1 = natsService.loadBalanceSubscribe("lb-publish", "lb-group", lbMessageHandler1)
143 val sub2 = natsService.loadBalanceSubscribe("lb-publish", "lb-group", lbMessageHandler2)
146 natsService.publish("lb-publish", "lb publish message-$it".toByteArray())
/**
 * Registers two load-balanced subscriptions on "lb-publish" / "lb-group", each created
 * with the options returned by SubscriptionOptionsUtils.manualAckWithRateLimit(1), and
 * publishes messages of the form "lb limit message-<n>" to that subject.
 *
 * NOTE(review): presumably manualAckWithRateLimit(1) restricts each subscriber to one
 * unacknowledged message at a time — confirm against SubscriptionOptionsUtils.
 * NOTE(review): the subject, the group and the printed handler labels are the same as
 * in testLoadBalance, so console output of the two scenarios can only be told apart
 * by the message payload.
 *
 * @param natsService service used both to subscribe and to publish.
 */
153 private fun testLimitSubscription(natsService: BluePrintNatsService) {
155 /** Limited subscription test: load balance subscribers using manualAckWithRateLimit(1) **/
156 val lbMessageHandler1 =
157 MessageHandler { message ->
159 println("LB Publish Message Handler 1: ${message.strData()}")
163 val lbMessageHandler2 =
164 MessageHandler { message ->
166 println("LB Publish Message Handler 2: ${message.strData()}")
171 val sub1 = natsService.loadBalanceSubscribe(
172 "lb-publish", "lb-group", lbMessageHandler1,
173 SubscriptionOptionsUtils.manualAckWithRateLimit(1)
175 val sub2 = natsService.loadBalanceSubscribe(
176 "lb-publish", "lb-group", lbMessageHandler2,
177 SubscriptionOptionsUtils.manualAckWithRateLimit(1)
181 natsService.publish("lb-publish", "lb limit message-$it".toByteArray())
/**
 * Request/reply scenario with a single reply per request.
 *
 * Two io.nats.client.MessageHandler responders are registered through
 * loadBalanceReplySubscribe on "rr-request" / "rr-group". Each responder prints the
 * incoming request together with its replyTo value and publishes a
 * "Notification <payload> reply from <n>" message through the connection of the
 * received message. The test then calls requestAndGetOneReply with payloads of the
 * form "rr message-<n>" and prints the reply it gets back.
 *
 * @param natsService service used to register the responders and to send requests.
 */
188 private fun testRequestReply(natsService: BluePrintNatsService) {
// Responder 1: logs the request and publishes a notification reply.
190 val lbMessageHandler1 = io.nats.client.MessageHandler { message ->
191 println("LB RR Request Handler 1: ${String(message.data)} will reply to(${message.replyTo})")
192 message.connection.publish(
194 "Notification ${String(message.data)} reply from 1".toByteArray()
// Responder 2: same behavior, tagged as coming from responder 2.
198 val lbMessageHandler2 = io.nats.client.MessageHandler { message ->
199 println("LB RR Request Handler 2: ${String(message.data)} will reply to(${message.replyTo})")
200 message.connection.publish(
202 "Notification ${String(message.data)} reply from 2".toByteArray()
206 natsService.loadBalanceReplySubscribe("rr-request", "rr-group", lbMessageHandler1)
207 natsService.loadBalanceReplySubscribe("rr-request", "rr-group", lbMessageHandler2)
210 val message = natsService.requestAndGetOneReply(
212 "rr message-$it".toByteArray(),
215 println("Received : ${message.strData()}")
/**
 * Request/reply scenario in which every request is answered with several replies.
 *
 * Two io.nats.client.MessageHandler responders are registered through
 * loadBalanceReplySubscribe on "rr-request" / "rr-group". Each responder publishes two
 * messages per request through the connection of the received message: a
 * "Notification ..." message followed by a "Completion ..." message. The requester
 * side calls requestAndGetMultipleReplies with payloads of the form "rr message-<n>"
 * and a reply handler that prints every reply and unsubscribes its own subscription
 * once a reply starting with "Completion" arrives.
 *
 * NOTE(review): the subject and group are the same as in testRequestReply; if the
 * responders registered there are still subscribed when this runs, they may also
 * answer these requests (with a notification only) — confirm.
 *
 * @param natsService service used to register the responders and to send requests.
 */
220 private fun testMultiRequestReply(natsService: BluePrintNatsService) {
222 /** Request with multiple replies **/
223 val lbMessageHandler1 = io.nats.client.MessageHandler { message ->
224 println("LB RR Request Handler 1: ${String(message.data)} will reply to(${message.replyTo})")
225 message.connection.publish(
227 "Notification ${message.strData()} reply from 1".toByteArray()
229 message.connection.publish(
231 "Completion ${message.strData()} reply from 1".toByteArray()
234 val lbMessageHandler2 = io.nats.client.MessageHandler { message ->
235 println("LB RR Request Handler 2: ${message.strData()} will reply to(${message.replyTo})")
236 message.connection.publish(
238 "Notification ${message.strData()} reply from 2".toByteArray()
240 message.connection.publish(
242 "Completion ${message.strData()} reply from 2".toByteArray()
246 natsService.loadBalanceReplySubscribe("rr-request", "rr-group", lbMessageHandler1)
247 natsService.loadBalanceReplySubscribe("rr-request", "rr-group", lbMessageHandler2)
249 /** Should unsubscribe on completion message */
250 val rrReplyMessageHandler = io.nats.client.MessageHandler { message ->
251 val messageContent = message.strData()
252 println("RR Reply Handler : $messageContent")
// A "Completion" reply marks the end of the reply stream for this request.
253 if (messageContent.startsWith("Completion")) {
254 message.subscription.unsubscribe()
258 natsService.requestAndGetMultipleReplies(
261 "rr message-$it".toByteArray(),
262 rrReplyMessageHandler