55f24a652bd84238f37de995e94f5c325b95838f
[dcaegen2/services.git] /
1 /*******************************************************************************
2  *  ============LICENSE_START=======================================================
3  *  slice-analysis-ms
4  *  ================================================================================
5  *   Copyright (C) 2020-2021 Wipro Limited.
6  *   Copyright (C) 2022 CTC, Inc.
7  *   ==============================================================================
8  *     Licensed under the Apache License, Version 2.0 (the "License");
9  *     you may not use this file except in compliance with the License.
10  *     You may obtain a copy of the License at
11  *
12  *          http://www.apache.org/licenses/LICENSE-2.0
13  *
14  *     Unless required by applicable law or agreed to in writing, software
15  *     distributed under the License is distributed on an "AS IS" BASIS,
16  *     WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17  *     See the License for the specific language governing permissions and
18  *     limitations under the License.
19  *     ============LICENSE_END=========================================================
20  *
21  *******************************************************************************/
22
23 package org.onap.slice.analysis.ms.dmaap;
24
25 import com.google.gson.JsonPrimitive;
26 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.MessageRouterPublisher;
27 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishRequest;
28 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishResponse;
29 import reactor.core.publisher.Flux;
30
31 import java.io.IOException;
32
33 /**
34  * Produces Notification on DMAAP events
35  */
36 public class NotificationProducer {
37
38     private MessageRouterPublisher publisher;
39     private MessageRouterPublishRequest request;
40      
41     /**
42      * Parameterized constructor.
43      */
44     public NotificationProducer(MessageRouterPublisher publisher, MessageRouterPublishRequest request) {
45         super();
46         this.publisher = publisher;
47         this.request = request;
48     }
49
50     /**
51      * sends notification to dmaap.
52      */
53     public void sendNotification(String msg) throws IOException {
54         Flux<JsonPrimitive> singleMessage = Flux.just(msg).map(JsonPrimitive::new);
55         Flux<MessageRouterPublishResponse> result = this.publisher.put(request, singleMessage);
56         result.then().block();
57     }
58
59 }