2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright (C) 2021 Nokia. All rights reserved.
6 * ================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 * ============LICENSE_END=========================================================
20 package org.onap.dcae.common.publishing;
22 import com.google.gson.JsonObject;
23 import io.vavr.collection.List;
24 import io.vavr.control.Option;
25 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.DmaapClientFactory;
26 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.MessageRouterPublisher;
27 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishRequest;
28 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishResponse;
29 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.MessageRouterPublisherConfig;
30 import reactor.core.publisher.Flux;
32 import static org.onap.dcae.common.publishing.DmaapRequestConfiguration.retryConfiguration;
33 import static org.onap.dcae.common.publishing.DmaapRequestConfiguration.createPublishRequest;
34 import static org.onap.dcae.common.publishing.DmaapRequestConfiguration.jsonBatch;
36 public class Publisher {
38 private final MessageRouterPublisher publisher;
41 this(retryConfiguration());
44 public Publisher(MessageRouterPublisherConfig messageRouterPublisherConfig) {
45 publisher = DmaapClientFactory
46 .createMessageRouterPublisher(messageRouterPublisherConfig);
52 * @param events list of ves events prepared to send
53 * @param publisherConfig publisher configuration
54 * @return flux containing information about the success or failure of the event publication
56 public Flux<MessageRouterPublishResponse> publishEvents(List<String> events, Option<PublisherConfig> publisherConfig) {
57 return publishEvents(events, createPublishRequest(publisherConfig));
60 Flux<MessageRouterPublishResponse> publishEvents(List<String> events, MessageRouterPublishRequest publishRequest) {
61 final Flux<JsonObject> jsonMessageBatch = jsonBatch(events);
62 return publisher.put(publishRequest, jsonMessageBatch);