public class AAIClientImpl implements AAIClient {
- private Logger logger = LoggerFactory.getLogger(AAIClientImpl.class);
+ private final Logger logger = LoggerFactory.getLogger(this.getClass());
private AAIClientConfiguration aaiClientConfig;
try {
logger.info("Setting SSL Context for AAI HTTP Client");
httpClientBuilder.setSSLContext(new SSLContextBuilder()
- .loadTrustMaterial(null, acceptingTrustStrategy)
- .build());
+ .loadTrustMaterial(null, acceptingTrustStrategy)
+ .build());
- } catch (NoSuchAlgorithmException | KeyStoreException | KeyManagementException e ) {
+ } catch (NoSuchAlgorithmException | KeyStoreException | KeyManagementException e) {
logger.error("Exception while setting SSL Context for AAI HTTP Client: {}", e);
}
public class AAIConsumerClient {
- private Logger logger = LoggerFactory.getLogger(AAIConsumerClient.class);
+ private final Logger logger = LoggerFactory.getLogger(this.getClass());
private final CloseableHttpClient closeableHttpClient;
private final String aaiHost;
package org.onap.dcaegen2.services.prh.service;
+import java.util.function.Predicate;
import org.apache.http.HttpEntity;
import org.apache.http.HttpResponse;
import org.apache.http.client.methods.HttpPatch;
public class AAIProducerClient implements AAIExtendedHttpClient {
private static final String EXCEPTION_MESSAGE = "Exception while executing http client: ";
- private Logger logger = LoggerFactory.getLogger(AAIProducerClient.class);
-
+ private static Predicate<String> isEmpty = String::isEmpty;
+ private final Logger logger = LoggerFactory.getLogger(this.getClass());
private final CloseableHttpClient closeableHttpClient;
private final String aaiHost;
private final String aaiProtocol;
private final Integer aaiHostPortNumber;
private final String aaiPath;
- private final Map<String,String> aaiHeaders;
+ private final Map<String, String> aaiHeaders;
private final String aaiUserName;
private final String aaiUserPassword;
}
Optional<HttpRequestBase> createHttpRequest(URI extendedURI, ConsumerDmaapModel consumerDmaapModel) {
- return Optional.ofNullable(CommonFunctions.createJsonBody(consumerDmaapModel)).filter(x-> !x.isEmpty()).flatMap(myJson -> {
- try {
- return Optional.of(createHttpPatch(extendedURI, myJson));
- } catch (UnsupportedEncodingException e) {
- logger.warn(EXCEPTION_MESSAGE, e);
- }
- return Optional.empty();
- });
+ return Optional.ofNullable(CommonFunctions.createJsonBody(consumerDmaapModel)).filter(isEmpty.negate())
+ .flatMap(myJson -> {
+ try {
+ logger.info("AAI: sending json {}", myJson);
+ return Optional.of(createHttpPatch(extendedURI, myJson));
+ } catch (UnsupportedEncodingException e) {
+ logger.warn(EXCEPTION_MESSAGE, e);
+ }
+ return Optional.empty();
+ });
}
HttpPatch createHttpPatch(URI extendedURI, String jsonBody) throws UnsupportedEncodingException {
HttpPatch httpPatch = new HttpPatch(extendedURI);
- httpPatch.setEntity( new StringEntity(jsonBody));
+ httpPatch.setEntity(new StringEntity(jsonBody));
aaiHeaders.forEach(httpPatch::addHeader);
httpPatch.addHeader("Content-Type", "application/merge-patch+json");
httpPatch.addHeader("Authorization", "Basic " + encode());
String encode() throws UnsupportedEncodingException {
return Base64.getEncoder().encodeToString((this.aaiUserName + ":" + this.aaiUserPassword)
- .getBytes("UTF-8"));
+ .getBytes("UTF-8"));
}
Optional<Integer> handleResponse(HttpResponse response) throws IOException {
*/
package org.onap.dcaegen2.services.prh.configuration;
+import java.util.function.Predicate;
import org.onap.dcaegen2.services.prh.config.*;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Configuration;
@Configuration
public class AppConfig extends PrhAppConfig {
+ private static Predicate<String> isEmpty = String::isEmpty;
@Value("${dmaap.dmaapConsumerConfiguration.dmaapHostName:}")
public String consumerDmaapHostName;
public DmaapConsumerConfiguration getDmaapConsumerConfiguration() {
return new ImmutableDmaapConsumerConfiguration.Builder()
.dmaapUserPassword(
- Optional.ofNullable(consumerDmaapUserPassword).filter(p -> !p.isEmpty())
+ Optional.ofNullable(consumerDmaapUserPassword).filter(isEmpty.negate())
.orElse(dmaapConsumerConfiguration.dmaapUserPassword()))
.dmaapUserName(
- Optional.ofNullable(consumerDmaapUserName).filter(p -> !p.isEmpty())
+ Optional.ofNullable(consumerDmaapUserName).filter(isEmpty.negate())
.orElse(dmaapConsumerConfiguration.dmaapUserName()))
.dmaapHostName(
- Optional.ofNullable(consumerDmaapHostName).filter(p -> !p.isEmpty())
+ Optional.ofNullable(consumerDmaapHostName).filter(isEmpty.negate())
.orElse(dmaapConsumerConfiguration.dmaapHostName()))
.dmaapPortNumber(
Optional.ofNullable(consumerDmaapPortNumber).filter(p -> !p.toString().isEmpty())
.orElse(dmaapConsumerConfiguration.dmaapPortNumber()))
.dmaapProtocol(
- Optional.ofNullable(consumerDmaapProtocol).filter(p -> !p.isEmpty())
+ Optional.ofNullable(consumerDmaapProtocol).filter(isEmpty.negate())
.orElse(dmaapConsumerConfiguration.dmaapProtocol()))
.dmaapContentType(
- Optional.ofNullable(consumerDmaapContentType).filter(p -> !p.isEmpty())
+ Optional.ofNullable(consumerDmaapContentType).filter(isEmpty.negate())
.orElse(dmaapConsumerConfiguration.dmaapContentType()))
.dmaapTopicName(
- Optional.ofNullable(consumerDmaapTopicName).filter(p -> !p.isEmpty())
+ Optional.ofNullable(consumerDmaapTopicName).filter(isEmpty.negate())
.orElse(dmaapConsumerConfiguration.dmaapTopicName()))
.messageLimit(
Optional.ofNullable(consumerMessageLimit).filter(p -> !p.toString().isEmpty())
.orElse(dmaapConsumerConfiguration.messageLimit()))
.timeoutMS(Optional.ofNullable(consumerTimeoutMS).filter(p -> !p.toString().isEmpty())
.orElse(dmaapConsumerConfiguration.timeoutMS()))
- .consumerGroup(Optional.ofNullable(consumerGroup).filter(p -> !p.isEmpty())
+ .consumerGroup(Optional.ofNullable(consumerGroup).filter(isEmpty.negate())
.orElse(dmaapConsumerConfiguration.consumerGroup()))
- .consumerId(Optional.ofNullable(consumerId).filter(p -> !p.isEmpty())
+ .consumerId(Optional.ofNullable(consumerId).filter(isEmpty.negate())
.orElse(dmaapConsumerConfiguration.consumerId()))
.build();
}
@Override
public AAIClientConfiguration getAAIClientConfiguration() {
return new ImmutableAAIClientConfiguration.Builder()
- .aaiHost(Optional.ofNullable(aaiHost).filter(p -> !p.isEmpty()).orElse(aaiClientConfiguration.aaiHost()))
+ .aaiHost(Optional.ofNullable(aaiHost).filter(isEmpty.negate()).orElse(aaiClientConfiguration.aaiHost()))
.aaiHostPortNumber(
Optional.ofNullable(aaiHostPortNumber).filter(p -> !p.toString().isEmpty())
.orElse(aaiClientConfiguration.aaiHostPortNumber()))
Optional.ofNullable(aaiIgnoreSSLCertificateErrors).filter(p -> !p.toString().isEmpty())
.orElse(aaiClientConfiguration.aaiIgnoreSSLCertificateErrors()))
.aaiProtocol(
- Optional.ofNullable(aaiProtocol).filter(p -> !p.isEmpty()).orElse(aaiClientConfiguration.aaiProtocol()))
+ Optional.ofNullable(aaiProtocol).filter(isEmpty.negate()).orElse(aaiClientConfiguration.aaiProtocol()))
.aaiUserName(
- Optional.ofNullable(aaiUserName).filter(p -> !p.isEmpty()).orElse(aaiClientConfiguration.aaiUserName()))
- .aaiUserPassword(Optional.ofNullable(aaiUserPassword).filter(p -> !p.isEmpty())
+ Optional.ofNullable(aaiUserName).filter(isEmpty.negate()).orElse(aaiClientConfiguration.aaiUserName()))
+ .aaiUserPassword(Optional.ofNullable(aaiUserPassword).filter(isEmpty.negate())
.orElse(aaiClientConfiguration.aaiUserPassword()))
- .aaiBasePath(Optional.ofNullable(aaiBasePath).filter(p -> !p.isEmpty())
+ .aaiBasePath(Optional.ofNullable(aaiBasePath).filter(isEmpty.negate())
.orElse(aaiClientConfiguration.aaiBasePath()))
.aaiPnfPath(
- Optional.ofNullable(aaiPnfPath).filter(p -> !p.isEmpty()).orElse(aaiClientConfiguration.aaiPnfPath()))
+ Optional.ofNullable(aaiPnfPath).filter(isEmpty.negate()).orElse(aaiClientConfiguration.aaiPnfPath()))
.aaiHeaders(aaiClientConfiguration.aaiHeaders())
.build();
}
public DmaapPublisherConfiguration getDmaapPublisherConfiguration() {
return new ImmutableDmaapPublisherConfiguration.Builder()
.dmaapContentType(
- Optional.ofNullable(producerDmaapContentType).filter(p -> !p.isEmpty())
+ Optional.ofNullable(producerDmaapContentType).filter(isEmpty.negate())
.orElse(dmaapPublisherConfiguration.dmaapContentType()))
.dmaapHostName(
- Optional.ofNullable(producerDmaapHostName).filter(p -> !p.isEmpty())
+ Optional.ofNullable(producerDmaapHostName).filter(isEmpty.negate())
.orElse(dmaapPublisherConfiguration.dmaapHostName()))
.dmaapPortNumber(
Optional.ofNullable(producerDmaapPortNumber).filter(p -> !p.toString().isEmpty())
.orElse(dmaapPublisherConfiguration.dmaapPortNumber()))
.dmaapProtocol(
- Optional.ofNullable(producerDmaapProtocol).filter(p -> !p.isEmpty())
+ Optional.ofNullable(producerDmaapProtocol).filter(isEmpty.negate())
.orElse(dmaapPublisherConfiguration.dmaapProtocol()))
.dmaapTopicName(
- Optional.ofNullable(producerDmaapTopicName).filter(p -> !p.isEmpty())
+ Optional.ofNullable(producerDmaapTopicName).filter(isEmpty.negate())
.orElse(dmaapPublisherConfiguration.dmaapTopicName()))
.dmaapUserName(
- Optional.ofNullable(producerDmaapUserName).filter(p -> !p.isEmpty())
+ Optional.ofNullable(producerDmaapUserName).filter(isEmpty.negate())
.orElse(dmaapPublisherConfiguration.dmaapUserName()))
.dmaapUserPassword(
- Optional.ofNullable(producerDmaapUserPassword).filter(p -> !p.isEmpty())
+ Optional.ofNullable(producerDmaapUserPassword).filter(isEmpty.negate())
.orElse(dmaapPublisherConfiguration.dmaapUserPassword()))
.build();
}
private static final String DMAAP_PRODUCER = "dmaapProducerConfiguration";
private static final String DMAAP_CONSUMER = "dmaapConsumerConfiguration";
- private static final Logger logger = LoggerFactory.getLogger(PrhAppConfig.class);
+ private final Logger logger = LoggerFactory.getLogger(this.getClass());
AAIClientConfiguration aaiClientConfiguration;
@Profile("prod")
public class SwaggerConfig extends WebMvcConfigurationSupport {
- public static final String PACKAGE_PATH = "org.onap.dcaegen2.services.prh";
- public static final String API_TITLE = "PRH app server";
- public static final String DESCRIPTION = "This page lists all the rest apis for PRH app server.";
- public static final String VERSION = "1.0";
- public static final String RESOURCES_PATH = "classpath:/META-INF/resources/";
- public static final String WEBJARS_PATH = RESOURCES_PATH + "webjars/";
- public static final String SWAGGER_UI = "swagger-ui.html";
- public static final String WEBJARS = "/webjars/**";
+ private static final String PACKAGE_PATH = "org.onap.dcaegen2.services.prh";
+ private static final String API_TITLE = "PRH app server";
+ private static final String DESCRIPTION = "This page lists all the rest apis for PRH app server.";
+ private static final String VERSION = "1.0";
+ private static final String RESOURCES_PATH = "classpath:/META-INF/resources/";
+ private static final String WEBJARS_PATH = RESOURCES_PATH + "webjars/";
+ private static final String SWAGGER_UI = "swagger-ui.html";
+ private static final String WEBJARS = "/webjars/**";
@Bean
public Docket api() {
@Api(value = "HeartbeatController", description = "Check liveness of PRH service")
public class HeartbeatController {
- private static final Logger logger = LoggerFactory.getLogger(HeartbeatController.class);
+ private final Logger logger = LoggerFactory.getLogger(this.getClass());
@RequestMapping(value = "heartbeat", method = RequestMethod.GET)
@ApiOperation(value = "Returns liveness of PRH service")
@ApiResponses(value = {
- @ApiResponse(code = 200, message = "PRH sevice is living"),
- @ApiResponse(code = 401, message = "You are not authorized to view the resource"),
- @ApiResponse(code = 403, message = "Accessing the resource you were trying to reach is forbidden"),
- @ApiResponse(code = 404, message = "The resource you were trying to reach is not found")
+ @ApiResponse(code = 200, message = "PRH sevice is living"),
+ @ApiResponse(code = 401, message = "You are not authorized to view the resource"),
+ @ApiResponse(code = 403, message = "Accessing the resource you were trying to reach is forbidden"),
+ @ApiResponse(code = 404, message = "The resource you were trying to reach is not found")
}
)
public Mono<ResponseEntity<String>> heartbeat() {
logger.trace("Receiving heartbeat request");
return Mono.defer(() ->
- Mono.just(new ResponseEntity<>("I'm living", HttpStatus.OK))
+ Mono.just(new ResponseEntity<>("alive", HttpStatus.OK))
);
}
}
\ No newline at end of file
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
import org.onap.dcaegen2.services.prh.configuration.SchedulerConfig;
-import org.onap.dcaegen2.services.prh.tasks.ScheduledTasks;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
-import org.springframework.scheduling.TaskScheduler;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RestController;
import reactor.core.publisher.Mono;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.ScheduledFuture;
-
/**
* @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 4/5/18
*/
@Api(value = "ScheduleController", description = "Schedule Controller")
public class ScheduleController {
- private static final Logger logger = LoggerFactory.getLogger(ScheduleController.class);
+ private final Logger logger = LoggerFactory.getLogger(this.getClass());
private final SchedulerConfig schedulerConfig;
--- /dev/null
+/*
+ * ============LICENSE_START=======================================================
+ * PNF-REGISTRATION-HANDLER
+ * ================================================================================
+ * Copyright (C) 2018 NOKIA Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcaegen2.services.prh.exceptions;
+
+/**
+ * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 6/13/18
+ */
+public class DmaapEmptyResponseException extends PrhTaskException {
+
+ public DmaapEmptyResponseException() {
+ super();
+ }
+}
*/
public class PrhTaskException extends Exception {
+ public PrhTaskException() {
+ super();
+ }
+
public PrhTaskException(String message) {
super(message);
}
import com.google.gson.JsonParser;
import java.util.Optional;
import java.util.stream.StreamSupport;
+import org.onap.dcaegen2.services.prh.exceptions.DmaapEmptyResponseException;
import org.onap.dcaegen2.services.prh.exceptions.DmaapNotFoundException;
+import org.onap.dcaegen2.services.prh.exceptions.PrhTaskException;
import org.onap.dcaegen2.services.prh.model.ConsumerDmaapModel;
import org.onap.dcaegen2.services.prh.model.ImmutableConsumerDmaapModel;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 5/8/18
*/
public class DmaapConsumerJsonParser {
+ private final Logger logger = LoggerFactory.getLogger(this.getClass());
private static final String EVENT = "event";
private static final String OTHER_FIELDS = "otherFields";
private static final String PNF_OAM_IPV_4_ADDRESS = "pnfOamIpv4Address";
private static final String PNF_VENDOR_NAME = "pnfVendorName";
private static final String PNF_SERIAL_NUMBER = "pnfSerialNumber";
- public Optional<ConsumerDmaapModel> getJsonObject(String message) throws DmaapNotFoundException {
+
+ public Optional<ConsumerDmaapModel> getJsonObject(String message)
+ throws PrhTaskException {
JsonElement jsonElement = new JsonParser().parse(message);
+ Optional<ConsumerDmaapModel> consumerDmaapModel;
if (jsonElement.isJsonObject()) {
- return Optional.of(create(jsonElement.getAsJsonObject()));
+ consumerDmaapModel = Optional.of(create(jsonElement.getAsJsonObject()));
} else {
- return Optional
+ consumerDmaapModel = Optional
.of(create(StreamSupport.stream(jsonElement.getAsJsonArray().spliterator(), false).findFirst()
.flatMap(this::getJsonObjectFromAnArray)
- .orElse(null)));
+ .orElseThrow(DmaapEmptyResponseException::new)));
}
+ logger.info("Parsed model from DmaaP after getting it: {}", consumerDmaapModel);
+ return consumerDmaapModel;
}
public Optional<JsonObject> getJsonObjectFromAnArray(JsonElement element) {
import org.onap.dcaegen2.services.prh.configuration.AppConfig;
import org.onap.dcaegen2.services.prh.configuration.Config;
import org.onap.dcaegen2.services.prh.exceptions.AAINotFoundException;
+import org.onap.dcaegen2.services.prh.exceptions.PrhTaskException;
import org.onap.dcaegen2.services.prh.model.ConsumerDmaapModel;
import org.onap.dcaegen2.services.prh.service.AAIConsumerClient;
import org.slf4j.Logger;
public class AAIConsumerTaskImpl extends
AAIConsumerTask<ConsumerDmaapModel, String, AAIClientConfiguration> {
- private static final Logger logger = LoggerFactory.getLogger(AAIConsumerTaskImpl.class);
+ private final Logger logger = LoggerFactory.getLogger(this.getClass());
private final Config prhAppConfig;
private AAIConsumerClient aaiConsumerClient;
}
}
+ @Override
+ protected void receiveRequest(ConsumerDmaapModel body) throws PrhTaskException {
+ String response = execute(body);
+ if (taskProcess != null && response != null && !response.isEmpty()) {
+ taskProcess.receiveRequest(response);
+ }
+ }
+
@Override
public String execute(ConsumerDmaapModel consumerDmaapModel) throws AAINotFoundException {
consumerDmaapModel = Optional.ofNullable(consumerDmaapModel)
}
@Override
- AAIClientConfiguration resolveConfiguration() {
+ protected AAIClientConfiguration resolveConfiguration() {
return prhAppConfig.getAAIClientConfiguration();
}
import org.onap.dcaegen2.services.prh.configuration.AppConfig;
import org.onap.dcaegen2.services.prh.configuration.Config;
import org.onap.dcaegen2.services.prh.exceptions.AAINotFoundException;
+import org.onap.dcaegen2.services.prh.exceptions.PrhTaskException;
import org.onap.dcaegen2.services.prh.model.ConsumerDmaapModel;
import org.onap.dcaegen2.services.prh.service.AAIProducerClient;
import org.onap.dcaegen2.services.prh.service.HttpUtils;
public class AAIProducerTaskImpl extends
AAIProducerTask<ConsumerDmaapModel, ConsumerDmaapModel, AAIClientConfiguration> {
- private static final Logger logger = LoggerFactory.getLogger(AAIProducerTaskImpl.class);
+ private final Logger logger = LoggerFactory.getLogger(this.getClass());
private final Config prhAppConfig;
private AAIProducerClient aaiProducerClient;
@Override
ConsumerDmaapModel publish(ConsumerDmaapModel consumerDmaapModel) throws AAINotFoundException {
- logger.trace("Method called with arg {}", consumerDmaapModel);
+ logger.info("Sending PNF model to AAI {}", consumerDmaapModel);
try {
return aaiProducerClient.getHttpResponse(consumerDmaapModel)
.filter(HttpUtils::isSuccessfulResponseCode).map(response -> consumerDmaapModel).orElseThrow(() ->
new AAINotFoundException("Incorrect response code for continuation of tasks workflow"));
- } catch ( URISyntaxException e) {
+ } catch (URISyntaxException e) {
logger.warn("Patch request not successful", e);
throw new AAINotFoundException("Patch request not successful");
}
}
+ @Override
+ protected void receiveRequest(ConsumerDmaapModel body) throws PrhTaskException {
+ ConsumerDmaapModel response = execute(body);
+ if (taskProcess != null && response != null) {
+ taskProcess.receiveRequest(response);
+ }
+ }
+
@Override
public ConsumerDmaapModel execute(ConsumerDmaapModel consumerDmaapModel) throws AAINotFoundException {
consumerDmaapModel = Optional.ofNullable(consumerDmaapModel)
return publish(consumerDmaapModel);
}
- AAIClientConfiguration resolveConfiguration() {
+ protected AAIClientConfiguration resolveConfiguration() {
return prhAppConfig.getAAIClientConfiguration();
}
*/
package org.onap.dcaegen2.services.prh.tasks;
-import org.onap.dcaegen2.services.prh.exceptions.DmaapNotFoundException;
+import org.onap.dcaegen2.services.prh.exceptions.PrhTaskException;
import org.onap.dcaegen2.services.prh.model.ConsumerDmaapModel;
import org.onap.dcaegen2.services.prh.service.consumer.ExtendedDmaapConsumerHttpClientImpl;
*/
abstract class DmaapConsumerTask<R, S, C> extends Task<R, S, C> {
- abstract ConsumerDmaapModel consume(String message) throws DmaapNotFoundException;
+ abstract ConsumerDmaapModel consume(String message) throws PrhTaskException;
abstract ExtendedDmaapConsumerHttpClientImpl resolveClient();
package org.onap.dcaegen2.services.prh.tasks;
import java.util.Optional;
-import org.onap.dcaegen2.services.prh.model.ConsumerDmaapModel;
import org.onap.dcaegen2.services.prh.config.DmaapConsumerConfiguration;
import org.onap.dcaegen2.services.prh.configuration.AppConfig;
import org.onap.dcaegen2.services.prh.configuration.Config;
+import org.onap.dcaegen2.services.prh.exceptions.DmaapEmptyResponseException;
import org.onap.dcaegen2.services.prh.exceptions.DmaapNotFoundException;
import org.onap.dcaegen2.services.prh.exceptions.PrhTaskException;
-
+import org.onap.dcaegen2.services.prh.model.ConsumerDmaapModel;
import org.onap.dcaegen2.services.prh.service.DmaapConsumerJsonParser;
import org.onap.dcaegen2.services.prh.service.consumer.ExtendedDmaapConsumerHttpClientImpl;
import org.slf4j.Logger;
public class DmaapConsumerTaskImpl extends
DmaapConsumerTask<String, ConsumerDmaapModel, DmaapConsumerConfiguration> {
- private static final Logger logger = LoggerFactory.getLogger(DmaapConsumerTaskImpl.class);
+ private final Logger logger = LoggerFactory.getLogger(this.getClass());
private final Config prhAppConfig;
private ExtendedDmaapConsumerHttpClientImpl extendedDmaapConsumerHttpClient;
private DmaapConsumerJsonParser dmaapConsumerJsonParser;
this.dmaapConsumerJsonParser = dmaapConsumerJsonParser;
}
+
@Override
- ConsumerDmaapModel consume(String message) throws DmaapNotFoundException {
- logger.trace("Method called with arg {}", message);
+ ConsumerDmaapModel consume(String message) throws PrhTaskException {
+ logger.info("Consumed model from DmaaP: {}", message);
return dmaapConsumerJsonParser.getJsonObject(message)
- .orElseThrow(() -> new DmaapNotFoundException(String.format("Nothing to consume from DmaaP %s topic.",
- resolveConfiguration().dmaapTopicName())));
+ .orElseThrow(() -> new DmaapNotFoundException("Null response from JSONObject in single reqeust"));
+
+ }
+
+ @Override
+ protected void receiveRequest(String body) throws PrhTaskException {
+ try {
+ ConsumerDmaapModel response = execute(body);
+ if (taskProcess != null && response != null) {
+ taskProcess.receiveRequest(response);
+ }
+ } catch (DmaapEmptyResponseException e) {
+ logger.warn("Nothing to consume from DmaaP {} topic.",
+ resolveConfiguration().dmaapTopicName());
+ }
+
}
@Override
}
@Override
- DmaapConsumerConfiguration resolveConfiguration() {
+ protected DmaapConsumerConfiguration resolveConfiguration() {
return prhAppConfig.getDmaapConsumerConfiguration();
}
import org.onap.dcaegen2.services.prh.configuration.AppConfig;
import org.onap.dcaegen2.services.prh.configuration.Config;
import org.onap.dcaegen2.services.prh.exceptions.DmaapNotFoundException;
+import org.onap.dcaegen2.services.prh.exceptions.PrhTaskException;
import org.onap.dcaegen2.services.prh.model.ConsumerDmaapModel;
import org.onap.dcaegen2.services.prh.service.producer.ExtendedDmaapProducerHttpClientImpl;
import org.slf4j.Logger;
public class DmaapPublisherTaskImpl extends
DmaapPublisherTask<ConsumerDmaapModel, Integer, DmaapPublisherConfiguration> {
- private static final Logger logger = LoggerFactory.getLogger(DmaapPublisherTaskImpl.class);
+ private final Logger logger = LoggerFactory.getLogger(this.getClass());
private final Config prhAppConfig;
private ExtendedDmaapProducerHttpClientImpl extendedDmaapProducerHttpClient;
@Override
Integer publish(ConsumerDmaapModel consumerDmaapModel) throws DmaapNotFoundException {
- logger.trace("Method called with arg {}", consumerDmaapModel);
+ logger.info("Publishing on DmaaP topic {} object {}", resolveConfiguration().dmaapTopicName(),
+ consumerDmaapModel);
return extendedDmaapProducerHttpClient.getHttpProducerResponse(consumerDmaapModel)
.filter(response -> response == HttpStatus.OK.value())
.orElseThrow(() -> new DmaapNotFoundException("Incorrect response from Dmaap"));
}
+ @Override
+ protected void receiveRequest(ConsumerDmaapModel body) throws PrhTaskException {
+ Integer response = execute(body);
+ if (taskProcess != null && response != null) {
+ taskProcess.receiveRequest(response);
+ }
+ }
+
@Override
public Integer execute(ConsumerDmaapModel consumerDmaapModel) throws DmaapNotFoundException {
consumerDmaapModel = Optional.ofNullable(consumerDmaapModel)
}
@Override
- DmaapPublisherConfiguration resolveConfiguration() {
+ protected DmaapPublisherConfiguration resolveConfiguration() {
return prhAppConfig.getDmaapPublisherConfiguration();
}
@Component
public class ScheduledTasks {
- private static final Logger logger = LoggerFactory.getLogger(ScheduledTasks.class);
+ private final Logger logger = LoggerFactory.getLogger(this.getClass());
private final DmaapConsumerTask dmaapConsumerTask;
private final DmaapPublisherTask dmaapProducerTask;
public abstract class Task<R, S, C> {
- private Task taskProcess;
+ Task taskProcess;
- public void setNext(Task task) {
- this.taskProcess = task;
- }
-
- public void receiveRequest(R body) throws PrhTaskException {
+ abstract protected void receiveRequest(R body) throws PrhTaskException;
- S response = execute(body);
- if (taskProcess != null) {
- taskProcess.receiveRequest(response);
- }
- }
+ abstract protected S execute(R object) throws PrhTaskException;
- abstract S execute(R object) throws PrhTaskException;
+ abstract protected C resolveConfiguration();
- abstract C resolveConfiguration();
+ void setNext(Task task) {
+ this.taskProcess = task;
+ }
}
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
-import org.junit.jupiter.api.function.Executable;
import org.onap.dcaegen2.services.prh.IT.junit5.mockito.MockitoExtension;
/**
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.mockito.Mockito;
+import org.onap.dcaegen2.services.prh.exceptions.DmaapEmptyResponseException;
import org.onap.dcaegen2.services.prh.exceptions.DmaapNotFoundException;
+import org.onap.dcaegen2.services.prh.exceptions.PrhTaskException;
import org.onap.dcaegen2.services.prh.model.ConsumerDmaapModel;
import org.onap.dcaegen2.services.prh.model.ImmutableConsumerDmaapModel;
+ "\"pnfVendorName\":\"Nokia\"}}}]";
@Test
- void whenPassingCorrectJson_validationNotThrowingAnException() throws DmaapNotFoundException {
+ void whenPassingCorrectJson_validationNotThrowingAnException()
+ throws PrhTaskException {
//given
String message =
"[{\"event\":{\"commonEventHeader\":{\"domain\":\"other\",\"eventId\":\"<<SerialNumber>>-reg\",\"eventName\""
}
@Test
- void whenPassingCorrectJsonWithoutIPV4_validationNotThrowingAnException() throws DmaapNotFoundException {
+ void whenPassingCorrectJsonWithoutIPV4_validationNotThrowingAnException()
+ throws PrhTaskException {
//given
String message =
"[{\"event\":{\"commonEventHeader\":{\"domain\":\"other\",\"eventId\":\"<<SerialNumber>>-reg\",\"eventName\""
}
@Test
- void whenPassingCorrectJsonWihoutIPV6_validationNotThrowingAnException() throws DmaapNotFoundException {
+ void whenPassingCorrectJsonWihoutIPV6_validationNotThrowingAnException()
+ throws PrhTaskException {
//given
String message =
"[{\"event\":{\"commonEventHeader\":{\"domain\":\"other\",\"eventId\":\"<<SerialNumber>>-reg\",\"eventName\""
package org.onap.dcaegen2.services.prh.model;
-import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
+import com.google.gson.TypeAdapterFactory;
+import java.util.ServiceLoader;
public class CommonFunctions {
- private static Gson gson = new GsonBuilder().create();
-
- private CommonFunctions() {}
+ private CommonFunctions() {
+ }
public static String createJsonBody(ConsumerDmaapModel consumerDmaapModel) {
- return gson.toJson(consumerDmaapModel);
+ GsonBuilder gsonBuilder = new GsonBuilder();
+ ServiceLoader.load(TypeAdapterFactory.class).forEach(gsonBuilder::registerTypeAdapterFactory);
+ return gsonBuilder.create().toJson(ImmutableConsumerDmaapModel.builder().ipv4(consumerDmaapModel.getIpv4())
+ .ipv6(consumerDmaapModel.getIpv6()).pnfName(consumerDmaapModel.getPnfName()).build());
}
}
*/
@Value.Immutable
-@Gson.TypeAdapters
+@Gson.TypeAdapters(fieldNamingStrategy = true)
public interface ConsumerDmaapModel {
- @SerializedName("pnf-name")
+ @SerializedName(value = "pnf-name", alternate = "pnf-name")
String getPnfName();
- @SerializedName("ipaddress-v4-oam")
+ @SerializedName(value = "ipaddress-v4-oam", alternate = "ipaddress-v4-oam")
String getIpv4();
- @SerializedName("ipaddress-v6-oam")
+ @SerializedName(value = "ipaddress-v6-oam", alternate = "ipaddress-v6-oam")
String getIpv6();
}
@Test
void createJsonBody_shouldReturnJsonInString() {
- String expectedResult = "{\"pnfName\":\"NOKnhfsadhff\",\"ipv4\":\"256.22.33.155\",\"ipv6\":\"2001:0db8:85a3:0000:0000:8a2e:0370:7334\"}";
+ String expectedResult = "{\"pnf-name\":\"NOKnhfsadhff\",\"ipaddress-v4-oam\":\"256.22.33.155\",\"ipaddress-v6-oam\":\"2001:0db8:85a3:0000:0000:8a2e:0370:7334\"}";
assertEquals(expectedResult, CommonFunctions.createJsonBody(model));
}
}
public class DmaapHttpClientImpl {
- private static Logger logger = LoggerFactory.getLogger(DmaapHttpClientImpl.class);
+ private final Logger logger = LoggerFactory.getLogger(this.getClass());
private final String dmaapHostName;
private final Integer dmaapPortNumber;
public class ExtendedDmaapConsumerHttpClientImpl {
- private static Logger logger = LoggerFactory.getLogger(ExtendedDmaapConsumerHttpClientImpl.class);
+ private final Logger logger = LoggerFactory.getLogger(this.getClass());
private final CloseableHttpClient closeableHttpClient;
private final String dmaapHostName;
public class ExtendedDmaapProducerHttpClientImpl {
- private static Logger logger = LoggerFactory.getLogger(ExtendedDmaapProducerHttpClientImpl.class);
+ private final Logger logger = LoggerFactory.getLogger(this.getClass());
private final CloseableHttpClient closeableHttpClient;
private final String dmaapHostName;