*******************************************************************************/
package org.onap.dmaap.mr.client;
-import java.io.File;
-import java.io.FileNotFoundException;
-import java.io.FileReader;
-import java.io.FileWriter;
-import java.io.IOException;
+import java.io.*;
import java.net.MalformedURLException;
import java.util.Collection;
import java.util.Map;
import java.util.Properties;
import java.util.TreeSet;
import java.util.UUID;
-
import javax.ws.rs.core.MultivaluedMap;
-
import org.onap.dmaap.mr.client.impl.MRConsumerImpl;
import org.onap.dmaap.mr.client.impl.MRMetaClient;
import org.onap.dmaap.mr.client.impl.MRSimplerBatchPublisher;
private static final String AUTH_DATE = "authDate";
private static final String PASSWORD = "password";
private static final String USERNAME = "username";
+ private static final String FILTER = "filter";
+ private static final String HOST = "host";
private static final String DME2PREFERRED_ROUTER_FILE_PATH = "DME2preferredRouterFilePath";
private static final String TOPIC = "topic";
private static final String TRANSPORT_TYPE = "TransportType";
- public static MultivaluedMap<String, Object> HTTPHeadersMap;
+
+ private static MultivaluedMap<String, Object> httpHeadersMap;
public static Map<String, String> DME2HeadersMap;
public static String routeFilePath;
private MRClientFactory() {
//prevents instantiation.
}
-
+
+ /**
+ * Add getter to avoid direct access to static header map.
+ * @return
+ */
+ public static MultivaluedMap<String, Object> getHTTPHeadersMap() {
+ return httpHeadersMap;
+ }
+
+ /**
+ * Add setter to avoid direct access to static header map.
+ * @param headers
+ */
+ public static void setHTTPHeadersMap(MultivaluedMap<String, Object> headers) {
+ httpHeadersMap = headers;
+ }
+
/**
* Create a consumer instance with the default timeout and no limit on
* messages returned. This consumer operates as an independent consumer
*
* @param hostList
* A comma separated list of hosts to use to connect to MR. You
- * can include port numbers (3904 is the default). For example,
- * "ueb01hydc.it.att.com:8080,ueb02hydc.it.att.com"
+ * can include port numbers (3904 is the default)"
* @param topic
* The topic to consume
* @param consumerGroup
if (MRClientBuilders.sfConsumerMock != null)
return MRClientBuilders.sfConsumerMock;
try {
- return new MRConsumerImpl(hostSet, topic, consumerGroup, consumerId, timeoutMs, limit, filter, apiKey,
- apiSecret);
+ return new MRConsumerImpl.MRConsumerImplBuilder().setHostPart(hostSet).setTopic(topic)
+ .setConsumerGroup(consumerGroup).setConsumerId(consumerId)
+ .setTimeoutMs(timeoutMs).setLimit(limit).setFilter(filter)
+ .setApiKey_username(apiKey).setApiSecret_password(apiSecret)
+ .createMRConsumerImpl();
} catch (MalformedURLException e) {
throw new IllegalArgumentException(e);
}
* use gzip compression
* @param protocolFlag
* http auth or ueb auth or dme2 method
- * @param producerFilePath
- * all properties for publisher
* @return MRBatchingPublisher obj
*/
public static MRBatchingPublisher createBatchingPublisher(String host, String topic, final String username,
* Create a publisher that batches messages. Be sure to close the publisher
* to send the last batch and ensure a clean shutdown
*
- * @param Properties
+ * @param props
* props set all properties for publishing message
* @return MRBatchingPublisher obj
* @throws FileNotFoundException
* Create a publisher that batches messages. Be sure to close the publisher
* to send the last batch and ensure a clean shutdown
*
- * @param Properties
+ * @param props
* props set all properties for publishing message
* @return MRBatchingPublisher obj
* @throws FileNotFoundException
*/
public static MRBatchingPublisher createBatchingPublisher(final String producerFilePath)
throws FileNotFoundException, IOException {
- FileReader reader = new FileReader(new File(producerFilePath));
Properties props = new Properties();
- props.load(reader);
+ try(InputStream input = new FileInputStream(producerFilePath)) {
+ props.load(input);
+ }
return createBatchingPublisher(props);
}
*/
public static MRBatchingPublisher createBatchingPublisher(final String producerFilePath, boolean withResponse)
throws FileNotFoundException, IOException {
- FileReader reader = new FileReader(new File(producerFilePath));
Properties props = new Properties();
- props.load(reader);
+ try(InputStream input = new FileInputStream(producerFilePath)) {
+ props.load(input);
+ }
return createBatchingPublisher(props, withResponse);
}
MRSimplerBatchPublisher pub;
if (withResponse) {
pub = new MRSimplerBatchPublisher.Builder()
- .againstUrlsOrServiceName(MRConsumerImpl.stringToList(props.getProperty("host")),MRConsumerImpl.stringToList(props.getProperty("ServiceName")), props.getProperty(TRANSPORT_TYPE))
+ .againstUrlsOrServiceName(MRConsumerImpl.stringToList(props.getProperty(HOST)),MRConsumerImpl.stringToList(props.getProperty("ServiceName")), props.getProperty(TRANSPORT_TYPE))
.onTopic(props.getProperty(TOPIC))
.batchTo(Integer.parseInt(props.getProperty("maxBatchSize")),
Integer.parseInt(props.getProperty("maxAgeMs").toString()))
.withResponse(withResponse).build();
} else {
pub = new MRSimplerBatchPublisher.Builder()
- .againstUrlsOrServiceName(MRConsumerImpl.stringToList(props.getProperty("host")), MRConsumerImpl.stringToList(props.getProperty("ServiceName")), props.getProperty(TRANSPORT_TYPE))
+ .againstUrlsOrServiceName(MRConsumerImpl.stringToList(props.getProperty(HOST)), MRConsumerImpl.stringToList(props.getProperty("ServiceName")), props.getProperty(TRANSPORT_TYPE))
.onTopic(props.getProperty(TOPIC))
.batchTo(Integer.parseInt(props.getProperty("maxBatchSize")),
Integer.parseInt(props.getProperty("maxAgeMs").toString()))
.compress(Boolean.parseBoolean(props.getProperty("compress")))
.httpThreadTime(Integer.parseInt(props.getProperty("MessageSentThreadOccurance"))).build();
}
- pub.setHost(props.getProperty("host"));
+ pub.setHost(props.getProperty(HOST));
if (props.getProperty(TRANSPORT_TYPE).equalsIgnoreCase(ProtocolTypeConstants.AUTH_KEY.getValue())) {
pub.setAuthKey(props.getProperty(AUTH_KEY));
MRConsumerImpl sub;
try {
- sub = new MRConsumerImpl(MRConsumerImpl.stringToList(host), topic, group, id, i, j, null, null, null);
+ sub = new MRConsumerImpl.MRConsumerImplBuilder()
+ .setHostPart(MRConsumerImpl.stringToList(host)).setTopic(topic)
+ .setConsumerGroup(group).setConsumerId(id).setTimeoutMs(i).setLimit(j)
+ .setFilter(null).setApiKey_username(null).setApiSecret_password(null)
+ .createMRConsumerImpl();
} catch (MalformedURLException e) {
throw new IllegalArgumentException(e);
}
MRConsumerImpl sub;
try {
- sub = new MRConsumerImpl(MRConsumerImpl.stringToList(host), topic, group, id, i, j, null, null, null);
+ sub = new MRConsumerImpl.MRConsumerImplBuilder()
+ .setHostPart(MRConsumerImpl.stringToList(host)).setTopic(topic)
+ .setConsumerGroup(group).setConsumerId(id).setTimeoutMs(i).setLimit(j)
+ .setFilter(null).setApiKey_username(null).setApiSecret_password(null)
+ .createMRConsumerImpl();
} catch (MalformedURLException e) {
throw new IllegalArgumentException(e);
}
}
public static MRConsumer createConsumer(String consumerFilePath) throws FileNotFoundException, IOException {
- FileReader reader = new FileReader(new File(consumerFilePath));
Properties props = new Properties();
- props.load(reader);
-
+ try(InputStream input = new FileInputStream(consumerFilePath)) {
+ props.load(input);
+ }
return createConsumer(props);
}
group = props.getProperty("group");
MRConsumerImpl sub = null;
if (props.getProperty(TRANSPORT_TYPE).equalsIgnoreCase(ProtocolTypeConstants.AUTH_KEY.getValue())) {
- sub = new MRConsumerImpl(MRConsumerImpl.stringToList(props.getProperty("host")), props.getProperty(TOPIC),
- group, props.getProperty("id"), timeout, limit, props.getProperty("filter"),
- props.getProperty(AUTH_KEY), props.getProperty(AUTH_DATE));
+ sub = new MRConsumerImpl.MRConsumerImplBuilder()
+ .setHostPart(MRConsumerImpl.stringToList(props.getProperty(HOST)))
+ .setTopic(props.getProperty(TOPIC)).setConsumerGroup(group)
+ .setConsumerId(props.getProperty("id")).setTimeoutMs(timeout).setLimit(limit)
+ .setFilter(props.getProperty(FILTER))
+ .setApiKey_username(props.getProperty(AUTH_KEY))
+ .setApiSecret_password(props.getProperty(AUTH_DATE)).createMRConsumerImpl();
sub.setAuthKey(props.getProperty(AUTH_KEY));
sub.setAuthDate(props.getProperty(AUTH_DATE));
sub.setUsername(props.getProperty(USERNAME));
sub.setPassword(props.getProperty(PASSWORD));
} else {
- sub = new MRConsumerImpl(MRConsumerImpl.stringToList(props.getProperty("host")), props.getProperty(TOPIC),
- group, props.getProperty("id"), timeout, limit, props.getProperty("filter"),
- props.getProperty(USERNAME), props.getProperty(PASSWORD));
+ sub = new MRConsumerImpl.MRConsumerImplBuilder()
+ .setHostPart(MRConsumerImpl.stringToList(props.getProperty(HOST)))
+ .setTopic(props.getProperty(TOPIC)).setConsumerGroup(group)
+ .setConsumerId(props.getProperty("id")).setTimeoutMs(timeout).setLimit(limit)
+ .setFilter(props.getProperty(FILTER))
+ .setApiKey_username(props.getProperty(USERNAME))
+ .setApiSecret_password(props.getProperty(PASSWORD)).createMRConsumerImpl();
sub.setUsername(props.getProperty(USERNAME));
sub.setPassword(props.getProperty(PASSWORD));
}
sub.setProps(props);
- sub.setHost(props.getProperty("host"));
+ sub.setHost(props.getProperty(HOST));
sub.setProtocolFlag(props.getProperty(TRANSPORT_TYPE));
- sub.setfFilter(props.getProperty("filter"));
+ sub.setfFilter(props.getProperty(FILTER));
if (props.getProperty(TRANSPORT_TYPE).equalsIgnoreCase(ProtocolTypeConstants.DME2.getValue())) {
MRConsumerImpl.setRouterFilePath(props.getProperty(DME2PREFERRED_ROUTER_FILE_PATH));
routeFilePath = props.getProperty(DME2PREFERRED_ROUTER_FILE_PATH);