Removed warnings.
Improved code formatting
Removed unused files
Removed datafile_endpoints.json from the container
Change-Id: I7334775be793cd9cb709ca1e031c620c2c1c2b3f
Issue-ID: DCAEGEN2-1645
Signed-off-by: PatrikBuhr <patrik.buhr@est.tech>
org.onap.dcaegen2.collectors.datafile: WARN
file: /var/log/ONAP/application.log
app:
- filepath: /opt/app/datafile/config/datafile_endpoints.json
+ filepath: src/test/resources/datafile_endpoints_test.json
+++ /dev/null
-{
- "config": {
- "//description": "This file is only used for testing purposes",
- "dmaap.ftpesConfig.keyCert": "/config/dfc.jks",
- "dmaap.ftpesConfig.keyPassword": "secret",
- "dmaap.ftpesConfig.trustedCa": "config/ftp.jks",
- "dmaap.ftpesConfig.trustedCaPassword": "secret",
- "dmaap.security.trustStorePath": "change it",
- "dmaap.security.trustStorePasswordPath": "trustStorePasswordPath",
- "dmaap.security.keyStorePath": "keyStorePath",
- "dmaap.security.keyStorePasswordPath": "change it",
- "dmaap.security.enableDmaapCertAuth": "false",
- "streams_publishes": {
- "PM_MEAS_FILES": {
- "type": "data_router",
- "dmaap_info": {
- "username": "CYE9fl40",
- "location": "loc00",
- "log_url": "https://dmaap-dr-prov/feedlog/4",
- "publisher_id": "4.307dw",
- "password": "izBJD8nLjawq0HMG",
- "publish_url": "https://dmaap-dr-prov/publish/4"
- }
- }
- },
- "streams_subscribes": {
- "dmaap_subscriber": {
- "dmaap_info": {
- "topic_url": "http://dradmin:dradmin@localhost:2222/events/unauthenticated.VES_NOTIFICATION_OUTPUT/OpenDcae-c12/C12"
- },
- "type": "message_router"
- }
- }
- }
-}
-
<properties>
<docker.image.name>onap/${project.groupId}.${project.artifactId}</docker.image.name>
<maven.build.timestamp.format>yyyyMMdd'T'HHmmss</maven.build.timestamp.format>
- <spring.version>5.1.4.RELEASE</spring.version>
- <spring-boot.version>2.1.2.RELEASE</spring-boot.version>
</properties>
<dependencies>
<dependency>
<groupId>javax.validation</groupId>
<artifactId>validation-api</artifactId>
- <version>2.0.1.Final</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot</artifactId>
- <version>${spring-boot.version}</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-web</artifactId>
- <version>${spring.version}</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-webmvc</artifactId>
- <version>${spring.version}</version>
</dependency>
<dependency>
<groupId>com.spotify</groupId>
<dependency>
<groupId>org.apache.tomcat.embed</groupId>
<artifactId>tomcat-embed-core</artifactId>
- <version>9.0.14</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-webflux</artifactId>
- <version>${spring.version}</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-autoconfigure</artifactId>
- <version>${spring-boot.version}</version>
</dependency>
<dependency>
<groupId>io.springfox</groupId>
RUN mkdir -p /var/log/ONAP
ADD /target/datafile-app-server.jar /opt/app/datafile/
-
ADD /config/application.yaml /opt/app/datafile/config/
ADD /config/dfc.jks.b64 /opt/app/datafile/config/
-ADD /config/datafile_endpoints.json /opt/app/datafile/config/
ADD /config/ftp.jks.b64 /opt/app/datafile/config/
EXPOSE 8100 8433
USER datafile
-ENTRYPOINT ["/usr/bin/java", "-jar", "/opt/app/datafile/datafile-app-server.jar"]
\ No newline at end of file
+ENTRYPOINT ["/usr/bin/java", "-jar", "/opt/app/datafile/datafile-app-server.jar"]
}
Flux<AppConfig> createRefreshTask(Map<String, String> context) {
- return getEnvironment(systemEnvironment, context).flatMap(this::createCbsClient)
- .flatMapMany(this::periodicConfigurationUpdates).map(this::parseCloudConfig)
+ return getEnvironment(systemEnvironment, context) //
+ .flatMap(this::createCbsClient) //
+ .flatMapMany(this::periodicConfigurationUpdates) //
+ .map(this::parseCloudConfig) //
.onErrorResume(this::onErrorResume);
}
throw new JsonSyntaxException("Root is not a json object");
}
parseCloudConfig(rootObject);
+ logger.info("Local configuration file loaded: {}", filepath);
} catch (JsonSyntaxException | IOException e) {
- logger.warn("Local configuration file not loaded: {}", filepath, e);
+ logger.trace("Local configuration file not loaded: {}", filepath, e);
}
}
- private synchronized void setConfiguration(ConsumerConfiguration consumerConfiguration,
- Map<String, PublisherConfiguration> publisherConfiguration, FtpesConfig ftpesConfig) {
- if (consumerConfiguration == null || publisherConfiguration == null || ftpesConfig == null) {
- logger.error(
- "Problem with configuration consumerConfiguration: {}, publisherConfiguration: {}, ftpesConfig: {}",
- consumerConfiguration, publisherConfiguration, ftpesConfig);
- } else {
- this.dmaapConsumerConfiguration = consumerConfiguration;
- this.publishingConfigurations = publisherConfiguration;
- this.ftpesConfiguration = ftpesConfig;
- }
+ private synchronized void setConfiguration(@NotNull ConsumerConfiguration consumerConfiguration,
+ @NotNull Map<String, PublisherConfiguration> publisherConfiguration, @NotNull FtpesConfig ftpesConfig) {
+ this.dmaapConsumerConfiguration = consumerConfiguration;
+ this.publishingConfigurations = publisherConfiguration;
+ this.ftpesConfiguration = ftpesConfig;
}
JsonElement getJsonElement(JsonParser parser, InputStream inputStream) {
import java.util.Iterator;
import java.util.Map;
import java.util.Map.Entry;
+
+import javax.validation.constraints.NotNull;
+
import java.util.Set;
import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException;
*
* @throws DatafileTaskException if a member of the configuration is missing.
*/
- public Map<String, PublisherConfiguration> getDmaapPublisherConfigurations() throws DatafileTaskException {
+ public @NotNull Map<String, PublisherConfiguration> getDmaapPublisherConfigurations() throws DatafileTaskException {
JsonObject producerCfgs = jsonObject.get("streams_publishes").getAsJsonObject();
Iterator<String> changeIdentifierList = producerCfgs.keySet().iterator();
-
Map<String, PublisherConfiguration> result = new HashMap<>();
while (changeIdentifierList.hasNext()) {
-
String changeIdentifier = changeIdentifierList.next();
JsonObject producerCfg = getAsJson(producerCfgs, changeIdentifier);
JsonObject feedConfig = get(producerCfg, "dmaap_info").getAsJsonObject();
result.put(cfg.changeIdentifier(), cfg);
}
return result;
-
}
/**
* @return the consumer configuration.
* @throws DatafileTaskException if a member of the configuration is missing.
*/
- public ConsumerConfiguration getDmaapConsumerConfig() throws DatafileTaskException {
+ public @NotNull ConsumerConfiguration getDmaapConsumerConfig() throws DatafileTaskException {
JsonObject consumerCfg = jsonObject.get("streams_subscribes").getAsJsonObject();
Set<Entry<String, JsonElement>> topics = consumerCfg.entrySet();
if (topics.size() != 1) {
* @return the xNF communication security configuration.
* @throws DatafileTaskException if a member of the configuration is missing.
*/
- public FtpesConfig getFtpesConfig() throws DatafileTaskException {
+ public @NotNull FtpesConfig getFtpesConfig() throws DatafileTaskException {
return new ImmutableFtpesConfig.Builder() //
.keyCert(getAsString(jsonObject, "dmaap.ftpesConfig.keyCert"))
.keyPassword(getAsString(jsonObject, "dmaap.ftpesConfig.keyPassword"))
.build();
}
- private static JsonElement get(JsonObject obj, String memberName) throws DatafileTaskException {
+ private static @NotNull JsonElement get(JsonObject obj, String memberName) throws DatafileTaskException {
JsonElement elem = obj.get(memberName);
if (elem == null) {
throw new DatafileTaskException("Could not find member: " + memberName + " in: " + obj);
return elem;
}
- private static String getAsString(JsonObject obj, String memberName) throws DatafileTaskException {
+ private static @NotNull String getAsString(JsonObject obj, String memberName) throws DatafileTaskException {
return get(obj, memberName).getAsString();
}
- private static JsonObject getAsJson(JsonObject obj, String memberName) throws DatafileTaskException {
+ private static @NotNull JsonObject getAsJson(JsonObject obj, String memberName) throws DatafileTaskException {
return get(obj, memberName).getAsJsonObject();
}
*
* @param schemeString the string to convert to <code>Scheme</code>.
* @return The corresponding <code>Scheme</code>
- * @throws Exception if the value of the string doesn't match any defined scheme.
+ * @throws DatafileTaskException if the value of the string doesn't match any defined scheme.
*/
public static Scheme getSchemeFromString(String schemeString) throws DatafileTaskException {
Scheme result;
return str.toString();
}
- private String format(String name, Object value) {
+ private static String format(String name, Object value) {
String header = name + ":";
- return String.format("%-24s%-22s\n", header, value);
+ return String.format("%-24s%-22s%n", header, value);
}
public int getNoOfCollectedFiles() {
public int getTotalReceivedEvents() {
return totalReceivedEvents;
}
-
- /**
- * Resets all data.
- */
- public void clear() {
- numberOfTasks.set(0);
- numberOfSubscriptions.set(0);
- noOfCollectedFiles = 0;
- noOfFailedFtpAttempts = 0;
- noOfFailedFtp = 0;
- noOfFailedPublishAttempts = 0;
- totalPublishedFiles = 0;
- noOfFailedPublish = 0;
- lastPublishedTime = Instant.MIN;
- totalReceivedEvents = 0;
- lastEventTime = Instant.MIN;
- }
}
*/
public static void appendTraceInfo(HttpRequestBase httpRequest) {
String requestId = MDC.get(MdcVariables.REQUEST_ID);
- httpRequest.addHeader(MdcVariables.X_ONAP_REQUEST_ID, requestId);
httpRequest.addHeader("X-RequestID", requestId); // deprecated
httpRequest.addHeader("X-TransactionID", requestId); // deprecated
String invocationId = UUID.randomUUID().toString();
- httpRequest.addHeader(MdcVariables.X_INVOCATION_ID, invocationId);
logger.info(INVOKE, "Invoking request with invocation ID {}", invocationId);
}
* @param headers a received HTPP header
*/
public static void initializeTraceContext(HttpHeaders headers) {
- String requestId = headers.getFirst(MdcVariables.X_ONAP_REQUEST_ID);
+ String requestId = headers.getFirst(MdcVariables.httpHeader(MdcVariables.REQUEST_ID));
if (StringUtils.isBlank(requestId)) {
requestId = UUID.randomUUID().toString();
}
- String invocationId = headers.getFirst(MdcVariables.X_INVOCATION_ID);
+ String invocationId = headers.getFirst(MdcVariables.httpHeader(MdcVariables.INVOCATION_ID));
if (StringUtils.isBlank(invocationId)) {
invocationId = UUID.randomUUID().toString();
}
public Flux<FileReadyMessage> getMessageRouterResponse() {
logger.trace("getMessageRouterResponse called");
try {
- DMaaPConsumerReactiveHttpClient dMaaPConsumerReactiveHttpClient = createHttpClient();
- return consume((dMaaPConsumerReactiveHttpClient.getDMaaPConsumerResponse(Optional.empty())));
+ DMaaPConsumerReactiveHttpClient client = createHttpClient();
+ return consume((client.getDMaaPConsumerResponse(Optional.empty())));
} catch (DatafileTaskException e) {
logger.warn("Unable to get response from message router", e);
return Flux.empty();
}
public DMaaPConsumerReactiveHttpClient createHttpClient() throws DatafileTaskException {
-
return httpClientFactory.create(datafileAppConfig.getDmaapConsumerConfiguration().toDmaap());
}
+++ /dev/null
-spring.profiles.active=prod
-server.port=8433
-server.ssl.key-store-type=PKCS12
-server.ssl.key-store-password=ericssondfc
-server.ssl.key-store=classpath:keystore.jks
-server.ssl.key-password=ericssondfc
-server.ssl.key-alias=tomcat-localhost
-logging.level.root=ERROR
-logging.level.org.springframework=ERROR
-logging.level.org.springframework.data=ERROR
-logging.level.org.onap.dcaegen2.collectors.datafile=INFO
-logging.file=logs/log/application.log
-app.filepath=config/datafile_endpoints.json
+++ /dev/null
-<?xml version="1.0" encoding="UTF-8"?>
-<beans xmlns="http://www.springframework.org/schema/beans"
- xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xmlns:task="http://www.springframework.org/schema/task"
- xmlns:context="http://www.springframework.org/schema/context"
- xsi:schemaLocation="http://www.springframework.org/schema/task http://www.springframework.org/schema/task/spring-task-4.0.xsd
- http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
- http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-4.0.xsd">
-
- <context:component-scan
- base-package="org.onap.dcaegen2.collectors.datafile" />
- <task:scheduled-tasks>
- <task:scheduled ref="scheduleController"
- method="startTasks" fixed-rate="1000" />
- </task:scheduled-tasks>
-</beans>
public static final ImmutableDmaapConsumerConfiguration CORRECT_DMAAP_CONSUMER_CONFIG = //
new ImmutableDmaapConsumerConfiguration.Builder() //
.endpointUrl(
- "http://admin:admin@message-router.onap.svc.cluster.local:2222/events/unauthenticated.VES_NOTIFICATION_OUTPUT/OpenDcae-c12/C12")
+ "http://dradmin:dradmin@localhost:2222/events/unauthenticated.VES_NOTIFICATION_OUTPUT/OpenDcae-c12/C12")
.timeoutMs(-1) //
- .dmaapHostName("message-router.onap.svc.cluster.local") //
- .dmaapUserName("admin") //
- .dmaapUserPassword("admin") //
+ .dmaapHostName("localhost") //
+ .dmaapUserName("dradmin") //
+ .dmaapUserPassword("dradmin") //
.dmaapTopicName("events/unauthenticated.VES_NOTIFICATION_OUTPUT") //
.dmaapPortNumber(2222) //
.dmaapContentType("application/json") //
public static final ConsumerConfiguration CORRECT_CONSUMER_CONFIG = ImmutableConsumerConfiguration.builder() //
.topicUrl(
- "http://admin:admin@message-router.onap.svc.cluster.local:2222/events/unauthenticated.VES_NOTIFICATION_OUTPUT/OpenDcae-c12/C12")
+ "http://dradmin:dradmin@localhost:2222/events/unauthenticated.VES_NOTIFICATION_OUTPUT/OpenDcae-c12/C12")
.trustStorePath("trustStorePath") //
.trustStorePasswordPath("trustStorePasswordPath") //
.keyStorePath("keyStorePath") //
private static final PublisherConfiguration CORRECT_PUBLISHER_CONFIG = //
ImmutablePublisherConfiguration.builder() //
- .publishUrl("https://message-router.onap.svc.cluster.local:3907/publish/1") //
- .logUrl("https://dmaap.example.com/feedlog/972").trustStorePath("trustStorePath") //
+ .publishUrl("https://localhost:3907/publish/1") //
+ .logUrl("https://localhost:3907/feedlog/1") //
+ .trustStorePath("trustStorePath") //
.trustStorePasswordPath("trustStorePasswordPath") //
.keyStorePath("keyStorePath") //
.keyStorePasswordPath("keyStorePasswordPath") //
.enableDmaapCertAuth(true) //
.changeIdentifier("PM_MEAS_FILES") //
- .userName("user") //
- .passWord("password") //
+ .userName("CYE9fl40") //
+ .passWord("izBJD8nLjawq0HMG") //
.build();
private static final ImmutableFtpesConfig CORRECT_FTPES_CONFIGURATION = //
private static final ImmutableDmaapPublisherConfiguration CORRECT_DMAAP_PUBLISHER_CONFIG = //
new ImmutableDmaapPublisherConfiguration.Builder() //
- .endpointUrl("https://message-router.onap.svc.cluster.local:3907/publish/1").dmaapTopicName("/publish/1") //
- .dmaapUserPassword("password") //
+ .endpointUrl("https://localhost:3907/publish/1") //
+ .dmaapTopicName("/publish/1") //
+ .dmaapUserPassword("izBJD8nLjawq0HMG") //
.dmaapPortNumber(3907) //
.dmaapProtocol("https") //
.dmaapContentType("application/octet-stream") //
- .dmaapHostName("message-router.onap.svc.cluster.local") //
- .dmaapUserName("user") //
+ .dmaapHostName("localhost") //
+ .dmaapUserName("CYE9fl40") //
.trustStorePath("trustStorePath") //
.trustStorePasswordPath("trustStorePasswordPath") //
.keyStorePath("keyStorePath") //
@Test
public void whenPeriodicConfigRefreshNoEnvironmentVariables() {
- ListAppender<ILoggingEvent> logAppender = LoggingUtils.getLogListAppender(AppConfig.class);
+ final ListAppender<ILoggingEvent> logAppender = LoggingUtils.getLogListAppender(AppConfig.class);
Flux<AppConfig> task = appConfigUnderTest.createRefreshTask(context);
StepVerifier //
}
@Test
- public void whenPeriodicConfigRefreshNoConsul() {
- ListAppender<ILoggingEvent> logAppender = LoggingUtils.getLogListAppender(AppConfig.class);
+ public void whenPeriodicConfigRefreshNoConsul() {
EnvProperties props = properties();
doReturn(Mono.just(props)).when(appConfigUnderTest).getEnvironment(any(), any());
Flux<JsonObject> err = Flux.error(new IOException());
doReturn(err).when(cbsClient).updates(any(), any(), any());
+ final ListAppender<ILoggingEvent> logAppender = LoggingUtils.getLogListAppender(AppConfig.class);
Flux<AppConfig> task = appConfigUnderTest.createRefreshTask(context);
StepVerifier //
HttpHeaders httpHeaders = new HttpHeaders();
- ListAppender<ILoggingEvent> logAppender = LoggingUtils.getLogListAppender(ScheduleController.class);
+ final ListAppender<ILoggingEvent> logAppender = LoggingUtils.getLogListAppender(ScheduleController.class);
Mono<ResponseEntity<String>> response = scheduleControllerUnderTest.startTasks(httpHeaders);
validateLogging(logAppender, "Start request");
HttpHeaders httpHeaders = new HttpHeaders();
// The following headers are set to create branch coverage in MappedDiagnosticContext:initializeTraceContext().
- httpHeaders.set(MdcVariables.X_ONAP_REQUEST_ID, "Onap request ID");
- httpHeaders.set(MdcVariables.X_INVOCATION_ID, "Invocation ID");
+ httpHeaders.set(MdcVariables.httpHeader(MdcVariables.REQUEST_ID), "Onap request ID");
+ httpHeaders.set(MdcVariables.httpHeader(MdcVariables.INVOCATION_ID), "Invocation ID");
- ListAppender<ILoggingEvent> logAppender = LoggingUtils.getLogListAppender(ScheduleController.class);
+ final ListAppender<ILoggingEvent> logAppender = LoggingUtils.getLogListAppender(ScheduleController.class);
Mono<ResponseEntity<String>> response = scheduleControllerUnderTest.startTasks(httpHeaders);
validateLogging(logAppender, "Start request");
HttpHeaders httpHeaders = new HttpHeaders();
- ListAppender<ILoggingEvent> logAppender = LoggingUtils.getLogListAppender(ScheduleController.class);
+ final ListAppender<ILoggingEvent> logAppender = LoggingUtils.getLogListAppender(ScheduleController.class);
Mono<ResponseEntity<String>> actualResponse = scheduleControllerUnderTest.stopTask(httpHeaders);
validateLogging(logAppender, "Stop request");
}
private void validateLogging(ListAppender<ILoggingEvent> logAppender, String infoMessage) {
- assertEquals(logAppender.list.get(0).getMarker().getName(), "ENTRY");
+ assertEquals("ENTRY", logAppender.list.get(0).getMarker().getName());
assertNotNull(logAppender.list.get(0).getMDCPropertyMap().get("InvocationID"));
assertNotNull(logAppender.list.get(0).getMDCPropertyMap().get("RequestID"));
assertTrue("Info missing in log", logAppender.list.toString().contains("[INFO] " + infoMessage));
- assertEquals(logAppender.list.get(1).getMarker().getName(), "EXIT");
+ assertEquals("EXIT", logAppender.list.get(1).getMarker().getName());
logAppender.stop();
}
}
public void heartbeat_success() {
HttpHeaders httpHeaders = new HttpHeaders();
- ListAppender<ILoggingEvent> logAppender = LoggingUtils.getLogListAppender(StatusController.class);
+ final ListAppender<ILoggingEvent> logAppender = LoggingUtils.getLogListAppender(StatusController.class);
Mono<ResponseEntity<String>> result = controllerUnderTest.heartbeat(httpHeaders);
validateLogging(logAppender);
}
private void validateLogging(ListAppender<ILoggingEvent> logAppender) {
- assertEquals(logAppender.list.get(0).getMarker().getName(), "ENTRY");
+ assertEquals("ENTRY", logAppender.list.get(0).getMarker().getName());
assertNotNull(logAppender.list.get(0).getMDCPropertyMap().get("InvocationID"));
assertNotNull(logAppender.list.get(0).getMDCPropertyMap().get("RequestID"));
assertTrue("Info missing in log", logAppender.list.toString().contains("[INFO] Heartbeat request"));
- assertEquals(logAppender.list.get(1).getMarker().getName(), "EXIT");
+ assertEquals("EXIT", logAppender.list.get(1).getMarker().getName());
logAppender.stop();
}
}
when(clientResponseMock.statusCode()).thenReturn(HttpStatus.OK);
- ListAppender<ILoggingEvent> logAppender = LoggingUtils.getLogListAppender(DmaapWebClient.class, true);
+ final ListAppender<ILoggingEvent> logAppender = LoggingUtils.getLogListAppender(DmaapWebClient.class, true);
Mono<ClientResponse> logResponse = dmaapWebClientUndetTest.logResponse(clientResponseMock);
assertEquals(clientResponseMock, logResponse.block());
- assertEquals(logAppender.list.get(0).getLevel(), Level.TRACE);
- assertEquals(logAppender.list.get(0).getFormattedMessage(), "Response Status 200 OK");
+ assertEquals(Level.TRACE, logAppender.list.get(0).getLevel());
+ assertEquals("Response Status 200 OK", logAppender.list.get(0).getFormattedMessage());
logAppender.stop();
}
DmaapWebClient dmaapWebClientUndetTest = new DmaapWebClient();
- ListAppender<ILoggingEvent> logAppender = LoggingUtils.getLogListAppender(DmaapWebClient.class, true);
+ final ListAppender<ILoggingEvent> logAppender = LoggingUtils.getLogListAppender(DmaapWebClient.class, true);
Mono<ClientRequest> logRequest = dmaapWebClientUndetTest.logRequest(clientRequesteMock);
assertEquals(clientRequesteMock, logRequest.block());
- assertEquals(logAppender.list.get(0).getLevel(), Level.TRACE);
+ assertEquals(Level.TRACE, logAppender.list.get(0).getLevel());
assertEquals("Request: GET http://test", logAppender.list.get(0).getFormattedMessage());
- assertEquals(logAppender.list.get(1).getLevel(), Level.TRACE);
+ assertEquals(Level.TRACE, logAppender.list.get(1).getLevel());
assertEquals("HTTP request headers: [header:\"value\"]", logAppender.list.get(1).getFormattedMessage());
logAppender.stop();
Mockito.doReturn(Optional.of(jsonElement.getAsJsonObject())).when(jsonMessageParserUnderTest)
.getJsonObjectFromAnArray(jsonElement);
- ListAppender<ILoggingEvent> logAppender = LoggingUtils.getLogListAppender(JsonMessageParser.class);
+ final ListAppender<ILoggingEvent> logAppender = LoggingUtils.getLogListAppender(JsonMessageParser.class);
StepVerifier.create(jsonMessageParserUnderTest.getMessagesFromJson(Mono.just(jsonElement))).expectSubscription()
.expectNextCount(0).verifyComplete();
Mockito.doReturn(Optional.of(jsonElement.getAsJsonObject())).when(jsonMessageParserUnderTest)
.getJsonObjectFromAnArray(jsonElement);
- ListAppender<ILoggingEvent> logAppender = LoggingUtils.getLogListAppender(JsonMessageParser.class);
+ final ListAppender<ILoggingEvent> logAppender = LoggingUtils.getLogListAppender(JsonMessageParser.class);
StepVerifier.create(jsonMessageParserUnderTest.getMessagesFromJson(Mono.just(jsonElement))).expectSubscription()
.expectNextCount(0).verifyComplete();
Mockito.doReturn(Optional.of(jsonElement.getAsJsonObject())).when(jsonMessageParserUnderTest)
.getJsonObjectFromAnArray(jsonElement);
- ListAppender<ILoggingEvent> logAppender = LoggingUtils.getLogListAppender(JsonMessageParser.class);
+ final ListAppender<ILoggingEvent> logAppender = LoggingUtils.getLogListAppender(JsonMessageParser.class);
StepVerifier.create(jsonMessageParserUnderTest.getMessagesFromJson(Mono.just(jsonElement))).expectSubscription()
.expectComplete().verify();
Mockito.doReturn(Optional.of(jsonElement.getAsJsonObject())).when(jsonMessageParserUnderTest)
.getJsonObjectFromAnArray(jsonElement);
- ListAppender<ILoggingEvent> logAppender = LoggingUtils.getLogListAppender(JsonMessageParser.class);
+ final ListAppender<ILoggingEvent> logAppender = LoggingUtils.getLogListAppender(JsonMessageParser.class);
StepVerifier.create(jsonMessageParserUnderTest.getMessagesFromJson(Mono.just(jsonElement))).expectSubscription()
.expectNextCount(0).verifyComplete();
Mockito.doReturn(Optional.of(jsonElement.getAsJsonObject())).when(jsonMessageParserUnderTest)
.getJsonObjectFromAnArray(jsonElement);
- ListAppender<ILoggingEvent> logAppender = LoggingUtils.getLogListAppender(JsonMessageParser.class);
+ final ListAppender<ILoggingEvent> logAppender = LoggingUtils.getLogListAppender(JsonMessageParser.class);
StepVerifier.create(jsonMessageParserUnderTest.getMessagesFromJson(Mono.just(jsonElement))).expectSubscription()
.expectNextCount(0).verifyComplete();
Mockito.doReturn(Optional.of(jsonElement.getAsJsonObject())).when(jsonMessageParserUnderTest)
.getJsonObjectFromAnArray(jsonElement);
- ListAppender<ILoggingEvent> logAppender = LoggingUtils.getLogListAppender(JsonMessageParser.class);
+ final ListAppender<ILoggingEvent> logAppender = LoggingUtils.getLogListAppender(JsonMessageParser.class);
StepVerifier.create(jsonMessageParserUnderTest.getMessagesFromJson(Mono.just(jsonElement))).expectSubscription()
.expectNextCount(0).verifyComplete();
Mockito.doReturn(Optional.of(jsonElement.getAsJsonObject())).when(jsonMessageParserUnderTest)
.getJsonObjectFromAnArray(jsonElement);
- ListAppender<ILoggingEvent> logAppender = LoggingUtils.getLogListAppender(JsonMessageParser.class);
+ final ListAppender<ILoggingEvent> logAppender = LoggingUtils.getLogListAppender(JsonMessageParser.class);
StepVerifier.create(jsonMessageParserUnderTest.getMessagesFromJson(Mono.just(jsonElement))).expectSubscription()
.expectNextCount(0).verifyComplete();
Mockito.doReturn(Optional.of(jsonElement.getAsJsonObject())).when(jsonMessageParserUnderTest)
.getJsonObjectFromAnArray(jsonElement);
- ListAppender<ILoggingEvent> logAppender = LoggingUtils.getLogListAppender(JsonMessageParser.class);
+ final ListAppender<ILoggingEvent> logAppender = LoggingUtils.getLogListAppender(JsonMessageParser.class);
StepVerifier.create(jsonMessageParserUnderTest.getMessagesFromJson(Mono.just(jsonElement))).expectSubscription()
.expectComplete().verify();
Mockito.doReturn(Optional.of(jsonElement.getAsJsonObject())).when(jsonMessageParserUnderTest)
.getJsonObjectFromAnArray(jsonElement);
- ListAppender<ILoggingEvent> logAppender = LoggingUtils.getLogListAppender(JsonMessageParser.class);
+ final ListAppender<ILoggingEvent> logAppender = LoggingUtils.getLogListAppender(JsonMessageParser.class);
StepVerifier.create(jsonMessageParserUnderTest.getMessagesFromJson(Mono.just(jsonElement))).expectSubscription()
.expectComplete().verify();
Mockito.doReturn(Optional.of(jsonElement.getAsJsonObject())).when(jsonMessageParserUnderTest)
.getJsonObjectFromAnArray(jsonElement);
- ListAppender<ILoggingEvent> logAppender = LoggingUtils.getLogListAppender(JsonMessageParser.class);
+ final ListAppender<ILoggingEvent> logAppender = LoggingUtils.getLogListAppender(JsonMessageParser.class);
StepVerifier.create(jsonMessageParserUnderTest.getMessagesFromJson(Mono.just(jsonElement))).expectSubscription()
.expectNextCount(0).expectComplete().verify();
private static PublisherConfiguration publisherConfigurationMock = mock(PublisherConfiguration.class);
private static Map<String, String> context = new HashMap<>();
private static DataRouterPublisher publisherTaskUnderTestSpy;
- private static final Counters counters = new Counters();
+ private Counters counters;
@BeforeAll
public static void setUp() {
.changeIdentifier(CHANGE_IDENTIFIER) //
.build(); //
appConfig = mock(AppConfig.class);
- publisherTaskUnderTestSpy = spy(new DataRouterPublisher(appConfig, counters));
}
@BeforeEach
void setUpTest() {
- counters.clear();
+ counters = new Counters();
+ publisherTaskUnderTestSpy = spy(new DataRouterPublisher(appConfig, counters));
}
@Test
void whenPassedObjectFits_firstFailsWithExceptionThenSucceeds() throws Exception {
prepareMocksForTests(new DatafileTaskException("Error"), HttpStatus.OK.value());
- ListAppender<ILoggingEvent> logAppender = LoggingUtils.getLogListAppender(DataRouterPublisher.class);
+ final ListAppender<ILoggingEvent> logAppender = LoggingUtils.getLogListAppender(DataRouterPublisher.class);
StepVerifier.create(publisherTaskUnderTestSpy.publishFile(filePublishInformation, 2, Duration.ofSeconds(0)))
.expectNext(filePublishInformation) //
.verifyComplete();
prepareMocksForTests(null, Integer.valueOf(HttpStatus.BAD_GATEWAY.value()),
Integer.valueOf((HttpStatus.BAD_GATEWAY.value())));
- ListAppender<ILoggingEvent> logAppender = LoggingUtils.getLogListAppender(DataRouterPublisher.class);
+ final ListAppender<ILoggingEvent> logAppender = LoggingUtils.getLogListAppender(DataRouterPublisher.class);
StepVerifier.create(publisherTaskUnderTestSpy.publishFile(filePublishInformation, 1, Duration.ofSeconds(0)))
.expectErrorMessage("Retries exhausted: 1/1") //
.verify();
private SftpClient sftpClientMock = mock(SftpClient.class);
private final Map<String, String> contextMap = new HashMap<>();
- private final Counters counters = new Counters();
+ private Counters counters;
private MessageMetaData createMessageMetaData() {
return ImmutableMessageMetaData.builder() //
@BeforeEach
void setUpTest() {
- counters.clear();
+ counters = new Counters();
}
@Test
verify(ftpsClientMock, times(1)).close();
verifyNoMoreInteractions(ftpsClientMock);
- assertEquals("collectedFiles should have been 1", counters.getNoOfCollectedFiles(), 1);
- assertEquals("failedFtpAttempts should have been 0", counters.getNoOfFailedFtpAttempts(), 0);
+ assertEquals("collectedFiles should have been 1", 1, counters.getNoOfCollectedFiles());
+ assertEquals("failedFtpAttempts should have been 0", 0, counters.getNoOfFailedFtpAttempts());
}
@Test
verify(sftpClientMock, times(2)).close();
verifyNoMoreInteractions(sftpClientMock);
- assertEquals("collectedFiles should have been 2", counters.getNoOfCollectedFiles(), 2);
+ assertEquals("collectedFiles should have been 2", 2, counters.getNoOfCollectedFiles());
}
@Test
verify(ftpsClientMock, times(4)).collectFile(REMOTE_FILE_LOCATION, LOCAL_FILE_LOCATION);
- assertEquals("collectedFiles should have been 0", counters.getNoOfCollectedFiles(), 0);
- assertEquals("failedFtpAttempts should have been 4", counters.getNoOfFailedFtpAttempts(), 4);
+ assertEquals("collectedFiles should have been 0", 0, counters.getNoOfCollectedFiles());
+ assertEquals("failedFtpAttempts should have been 4", 4, counters.getNoOfFailedFtpAttempts());
}
@Test
verify(ftpsClientMock, times(1)).collectFile(REMOTE_FILE_LOCATION, LOCAL_FILE_LOCATION);
- assertEquals("collectedFiles should have been 0", counters.getNoOfCollectedFiles(), 0);
- assertEquals("failedFtpAttempts should have been 1", counters.getNoOfFailedFtpAttempts(), 1);
+ assertEquals("collectedFiles should have been 0", 0, counters.getNoOfCollectedFiles());
+ assertEquals("failedFtpAttempts should have been 1", 1, counters.getNoOfFailedFtpAttempts());
}
@Test
verify(ftpsClientMock, times(2)).collectFile(REMOTE_FILE_LOCATION, LOCAL_FILE_LOCATION);
- assertEquals("collectedFiles should have been 1", counters.getNoOfCollectedFiles(), 1);
- assertEquals("failedFtpAttempts should have been 1", counters.getNoOfFailedFtpAttempts(), 1);
+ assertEquals("collectedFiles should have been 1", 1, counters.getNoOfCollectedFiles());
+ assertEquals("failedFtpAttempts should have been 1", 1, counters.getNoOfFailedFtpAttempts());
}
}
Flux<FileReadyMessage> fileReadyMessages = fileReadyMessageFlux(noOfEvents, noOfFilesPerEvent, true);
doReturn(fileReadyMessages).when(consumerMock).getMessageRouterResponse();
- ListAppender<ILoggingEvent> logAppender = LoggingUtils.getLogListAppender(ScheduledTasks.class);
+ final ListAppender<ILoggingEvent> logAppender = LoggingUtils.getLogListAppender(ScheduledTasks.class);
testedObject.executeDatafileMainTask();
await().untilAsserted(() -> assertEquals("currentNumberOfSubscriptions should have been 0", 0,
MDC.setContextMap(contextMap);
doReturn(Flux.error(new Exception("Failed"))).when(consumerMock).getMessageRouterResponse();
- ListAppender<ILoggingEvent> logAppender = LoggingUtils.getLogListAppender(ScheduledTasks.class);
+ final ListAppender<ILoggingEvent> logAppender = LoggingUtils.getLogListAppender(ScheduledTasks.class);
StepVerifier //
.create(testedObject.createMainTask(contextMap)) //
.expectSubscription() //
.when(dataRouterMock) //
.publishFile(notNull(), anyLong(), notNull());
- ListAppender<ILoggingEvent> logAppender = LoggingUtils.getLogListAppender(ScheduledTasks.class);
+ final ListAppender<ILoggingEvent> logAppender = LoggingUtils.getLogListAppender(ScheduledTasks.class);
StepVerifier //
.create(testedObject.createMainTask(contextMap)) //
.expectSubscription() //
"PM_MEAS_FILES": {
"type": "data_router",
"dmaap_info": {
- "username": "user",
- "log_url": "https://dmaap.example.com/feedlog/972",
- "publish_url": "https://message-router.onap.svc.cluster.local:3907/publish/1",
+ "username": "CYE9fl40",
"location": "loc00",
- "password": "password",
- "publisher_id": "972.360gm"
+ "log_url": "https://localhost:3907/feedlog/1",
+ "publisher_id": "4.307dw",
+ "password": "izBJD8nLjawq0HMG",
+ "publish_url": "https://localhost:3907/publish/1"
}
}
},
"streams_subscribes": {
"dmaap_subscriber": {
"dmaap_info": {
- "topic_url": "http://admin:admin@message-router.onap.svc.cluster.local:2222/events/unauthenticated.VES_NOTIFICATION_OUTPUT/OpenDcae-c12/C12"
+ "topic_url": "http://dradmin:dradmin@localhost:2222/events/unauthenticated.VES_NOTIFICATION_OUTPUT/OpenDcae-c12/C12"
},
"type": "message_router"
}
}
}
-}
\ No newline at end of file
+}
+
+++ /dev/null
-version: '3'
-services:
- data-file-collector:
- image: nexus3.onap.org:10001/onap/org.onap.dcaegen2.collectors.datafile.datafile-app-server
- command: >
- --dmaap.dmaapConsumerConfiguration.dmaapHostName=<HOST-TO-SET-DMAAP-MR>
- --dmaap.dmaapConsumerConfiguration.dmaapPortNumber=3904
- --dmaap.dmaapConsumerConfiguration.dmaapTopicName=/events/unauthenticated.VES_NOTIFICATION_OUTPUT
- --dmaap.dmaapConsumerConfiguration.dmaapProtocol=http
- --dmaap.dmaapConsumerConfiguration.dmaapUserName=admin
- --dmaap.dmaapConsumerConfiguration.dmaapUserPassword=admin
- --dmaap.dmaapConsumerConfiguration.dmaapContentType=application/json
- --dmaap.dmaapConsumerConfiguration.consumerId=C12
- --dmaap.dmaapConsumerConfiguration.consumerGroup=OpenDCAE-c12
- --dmaap.dmaapConsumerConfiguration.timeoutMs=-1
- --dmaap.dmaapConsumerConfiguration.message-limit=-1
- --dmaap.dmaapProducerConfiguration.dmaapHostName=<HOST-TO-SET-DMAAP-DR>
- --dmaap.dmaapProducerConfiguration.dmaapPortNumber=8443
- --dmaap.dmaapProducerConfiguration.dmaapTopicName=publish
- --dmaap.dmaapProducerConfiguration.dmaapProtocol=https
- --dmaap.dmaapProducerConfiguration.dmaapUserName=dradmin
- --dmaap.dmaapProducerConfiguration.dmaapUserPassword=dradmin
- --dmaap.dmaapProducerConfiguration.dmaapContentType=application/octet-stream
- --dmaap.ftpesConfig.keyCert=config/dfc.jks
- --dmaap.ftpesConfig.keyPassword=secret
- --dmaap.ftpesConfig.trustedCA=config/ftp.jks
- --dmaap.ftpesConfig.trustedCAPassword=secret
- entrypoint:
- - java
- - -Dspring.profiles.active=dev
- - -jar
- - /target/datafile-app-server.jar
- ports:
- - "8100:8100"
- - "8433:8433"
- restart: always
-