X-Git-Url: https://gerrit.onap.org/r/gitweb?a=blobdiff_plain;f=src%2Fmain%2Fjava%2Forg%2Fonap%2Fdmaap%2Fmr%2Fclient%2FMRClientBuilders.java;fp=src%2Fmain%2Fjava%2Forg%2Fonap%2Fdmaap%2Fmr%2Fclient%2FMRClientBuilders.java;h=e8e700397e8f4b91c7f26ac7d5e6dae8cea10858;hb=72a9ab9e886cdeabc4b43418a7054a5796a0ff55;hp=91e10e03d19e7cf7b68af3bca1b263beb58515d2;hpb=78ebc9a64fac6231e3e594200b9335a4c6372ed1;p=dmaap%2Fmessagerouter%2Fdmaapclient.git diff --git a/src/main/java/org/onap/dmaap/mr/client/MRClientBuilders.java b/src/main/java/org/onap/dmaap/mr/client/MRClientBuilders.java index 91e10e0..e8e7003 100644 --- a/src/main/java/org/onap/dmaap/mr/client/MRClientBuilders.java +++ b/src/main/java/org/onap/dmaap/mr/client/MRClientBuilders.java @@ -5,12 +5,13 @@ * Copyright © 2017 AT&T Intellectual Property. All rights reserved. * ================================================================================ * Modifications Copyright © 2018 IBM. + * Modifications Copyright © 2021 Orange. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -19,30 +20,31 @@ * ============LICENSE_END========================================================= * * ECOMP is a trademark and service mark of AT&T Intellectual Property. - * + * *******************************************************************************/ + package org.onap.dmaap.mr.client; import java.net.MalformedURLException; import java.util.Collection; +import java.util.Collections; import java.util.TreeSet; import java.util.UUID; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - import org.onap.dmaap.mr.client.impl.MRConsumerImpl; import org.onap.dmaap.mr.client.impl.MRMetaClient; import org.onap.dmaap.mr.client.impl.MRSimplerBatchPublisher; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** - * A collection of builders for various types of MR API clients - * + * A collection of builders for various types of MR API clients. + * * @author author */ -public class MRClientBuilders -{ - private final static String ILLEGAL_ARGUMENT_MESSAGE = "You must provide at least one host and a topic name."; +public class MRClientBuilders { + private static final Logger logger = LoggerFactory.getLogger(MRClientBuilders.class); + + private static final String ILLEGAL_ARGUMENT_MESSAGE = "You must provide at least one host and a topic name."; /** * Instantiates MRClientBuilders. @@ -50,115 +52,124 @@ public class MRClientBuilders private MRClientBuilders() { // prevent instantiation } - + /** - * A builder for a topic Consumer + * A builder for a topic Consumer. + * * @author author */ - public static class ConsumerBuilder - { + public static class ConsumerBuilder { /** - * Set the host list + * Set the host list. + * * @param hostList a comma-separated list of hosts to use to connect to MR * @return this builder */ - public ConsumerBuilder usingHosts ( String hostList ) { - return usingHosts ( MRConsumerImpl.stringToList(hostList) ); + public ConsumerBuilder usingHosts(String hostList) { + return usingHosts(MRConsumerImpl.stringToList(hostList)); } /** - * Set the host list + * Set the host list. + * * @param hostSet a set of hosts to use to connect to MR * @return this builder */ - public ConsumerBuilder usingHosts ( Collection hostSet ) { - fHosts = hostSet; return this; + public ConsumerBuilder usingHosts(Collection hostSet) { + fHosts = hostSet; + return this; } /** - * Set the topic + * Set the topic. + * * @param topic the name of the topic to consume * @return this builder */ - public ConsumerBuilder onTopic ( String topic ) { - fTopic=topic; - return this; + public ConsumerBuilder onTopic(String topic) { + fTopic = topic; + return this; } /** - * Set the consumer's group and ID + * Set the consumer's group and ID. + * * @param consumerGroup The name of the consumer group this consumer is part of - * @param consumerId The unique id of this consumer in its group + * @param consumerId The unique id of this consumer in its group * @return this builder */ - public ConsumerBuilder knownAs ( String consumerGroup, String consumerId ) { - fGroup = consumerGroup; - fId = consumerId; - return this; + public ConsumerBuilder knownAs(String consumerGroup, String consumerId) { + fGroup = consumerGroup; + fId = consumerId; + return this; } /** * Set the API key and secret for this client. + * * @param apiKey * @param apiSecret * @return this builder */ - public ConsumerBuilder authenticatedBy ( String apiKey, String apiSecret ) { - fApiKey = apiKey; - fApiSecret = apiSecret; - return this; + public ConsumerBuilder authenticatedBy(String apiKey, String apiSecret) { + fApiKey = apiKey; + fApiSecret = apiSecret; + return this; } /** - * Set the server side timeout - * @param timeoutMs The amount of time in milliseconds that the server should keep the connection open while waiting for message traffic. + * Set the server side timeout. + * + * @param timeoutMs The amount of time in milliseconds that the server should keep the connection open while waiting for message traffic. * @return this builder */ - public ConsumerBuilder waitAtServer ( int timeoutMs ) { - fTimeoutMs = timeoutMs; - return this; - }; + public ConsumerBuilder waitAtServer(int timeoutMs) { + fTimeoutMs = timeoutMs; + return this; + } /** - * Set the maximum number of messages to receive per transaction + * Set the maximum number of messages to receive per transaction. + * * @param limit The maximum number of messages to receive from the server in one transaction. * @return this builder */ - public ConsumerBuilder receivingAtMost ( int limit ) { - fLimit = limit; - return this; - }; + public ConsumerBuilder receivingAtMost(int limit) { + fLimit = limit; + return this; + } /** - * Set a filter to use on the server + * Set a filter to use on the server. + * * @param filter a Highland Park standard library filter encoded in JSON * @return this builder */ - public ConsumerBuilder withServerSideFilter ( String filter ) { - fFilter = filter; - return this; + public ConsumerBuilder withServerSideFilter(String filter) { + fFilter = filter; + return this; } /** - * Build the consumer + * Build the consumer. + * * @return a consumer */ - public MRConsumer build () - { - if ( fHosts == null || fHosts.isEmpty() || fTopic == null ) - { - throw new IllegalArgumentException ( ILLEGAL_ARGUMENT_MESSAGE ); + public MRConsumer build() { + if (fHosts == null || fHosts.isEmpty() || fTopic == null) { + throw new IllegalArgumentException(ILLEGAL_ARGUMENT_MESSAGE); } - if ( fGroup == null ) - { - fGroup = UUID.randomUUID ().toString (); + if (fGroup == null) { + fGroup = UUID.randomUUID().toString(); fId = "0"; - log.info ( "Creating non-restartable client with group " + fGroup + " and ID " + fId + "." ); + logger.info("Creating non-restartable client with group {} and ID {}.", fGroup, fId); } - if ( sfConsumerMock != null ) return sfConsumerMock; + if (sfConsumerMock != null) { + return sfConsumerMock; + } try { return new MRConsumerImpl.MRConsumerImplBuilder().setHostPart(fHosts) .setTopic(fTopic).setConsumerGroup(fGroup).setConsumerId(fId) @@ -181,137 +192,142 @@ public class MRClientBuilders private String fFilter = null; } - /*************************************************************************/ - /*************************************************************************/ - /*************************************************************************/ + //*************************************************************************/ + //*************************************************************************/ + //*************************************************************************/ /** - * A publisher builder + * A publisher builder. + * * @author author */ - public static class PublisherBuilder - { + public static class PublisherBuilder { /** - * Set the MR/UEB host(s) to use + * Set the MR/UEB host(s) to use. + * * @param hostlist The host(s) used in the URL to MR. Can be "host:port", can be multiple comma-separated entries. * @return this builder */ - public PublisherBuilder usingHosts ( String hostlist ) { - return usingHosts ( MRConsumerImpl.stringToList(hostlist) ); + public PublisherBuilder usingHosts(String hostlist) { + return usingHosts(MRConsumerImpl.stringToList(hostlist)); } /** - * Set the MR/UEB host(s) to use + * Set the MR/UEB host(s) to use. + * * @param hostSet The host(s) used in the URL to MR. Can be "host:port" * @return this builder */ - public PublisherBuilder usingHosts ( String[] hostSet ) - { - final TreeSet hosts = new TreeSet<> (); - for ( String hp : hostSet ) - { - hosts.add ( hp ); - } - return usingHosts ( hosts ); + public PublisherBuilder usingHosts(String[] hostSet) { + final TreeSet hosts = new TreeSet<>(); + Collections.addAll(hosts, hostSet); + return usingHosts(hosts); } /** - * Set the MR/UEB host(s) to use + * Set the MR/UEB host(s) to use. + * * @param hostlist The host(s) used in the URL to MR. Can be "host:port". * @return this builder */ - public PublisherBuilder usingHosts ( Collection hostlist ) { - fHosts=hostlist; - return this; + public PublisherBuilder usingHosts(Collection hostlist) { + fHosts = hostlist; + return this; } /** - * Set the topic to publish on + * Set the topic to publish on. + * * @param topic The topic on which to publish messages. * @return this builder */ - public PublisherBuilder onTopic ( String topic ) { - fTopic = topic; - return this; + public PublisherBuilder onTopic(String topic) { + fTopic = topic; + return this; } /** * Batch message sends with the given limits. + * * @param messageCount The largest set of messages to batch. - * @param ageInMs The maximum age of a message waiting in a batch. + * @param ageInMs The maximum age of a message waiting in a batch. * @return this builder */ - public PublisherBuilder limitBatch ( int messageCount, int ageInMs ) { - fMaxBatchSize = messageCount; - fMaxBatchAgeMs = ageInMs; - return this; + public PublisherBuilder limitBatch(int messageCount, int ageInMs) { + fMaxBatchSize = messageCount; + fMaxBatchAgeMs = ageInMs; + return this; } /** - * Compress transactions + * Compress transactions. + * * @return this builder */ - public PublisherBuilder withCompresion () { - return enableCompresion(true); + public PublisherBuilder withCompresion() { + return enableCompresion(true); } /** - * Do not compress transactions + * Do not compress transactions. + * * @return this builder */ - public PublisherBuilder withoutCompresion () { - return enableCompresion(false); + public PublisherBuilder withoutCompresion() { + return enableCompresion(false); } /** - * Set the compression option + * Set the compression option. + * * @param compress true to gzip compress transactions * @return this builder */ - public PublisherBuilder enableCompresion ( boolean compress ) { - fCompress = compress; - return this; + public PublisherBuilder enableCompresion(boolean compress) { + fCompress = compress; + return this; } /** * Set the API key and secret for this client. + * * @param apiKey * @param apiSecret * @return this builder */ - public PublisherBuilder authenticatedBy ( String apiKey, String apiSecret ) { - fApiKey = apiKey; - fApiSecret = apiSecret; - return this; + public PublisherBuilder authenticatedBy(String apiKey, String apiSecret) { + fApiKey = apiKey; + fApiSecret = apiSecret; + return this; } /** - * Build the publisher + * Build the publisher. + * * @return a batching publisher */ - public MRBatchingPublisher build () - { - if ( fHosts == null || fHosts.isEmpty() || fTopic == null ) - { - throw new IllegalArgumentException ( ILLEGAL_ARGUMENT_MESSAGE ); + public MRBatchingPublisher build() { + if (fHosts == null || fHosts.isEmpty() || fTopic == null) { + throw new IllegalArgumentException(ILLEGAL_ARGUMENT_MESSAGE); } - if ( sfPublisherMock != null ) return sfPublisherMock; - - final MRSimplerBatchPublisher pub = new MRSimplerBatchPublisher.Builder (). - againstUrls ( fHosts ). - onTopic ( fTopic ). - batchTo ( fMaxBatchSize, fMaxBatchAgeMs ). - compress ( fCompress ). - build (); - if ( fApiKey != null ) - { - pub.setApiCredentials ( fApiKey, fApiSecret ); + if (sfPublisherMock != null) { + return sfPublisherMock; + } + + final MRSimplerBatchPublisher pub = new MRSimplerBatchPublisher.Builder() + .againstUrls(fHosts) + .onTopic(fTopic) + .batchTo(fMaxBatchSize, fMaxBatchAgeMs) + .compress(fCompress) + .build(); + if (fApiKey != null) { + pub.setApiCredentials(fApiKey, fApiSecret); } return pub; } - + private Collection fHosts = null; private String fTopic = null; private int fMaxBatchSize = 1; @@ -322,50 +338,54 @@ public class MRClientBuilders } /** - * A builder for an identity manager + * A builder for an identity manager. + * * @author author */ - public static class IdentityManagerBuilder extends AbstractAuthenticatedManagerBuilder - { + public static class IdentityManagerBuilder extends AbstractAuthenticatedManagerBuilder { @Override - protected MRIdentityManager constructClient ( Collection hosts ) { try { - return new MRMetaClient ( hosts ); - } catch (MalformedURLException e) { - throw new IllegalArgumentException(e); - } } + protected MRIdentityManager constructClient(Collection hosts) { + try { + return new MRMetaClient(hosts); + } catch (MalformedURLException e) { + throw new IllegalArgumentException(e); + } + } } /** - * A builder for a topic manager + * A builder for a topic manager. + * * @author author */ - public static class TopicManagerBuilder extends AbstractAuthenticatedManagerBuilder - { + public static class TopicManagerBuilder extends AbstractAuthenticatedManagerBuilder { @Override - protected MRTopicManager constructClient ( Collection hosts ) { try { - return new MRMetaClient ( hosts ); - } catch (MalformedURLException e) { - throw new IllegalArgumentException(e); - } } + protected MRTopicManager constructClient(Collection hosts) { + try { + return new MRMetaClient(hosts); + } catch (MalformedURLException e) { + throw new IllegalArgumentException(e); + } + } } /** * Inject a consumer. Used to support unit tests. + * * @param cc */ - public static void $testInject ( MRConsumer cc ) - { + public static void $testInject(MRConsumer cc) { sfConsumerMock = cc; } /** * Inject a publisher. Used to support unit tests. + * * @param pub */ - public static void $testInject ( MRBatchingPublisher pub ) - { + public static void $testInject(MRBatchingPublisher pub) { sfPublisherMock = pub; } @@ -373,69 +393,66 @@ public class MRClientBuilders static MRBatchingPublisher sfPublisherMock = null; /** - * A builder for an identity manager + * A builder for an identity manager. + * * @author author */ - public static abstract class AbstractAuthenticatedManagerBuilder - { - /** - * Construct an identity manager builder. - */ - public AbstractAuthenticatedManagerBuilder () {} + public abstract static class AbstractAuthenticatedManagerBuilder { /** - * Set the host list + * Set the host list. + * * @param hostList a comma-separated list of hosts to use to connect to MR * @return this builder */ - public AbstractAuthenticatedManagerBuilder usingHosts ( String hostList ) { - return usingHosts ( MRConsumerImpl.stringToList(hostList) ); + public AbstractAuthenticatedManagerBuilder usingHosts(String hostList) { + return usingHosts(MRConsumerImpl.stringToList(hostList)); } /** - * Set the host list + * Set the host list. + * * @param hostSet a set of hosts to use to connect to MR * @return this builder */ - public AbstractAuthenticatedManagerBuilder usingHosts ( Collection hostSet ) { - fHosts = hostSet; - return this; + public AbstractAuthenticatedManagerBuilder usingHosts(Collection hostSet) { + fHosts = hostSet; + return this; } /** * Set the API key and secret for this client. + * * @param apiKey * @param apiSecret * @return this builder */ - public AbstractAuthenticatedManagerBuilder authenticatedBy ( String apiKey, String apiSecret ) { - fApiKey = apiKey; - fApiSecret = apiSecret; - return this; + public AbstractAuthenticatedManagerBuilder authenticatedBy(String apiKey, String apiSecret) { + fApiKey = apiKey; + fApiSecret = apiSecret; + return this; } /** - * Build the consumer + * Build the consumer. + * * @return a consumer */ - public T build () - { - if ( fHosts.isEmpty() ) - { - throw new IllegalArgumentException ( ILLEGAL_ARGUMENT_MESSAGE ); + public T build() { + if (fHosts.isEmpty()) { + throw new IllegalArgumentException(ILLEGAL_ARGUMENT_MESSAGE); } - final T mgr = constructClient ( fHosts ); - mgr.setApiCredentials ( fApiKey, fApiSecret ); + final T mgr = constructClient(fHosts); + mgr.setApiCredentials(fApiKey, fApiSecret); return mgr; } - protected abstract T constructClient ( Collection hosts ); + protected abstract T constructClient(Collection hosts); private Collection fHosts = null; private String fApiKey = null; private String fApiSecret = null; } - - private static final Logger log = LoggerFactory.getLogger ( MRClientBuilders.class ); + }