Fix Sent Wrong Dmaap Message Issue
[dcaegen2/services.git] / components / slice-analysis-ms / src / main / java / org / onap / slice / analysis / ms / dmaap / NotificationProducer.java
1 /*******************************************************************************
2  *  ============LICENSE_START=======================================================
3  *  slice-analysis-ms
4  *  ================================================================================
5  *   Copyright (C) 2020-2021 Wipro Limited.
6  *   Copyright (C) 2022 CTC, Inc.
7  *   ==============================================================================
8  *     Licensed under the Apache License, Version 2.0 (the "License");
9  *     you may not use this file except in compliance with the License.
10  *     You may obtain a copy of the License at
11  *
12  *          http://www.apache.org/licenses/LICENSE-2.0
13  *
14  *     Unless required by applicable law or agreed to in writing, software
15  *     distributed under the License is distributed on an "AS IS" BASIS,
16  *     WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17  *     See the License for the specific language governing permissions and
18  *     limitations under the License.
19  *     ============LICENSE_END=========================================================
20  *
21  *******************************************************************************/
22
23 package org.onap.slice.analysis.ms.dmaap;
24
25 import com.google.gson.JsonElement;
26 import com.google.gson.JsonParser;
27 import java.io.IOException;
28 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.MessageRouterPublisher;
29 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishRequest;
30 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishResponse;
31 import reactor.core.publisher.Flux;
32
33 /**
34  * Produces Notification on DMAAP events
35  */
36 public class NotificationProducer {
37
38     private MessageRouterPublisher publisher;
39     private MessageRouterPublishRequest request;
40      
41     /**
42      * Parameterized constructor.
43      */
44     public NotificationProducer(MessageRouterPublisher publisher, MessageRouterPublishRequest request) {
45         super();
46         this.publisher = publisher;
47         this.request = request;
48     }
49
50     /**
51      * sends notification to dmaap.
52      */
53     public void sendNotification(String msg) throws IOException {
54         JsonElement jsonElement = JsonParser.parseString(msg);
55         Flux<JsonElement> singleMessage = Flux.just(jsonElement);
56         Flux<MessageRouterPublishResponse> result = this.publisher.put(request, singleMessage);
57         result.then().block();
58     }
59
60 }