/*
 * Copyright © 2018-2019 AT&T Intellectual Property.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
17 @file:Suppress("BlockingMethodInNonBlockingContext")
19 package org.onap.ccsdk.cds.blueprintsprocessor.nats.service
21 import io.nats.client.Dispatcher
22 import io.nats.streaming.MessageHandler
23 import io.nats.streaming.StreamingConnection
24 import io.nats.streaming.Subscription
25 import io.nats.streaming.SubscriptionOptions
26 import java.time.Duration
28 interface BluePrintNatsService {
/**
 * Creates and returns the NATS streaming connection.
 *
 * Every default method of this interface obtains its connection through this call.
 *
 * @return the [StreamingConnection] to operate on.
 */
suspend fun connection(): StreamingConnection
/**
 * Sends one request [message] to the [subject] and returns exactly one reply.
 *
 * The request subscriber may be a multi-instance consumer or a load-balanced consumer.
 * With a multi-instance consumer, only the first response from the subscribers is returned.
 *
 * NOTE(review): the underlying client call presumably yields `null` when no reply arrives in time,
 * which would fail the non-null return here — confirm against the NATS client documentation.
 *
 * @param subject subject the request is sent to.
 * @param message request payload.
 * @param timeout maximum time to wait for the reply, in milliseconds (converted via [Duration.ofMillis]).
 * @return the reply message obtained from the core NATS connection.
 */
suspend fun requestAndGetOneReply(subject: String, message: ByteArray, timeout: Long): io.nats.client.Message =
    connection().natsConnection.request(subject, message, Duration.ofMillis(timeout))
/**
 * Sends one request [message] to the [subject] and receives multiple replies on [replySubject]
 * through [messageHandler].
 *
 * The request subscriber may be a multi-instance consumer or a load-balanced consumer.
 * With a multi-instance consumer, multiple responses from the subscribers are delivered.
 * The unsubscribe logic must be included in the [messageHandler] implementation; this method
 * does not unsubscribe the reply consumer itself.
 *
 * @param subject subject the request is published to.
 * @param replySubject subject on which the replies are consumed.
 * @param message request payload.
 * @param messageHandler handler invoked for the messages arriving on [replySubject].
 */
suspend fun requestAndGetMultipleReplies(
    subject: String,
    replySubject: String,
    message: ByteArray,
    messageHandler: io.nats.client.MessageHandler
) {
    val natsConnection = connection().natsConnection
    val dispatcher = natsConnection.createDispatcher(messageHandler)
    // Register the reply-subject consumer before publishing the request.
    dispatcher.subscribe(replySubject)

    // Publish the request; replies are expected on the reply-subject consumer registered above.
    natsConnection.publish(subject, replySubject, message)
}
/**
 * Synchronous-reply subscription on [subject] handled by [messageHandler].
 *
 * Use this only when the message has to be consumed by all instances in the cluster;
 * the message handler must reply.
 *
 * @param subject subject to listen on.
 * @param messageHandler handler that consumes each message and is expected to reply.
 * @return the [Dispatcher] returned by the subscribe call.
 */
suspend fun replySubscribe(
    subject: String,
    messageHandler: io.nats.client.MessageHandler
): Dispatcher {
    val natsConnection = connection().natsConnection
    val dispatcher = natsConnection.createDispatcher(messageHandler)
    return dispatcher.subscribe(subject)
}
/**
 * Synchronous-reply subscriber that listens on [subject] as a member of [loadBalanceGroup].
 *
 * Use this only when the message has to be consumed by only one instance in the cluster:
 * the server load-balances messages between the members of the queue group.
 * The message handler must reply.
 *
 * @param subject subject to listen on.
 * @param loadBalanceGroup queue group name shared by the competing consumers.
 * @param messageHandler handler that consumes each message and is expected to reply.
 * @return the [Dispatcher] returned by the subscribe call.
 */
suspend fun loadBalanceReplySubscribe(
    subject: String,
    loadBalanceGroup: String,
    messageHandler: io.nats.client.MessageHandler
): Dispatcher {
    val natsConnection = connection().natsConnection
    val dispatcher = natsConnection.createDispatcher(messageHandler)
    return dispatcher.subscribe(subject, loadBalanceGroup)
}
/**
 * Publishes the [message] to all subscribers on the [subject] through the streaming connection.
 *
 * @param subject subject to publish on.
 * @param message payload to publish.
 */
suspend fun publish(subject: String, message: ByteArray) {
    connection().publish(subject, message)
}
/**
 * Subscribes to the [subject] with the [messageHandler].
 *
 * Use this only when the message has to be consumed by all instances in the cluster.
 *
 * @param subject subject to listen on.
 * @param messageHandler streaming message handler invoked for each message.
 * @return the [Subscription] created on the streaming connection.
 */
suspend fun subscribe(
    subject: String,
    messageHandler: MessageHandler
): Subscription =
    connection().subscribe(subject, messageHandler)
/**
 * Subscribes to the [subject] with the [messageHandler] and [subscriptionOptions].
 *
 * Use this only when the message has to be consumed by all instances in the cluster.
 *
 * @param subject subject to listen on.
 * @param messageHandler streaming message handler invoked for each message.
 * @param subscriptionOptions options passed through to the streaming subscription.
 * @return the [Subscription] created on the streaming connection.
 */
suspend fun subscribe(
    subject: String,
    messageHandler: MessageHandler,
    subscriptionOptions: SubscriptionOptions
): Subscription =
    connection().subscribe(subject, messageHandler, subscriptionOptions)
/**
 * Queue-group subscription, see https://docs.nats.io/developing-with-nats/receiving/queues
 *
 * Subscribers listen on [subject] as members of [loadBalanceGroup].
 * Use this only when the message has to be consumed by only one instance in the cluster:
 * the server load-balances messages between the members of the queue group.
 *
 * @param subject subject to listen on.
 * @param loadBalanceGroup queue group name shared by the competing consumers.
 * @param messageHandler streaming message handler invoked for each message.
 * @return the [Subscription] created on the streaming connection.
 */
suspend fun loadBalanceSubscribe(
    subject: String,
    loadBalanceGroup: String,
    messageHandler: MessageHandler
): Subscription =
    connection().subscribe(subject, loadBalanceGroup, messageHandler)
/**
 * Queue-group subscription with options, see https://docs.nats.io/developing-with-nats/receiving/queues
 *
 * Subscribers listen on [subject] as members of [loadBalanceGroup], using [subscriptionOptions].
 * Use this only when the message has to be consumed by only one instance in the cluster:
 * the server load-balances messages between the members of the queue group.
 *
 * @param subject subject to listen on.
 * @param loadBalanceGroup queue group name shared by the competing consumers.
 * @param messageHandler streaming message handler invoked for each message.
 * @param subscriptionOptions options passed through to the streaming subscription.
 * @return the [Subscription] created on the streaming connection.
 */
suspend fun loadBalanceSubscribe(
    subject: String,
    loadBalanceGroup: String,
    messageHandler: MessageHandler,
    subscriptionOptions: SubscriptionOptions
): Subscription =
    connection().subscribe(subject, loadBalanceGroup, messageHandler, subscriptionOptions)