Fixing Blueprint Typo's and docs
[ccsdk/cds.git] / ms / blueprintsprocessor / modules / commons / nats-lib / src / main / kotlin / org / onap / ccsdk / cds / blueprintsprocessor / nats / service / BluePrintNatsService.kt
1 /*
2  * Copyright © 2018-2019 AT&T Intellectual Property.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *     http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 @file:Suppress("BlockingMethodInNonBlockingContext")
18
19 package org.onap.ccsdk.cds.blueprintsprocessor.nats.service
20
21 import io.nats.client.Dispatcher
22 import io.nats.streaming.MessageHandler
23 import io.nats.streaming.StreamingConnection
24 import io.nats.streaming.Subscription
25 import io.nats.streaming.SubscriptionOptions
26 import java.time.Duration
27
28 interface BluePrintNatsService {
29
30     /** Create and Return the NATS streaming connection */
31     suspend fun connection(): StreamingConnection
32
33     /** Send one request [message] to the [subject] and get only one reply
34      * The request message subscriber may be multi instances consumer or load balance consumer.
35      * If it is multi instances consumer, then we will get only first responses from subscribers.
36      *
37      */
38     suspend fun requestAndGetOneReply(subject: String, message: ByteArray, timeout: Long): io.nats.client.Message {
39         return connection().natsConnection.request(subject, message, Duration.ofMillis(timeout))
40     }
41
42     /** Send one request [message] to the [subject] and get multiple replies in [replySubject] with [messageHandler]
43      * The request message subscriber may be multi instances consumer or load balance consumer.
44      * If it is multi instances consumer, then we will get multiple responses from subscribers.
45      * Include the unSubscribe logic's in [messageHandler] implementation.
46      */
47     suspend fun requestAndGetMultipleReplies(
48         subject: String,
49         replySubject: String,
50         message: ByteArray,
51         messageHandler: io.nats.client.MessageHandler
52     ) {
53         val natsConnection = connection().natsConnection
54         val dispatcher = natsConnection.createDispatcher(messageHandler)
55         /** Reply subject consumer */
56         dispatcher.subscribe(replySubject)
57
58         /** Publish the request message and expect the reply messages in reply subject consumer */
59         natsConnection.publish(subject, replySubject, message)
60     }
61
62     /** Synchronous reply Subscribe the [subject] with the [messageHandler].
63      * This is used only the message has to be consumed by all instances in the cluster and message handler must reply.
64      */
65     suspend fun replySubscribe(
66         subject: String,
67         messageHandler: io.nats.client.MessageHandler
68     ): Dispatcher {
69         val natsConnection = connection().natsConnection
70         val dispatcher = natsConnection.createDispatcher(messageHandler)
71         return dispatcher.subscribe(subject)
72     }
73
74     /**
75      * Synchronous reply Subscriber will listen for [subject] with [loadBalanceGroup].
76      * This is used only the message has to be consumed by only one instance in the cluster.
77      * server will now load balance messages between the members of the queue group and message handler must reply.
78      */
79     suspend fun loadBalanceReplySubscribe(
80         subject: String,
81         loadBalanceGroup: String,
82         messageHandler: io.nats.client.MessageHandler
83     ): Dispatcher {
84         val natsConnection = connection().natsConnection
85         val dispatcher = natsConnection.createDispatcher(messageHandler)
86         return dispatcher.subscribe(subject, loadBalanceGroup)
87     }
88
89     /** Publish the [message] to all subscribers on the [subject] */
90     suspend fun publish(subject: String, message: ByteArray) {
91         connection().publish(subject, message)
92     }
93
94     /** Subscribe the [subject] with the [messageHandler].
95      * This is used only the message has to be consumed by all instances in the cluster.
96      */
97     suspend fun subscribe(
98         subject: String,
99         messageHandler: MessageHandler
100     ): Subscription {
101         return connection().subscribe(subject, messageHandler)
102     }
103
104     /** Subscribe the [subject] with the [messageHandler] and [subscriptionOptions].
105      * This is used only the message has to be consumed by all instances in the cluster.
106      */
107     suspend fun subscribe(
108         subject: String,
109         messageHandler: MessageHandler,
110         subscriptionOptions: SubscriptionOptions
111     ): Subscription {
112         return connection().subscribe(subject, messageHandler, subscriptionOptions)
113     }
114
115     /**
116      * https://docs.nats.io/developing-with-nats/receiving/queues
117      * subscribers will listen for [subject] with [loadBalanceGroup].
118      * This is used only the message has to be consumed by only one instance in the cluster.
119      * server will now load balance messages between the members of the queue group.
120      */
121     suspend fun loadBalanceSubscribe(
122         subject: String,
123         loadBalanceGroup: String,
124         messageHandler: MessageHandler
125     ): Subscription {
126         return connection().subscribe(subject, loadBalanceGroup, messageHandler)
127     }
128
129     /**
130      * https://docs.nats.io/developing-with-nats/receiving/queues
131      * subscribers will listen for [subject] with [loadBalanceGroup] and [subscriptionOptions].
132      * This is used only the message has to be consumed by only one instance in the cluster.
133      * server will now load balance messages between the members of the queue group.
134      */
135     suspend fun loadBalanceSubscribe(
136         subject: String,
137         loadBalanceGroup: String,
138         messageHandler: MessageHandler,
139         subscriptionOptions: SubscriptionOptions
140     ): Subscription {
141         return connection().subscribe(subject, loadBalanceGroup, messageHandler, subscriptionOptions)
142     }
143 }