* ================================================================================
* Copyright (C) 2017-2021 AT&T Intellectual Property. All rights reserved.
* Modifications Copyright (C) 2018 Samsung Electronics Co., Ltd.
- * Modifications Copyright (C) 2020 Bell Canada. All rights reserved.
+ * Modifications Copyright (C) 2020,2023 Bell Canada. All rights reserved.
* Copyright (C) 2022 Nordix Foundation.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
try {
for (TopicPartition partition : records.partitions()) {
List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
- for (ConsumerRecord<String, String> record : partitionRecords) {
- messages.add(record.value());
+ for (ConsumerRecord<String, String> partitionRecord : partitionRecords) {
+ messages.add(partitionRecord.value());
}
long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset();
consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(lastOffset + 1)));
* ================================================================================
* Copyright (C) 2017-2021 AT&T Intellectual Property. All rights reserved.
* Modifications Copyright (C) 2018 Samsung Electronics Co., Ltd.
- * Modifications Copyright (C) 2020 Bell Canada. All rights reserved.
+ * Modifications Copyright (C) 2020,2023 Bell Canada. All rights reserved.
* Modifications Copyright (C) 2022-2023 Nordix Foundation.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
public interface BusPublisher {
+ public static final String NO_MESSAGE_PROVIDED = "No message provided";
+
/**
* sends a message.
*
}
}
-
if (busTopicParams.isApiKeyValid() && busTopicParams.isApiSecretValid()) {
builder.authenticatedBy(busTopicParams.getApiKey(), busTopicParams.getApiSecret());
}
@Override
public boolean send(String partitionId, String message) {
if (message == null) {
- throw new IllegalArgumentException("No message provided");
+ throw new IllegalArgumentException(NO_MESSAGE_PROVIDED);
}
try {
}
}
-
@Override
public String toString() {
return "CambriaPublisherWrapper []";
this.topic = busTopicParams.getTopic();
- //Setup Properties for consumer
+ // Setup Properties for consumer
kafkaProps = new Properties();
kafkaProps.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, busTopicParams.getServers().get(0));
if (busTopicParams.isAdditionalPropsValid()) {
@Override
public boolean send(String partitionId, String message) {
if (message == null) {
- throw new IllegalArgumentException("No message provided");
+ throw new IllegalArgumentException(NO_MESSAGE_PROVIDED);
}
try {
- //Create the record
- ProducerRecord<String, String> record = new ProducerRecord<String, String>(topic,
- UUID.randomUUID().toString(), message);
+ // Create the record
+ ProducerRecord<String, String> producerRecord =
+ new ProducerRecord<>(topic, UUID.randomUUID().toString(), message);
- this.producer.send(record);
+ this.producer.send(producerRecord);
producer.flush();
} catch (Exception e) {
logger.warn("{}: SEND of {} cannot be performed because of {}", this, message, e.getMessage(), e);
}
}
-
@Override
public String toString() {
return "KafkaPublisherWrapper []";
protected DmaapPublisherWrapper(ProtocolTypeConstants protocol, List<String> servers, String topic,
String username, String password, boolean useHttps) {
-
if (StringUtils.isBlank(topic)) {
throw new IllegalArgumentException("No topic for DMaaP");
}
-
configureProtocol(topic, protocol, servers, useHttps);
this.publisher.logTo(LoggerFactory.getLogger(MRSimplerBatchPublisher.class.getName()));
}
private void configureProtocol(String topic, ProtocolTypeConstants protocol, List<String> servers,
- boolean useHttps) {
+ boolean useHttps) {
if (protocol == ProtocolTypeConstants.AAF_AUTH) {
if (servers == null || servers.isEmpty()) {
dmaapServers.add(server + port);
}
-
this.publisher = new MRSimplerBatchPublisher.Builder().againstUrls(dmaapServers).onTopic(topic).build();
this.publisher.setProtocolFlag(ProtocolTypeConstants.AAF_AUTH.getValue());
@Override
public boolean send(String partitionId, String message) {
if (message == null) {
- throw new IllegalArgumentException("No message provided");
+ throw new IllegalArgumentException(NO_MESSAGE_PROVIDED);
}
this.publisher.setPubResponse(new MRPublisherResponse());
busTopicParams.getUserName(), busTopicParams.getPassword(), busTopicParams.isUseHttps());
String dme2RouteOffer = busTopicParams.isAdditionalPropsValid()
- ? busTopicParams.getAdditionalProps().get(
- PolicyEndPointProperties.DME2_ROUTE_OFFER_PROPERTY)
- : null;
+ ? busTopicParams.getAdditionalProps().get(PolicyEndPointProperties.DME2_ROUTE_OFFER_PROPERTY)
+ : null;
validateParams(busTopicParams, dme2RouteOffer);
PolicyEndPointProperties.PROPERTY_DMAAP_DME2_LONGITUDE_SUFFIX);
}
- if ((busTopicParams.isPartnerInvalid())
- && StringUtils.isBlank(dme2RouteOffer)) {
- throw new IllegalArgumentException(
- "Must provide at least " + PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "."
- + busTopicParams.getTopic()
- + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_PARTNER_SUFFIX + " or "
- + PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS + "." + busTopicParams.getTopic()
- + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_ROUTE_OFFER_SUFFIX + " for DME2");
+ if ((busTopicParams.isPartnerInvalid()) && StringUtils.isBlank(dme2RouteOffer)) {
+ throw new IllegalArgumentException("Must provide at least "
+ + PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "." + busTopicParams.getTopic()
+ + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_PARTNER_SUFFIX + " or "
+ + PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS + "." + busTopicParams.getTopic()
+ + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_ROUTE_OFFER_SUFFIX + " for DME2");
}
}
/*-
* ============LICENSE_START=======================================================
- * Copyright (C) 2022 Bell Canada. All rights reserved.
+ * Copyright (C) 2022-2023 Bell Canada. All rights reserved.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
protected final void writeInternal(Object object, @Nullable Type type, HttpOutputMessage outputMessage)
throws IOException {
try (var writer = getWriter(outputMessage)) {
- writeInternal(object, type, writer);
+ writeInternal(object, writer);
writer.flush();
} catch (Exception ex) {
throw new HttpMessageNotWritableException("Could not write YAML: " + ex.getMessage(), ex);
}
}
- private void writeInternal(Object object, @Nullable Type type, Writer writer) {
+ private void writeInternal(Object object, Writer writer) {
TRANSLATOR.toYaml(writer, object);
}
}
private static Charset getCharset(HttpHeaders headers) {
- Charset charset = (headers.getContentType() == null ? null : headers.getContentType().getCharset());
+ MediaType contentType = headers.getContentType();
+ Charset charset = (contentType == null ? null : contentType.getCharset());
return (charset != null ? charset : DEFAULT_CHARSET);
}
-}
\ No newline at end of file
+}