import org.onap.dcaegen2.services.prh.model.ConsumerDmaapModel;
import org.onap.dcaegen2.services.prh.service.AAIConsumerClient;
-public abstract class AAIConsumerTask<R, S, C> extends Task<R, S, C> {
+public abstract class AAIConsumerTask {
abstract Optional<String> consume(ConsumerDmaapModel message) throws AAINotFoundException;
abstract AAIConsumerClient resolveClient();
+
+ abstract protected String execute(ConsumerDmaapModel consumerDmaapModel) throws AAINotFoundException;
}
import org.onap.dcaegen2.services.prh.configuration.AppConfig;
import org.onap.dcaegen2.services.prh.configuration.Config;
import org.onap.dcaegen2.services.prh.exceptions.AAINotFoundException;
-import org.onap.dcaegen2.services.prh.exceptions.PrhTaskException;
import org.onap.dcaegen2.services.prh.model.ConsumerDmaapModel;
import org.onap.dcaegen2.services.prh.service.AAIConsumerClient;
import org.slf4j.Logger;
import org.springframework.stereotype.Component;
@Component
-public class AAIConsumerTaskImpl extends
- AAIConsumerTask<ConsumerDmaapModel, String, AAIClientConfiguration> {
+public class AAIConsumerTaskImpl extends AAIConsumerTask {
private final Logger logger = LoggerFactory.getLogger(this.getClass());
}
}
- @Override
- protected void receiveRequest(ConsumerDmaapModel body) throws PrhTaskException {
- String response = execute(body);
- if (taskProcess != null && response != null && !response.isEmpty()) {
- taskProcess.receiveRequest(response);
- }
- }
-
@Override
public String execute(ConsumerDmaapModel consumerDmaapModel) throws AAINotFoundException {
consumerDmaapModel = Optional.ofNullable(consumerDmaapModel)
return consume(consumerDmaapModel).orElseThrow(() -> new AAINotFoundException("Null response code"));
}
- @Override
protected AAIClientConfiguration resolveConfiguration() {
return prhAppConfig.getAAIClientConfiguration();
}
package org.onap.dcaegen2.services.prh.tasks;
import org.onap.dcaegen2.services.prh.exceptions.AAINotFoundException;
+import org.onap.dcaegen2.services.prh.exceptions.PrhTaskException;
import org.onap.dcaegen2.services.prh.model.ConsumerDmaapModel;
import org.onap.dcaegen2.services.prh.service.AAIProducerClient;
/**
* @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 4/13/18
*/
-public abstract class AAIProducerTask<R, S, C> extends Task<R, S, C> {
+public abstract class AAIProducerTask/*<R, S, C> extends Task<R, S, C> */ {
abstract ConsumerDmaapModel publish(ConsumerDmaapModel message) throws AAINotFoundException;
abstract AAIProducerClient resolveClient();
+
+ abstract protected ConsumerDmaapModel execute(ConsumerDmaapModel consumerDmaapModel) throws PrhTaskException;
}
*/
package org.onap.dcaegen2.services.prh.tasks;
+import java.net.URISyntaxException;
+import java.util.Optional;
import org.onap.dcaegen2.services.prh.config.AAIClientConfiguration;
import org.onap.dcaegen2.services.prh.configuration.AppConfig;
import org.onap.dcaegen2.services.prh.configuration.Config;
import org.onap.dcaegen2.services.prh.exceptions.AAINotFoundException;
-import org.onap.dcaegen2.services.prh.exceptions.PrhTaskException;
import org.onap.dcaegen2.services.prh.model.ConsumerDmaapModel;
import org.onap.dcaegen2.services.prh.service.AAIProducerClient;
import org.onap.dcaegen2.services.prh.service.HttpUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
-import java.net.URISyntaxException;
-import java.util.Optional;
-
/**
* @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 4/13/18
*/
@Component
public class AAIProducerTaskImpl extends
- AAIProducerTask<ConsumerDmaapModel, ConsumerDmaapModel, AAIClientConfiguration> {
+ AAIProducerTask {
private final Logger logger = LoggerFactory.getLogger(this.getClass());
}
}
- @Override
- protected void receiveRequest(ConsumerDmaapModel body) throws PrhTaskException {
- ConsumerDmaapModel response = execute(body);
- if (taskProcess != null && response != null) {
- taskProcess.receiveRequest(response);
- }
- }
-
@Override
public ConsumerDmaapModel execute(ConsumerDmaapModel consumerDmaapModel) throws AAINotFoundException {
consumerDmaapModel = Optional.ofNullable(consumerDmaapModel)
/**
* @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 4/13/18
*/
-abstract class DmaapConsumerTask<R, S, C> extends Task<R, S, C> {
+abstract class DmaapConsumerTask /*<R, S, C> extends Task<R, S, C>*/ {
abstract ConsumerDmaapModel consume(String message) throws PrhTaskException;
abstract ExtendedDmaapConsumerHttpClientImpl resolveClient();
abstract void initConfigs();
+
+ abstract protected ConsumerDmaapModel execute(String object) throws PrhTaskException;
}
import org.onap.dcaegen2.services.prh.config.DmaapConsumerConfiguration;
import org.onap.dcaegen2.services.prh.configuration.AppConfig;
import org.onap.dcaegen2.services.prh.configuration.Config;
-import org.onap.dcaegen2.services.prh.exceptions.DmaapEmptyResponseException;
import org.onap.dcaegen2.services.prh.exceptions.DmaapNotFoundException;
import org.onap.dcaegen2.services.prh.exceptions.PrhTaskException;
import org.onap.dcaegen2.services.prh.model.ConsumerDmaapModel;
* @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 3/23/18
*/
@Component
-public class DmaapConsumerTaskImpl extends
- DmaapConsumerTask<String, ConsumerDmaapModel, DmaapConsumerConfiguration> {
+public class DmaapConsumerTaskImpl extends DmaapConsumerTask {
private final Logger logger = LoggerFactory.getLogger(this.getClass());
private final Config prhAppConfig;
}
- @Override
- protected void receiveRequest(String body) throws PrhTaskException {
- try {
- ConsumerDmaapModel response = execute(body);
- if (taskProcess != null && response != null) {
- taskProcess.receiveRequest(response);
- }
- } catch (DmaapEmptyResponseException e) {
- logger.warn("Nothing to consume from DmaaP {} topic.",
- resolveConfiguration().dmaapTopicName());
- }
-
- }
-
@Override
public ConsumerDmaapModel execute(String object) throws PrhTaskException {
extendedDmaapConsumerHttpClient = resolveClient();
prhAppConfig.initFileStreamReader();
}
- @Override
protected DmaapConsumerConfiguration resolveConfiguration() {
return prhAppConfig.getDmaapConsumerConfiguration();
}
/**
* @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 3/23/18
*/
-abstract class DmaapPublisherTask<R, S, C> extends Task<R, S, C> {
+abstract class DmaapPublisherTask {
abstract Integer publish(ConsumerDmaapModel consumerDmaapModel) throws DmaapNotFoundException;
abstract ExtendedDmaapProducerHttpClientImpl resolveClient();
+
+ abstract protected Integer execute(ConsumerDmaapModel consumerDmaapModel) throws DmaapNotFoundException;
}
import org.onap.dcaegen2.services.prh.configuration.AppConfig;
import org.onap.dcaegen2.services.prh.configuration.Config;
import org.onap.dcaegen2.services.prh.exceptions.DmaapNotFoundException;
-import org.onap.dcaegen2.services.prh.exceptions.PrhTaskException;
import org.onap.dcaegen2.services.prh.model.ConsumerDmaapModel;
import org.onap.dcaegen2.services.prh.service.producer.ExtendedDmaapProducerHttpClientImpl;
import org.slf4j.Logger;
* @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 4/13/18
*/
@Component
-public class DmaapPublisherTaskImpl extends
- DmaapPublisherTask<ConsumerDmaapModel, Integer, DmaapPublisherConfiguration> {
+public class DmaapPublisherTaskImpl extends DmaapPublisherTask {
private final Logger logger = LoggerFactory.getLogger(this.getClass());
private final Config prhAppConfig;
.orElseThrow(() -> new DmaapNotFoundException("Incorrect response from Dmaap"));
}
- @Override
- protected void receiveRequest(ConsumerDmaapModel body) throws PrhTaskException {
- Integer response = execute(body);
- if (taskProcess != null && response != null) {
- taskProcess.receiveRequest(response);
- }
- }
-
@Override
public Integer execute(ConsumerDmaapModel consumerDmaapModel) throws DmaapNotFoundException {
consumerDmaapModel = Optional.ofNullable(consumerDmaapModel)
return publish(consumerDmaapModel);
}
- @Override
protected DmaapPublisherConfiguration resolveConfiguration() {
return prhAppConfig.getDmaapPublisherConfiguration();
}
*/
package org.onap.dcaegen2.services.prh.tasks;
-import org.onap.dcaegen2.services.prh.exceptions.PrhTaskException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
+import reactor.core.Disposable;
+import reactor.core.publisher.Mono;
/**
* @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 3/23/18
}
public void scheduleMainPrhEventTask() {
- logger.trace("Execution of task was registered");
- setTaskExecutionFlow();
- try {
- dmaapConsumerTask.initConfigs();
- dmaapConsumerTask.receiveRequest("");
- } catch (PrhTaskException e) {
- logger
- .warn("Chain of tasks have been aborted, because some errors occur in prh workflow ", e);
- }
- }
+ logger.trace("Execution of tasks was registered");
- private void setTaskExecutionFlow() {
- dmaapConsumerTask.setNext(aaiProducerTask);
- aaiProducerTask.setNext(dmaapProducerTask);
+ Mono.fromSupplier(() -> Mono.fromCallable(() ->
+ {
+ dmaapConsumerTask.initConfigs();
+ return dmaapConsumerTask.execute("");
+ }).subscribe(consumerDmaapModel -> Mono
+ .fromCallable(() -> aaiProducerTask.execute(consumerDmaapModel))
+ .subscribe(
+ aaiConsumerDmaapModel -> Mono.fromCallable(() -> dmaapProducerTask.execute(aaiConsumerDmaapModel))
+ .subscribe(resp -> logger.info("Message was published to DmaaP, response code: {}", resp),
+ error -> logger.warn("Error has been thrown in DmaapProduerTask: {}", error),
+ () -> logger.info("Completed DmaapPublisher task"))),
+ errorResponse -> logger
+ .warn("Error has been thrown in AAIProducerTask: {}", errorResponse)
+ , () -> logger.info("Completed AAIProducer task")))
+ .subscribe(Disposable::dispose, tasksError -> logger
+ .warn("Chain of tasks have been aborted, because some errors occur in PRH workflow ", tasksError)
+ , () -> logger.info("PRH tasks was consumed properly")).dispose();
}
}
+++ /dev/null
-/*
- * ============LICENSE_START=======================================================
- * PNF-REGISTRATION-HANDLER
- * ================================================================================
- * Copyright (C) 2018 NOKIA Intellectual Property. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-package org.onap.dcaegen2.services.prh.tasks;
-
-import org.onap.dcaegen2.services.prh.exceptions.PrhTaskException;
-
-/**
- * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 4/13/18
- */
-
-
-public abstract class Task<R, S, C> {
-
- Task taskProcess;
-
- abstract protected void receiveRequest(R body) throws PrhTaskException;
-
- abstract protected S execute(R object) throws PrhTaskException;
-
- abstract protected C resolveConfiguration();
-
- void setNext(Task task) {
- this.taskProcess = task;
- }
-}
@Bean
@Primary
- public Task registerSimpleDmaapConsumerTask() {
+ public DmaapConsumerTask registerSimpleDmaapConsumerTask() {
AppConfig appConfig = spy(AppConfig.class);
doReturn(mock(DmaapConsumerConfiguration.class)).when(appConfig).getDmaapConsumerConfiguration();
DmaapConsumerTaskImpl dmaapConsumerTask = spy(new DmaapConsumerTaskImpl(appConfig));
@Bean
@Primary
- public Task registerSimpleDmaapPublisherTask() {
+ public DmaapPublisherTask registerSimpleDmaapPublisherTask() {
AppConfig appConfig = spy(AppConfig.class);
doReturn(mock(DmaapPublisherConfiguration.class)).when(appConfig).getDmaapPublisherConfiguration();
DmaapPublisherTaskImpl dmaapPublisherTask = spy(new DmaapPublisherTaskImpl(appConfig));