org.onap.dcaegen2.services.prh: INFO
file: opt/log/application.log
app:
- filepath: config/prh_endpoints.json
+ filepath: config/prh_endpoints.json
\ No newline at end of file
import org.springframework.scheduling.TaskScheduler;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.concurrent.ConcurrentTaskScheduler;
+import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
/**
* @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 3/23/18
}
@Bean
- TaskScheduler taskScheduler() {
+ ConcurrentTaskScheduler concurrentTaskScheduler() {
return new ConcurrentTaskScheduler();
}
+
+ @Bean
+ ThreadPoolTaskScheduler threadPoolTaskScheduler() {
+ ThreadPoolTaskScheduler threadPoolTaskScheduler
+ = new ThreadPoolTaskScheduler();
+ threadPoolTaskScheduler.setPoolSize(5);
+ threadPoolTaskScheduler.setThreadNamePrefix(
+ "CloudThreadPoolTaskScheduler");
+ return threadPoolTaskScheduler;
+ }
}
import org.onap.dcaegen2.services.prh.config.ImmutableDmaapConsumerConfiguration;
import org.onap.dcaegen2.services.prh.config.ImmutableDmaapPublisherConfiguration;
import org.springframework.beans.factory.annotation.Value;
+import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Configuration;
import org.springframework.stereotype.Component;
* @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 4/9/18
*/
-@Component
@Configuration
+@EnableConfigurationProperties
public class AppConfig extends PrhAppConfig {
private static Predicate<String> isEmpty = String::isEmpty;
--- /dev/null
+/*
+ * ============LICENSE_START=======================================================
+ * PNF-REGISTRATION-HANDLER
+ * ================================================================================
+ * Copyright (C) 2018 NOKIA Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.dcaegen2.services.prh.configuration;
+
+import java.util.Properties;
+import org.onap.dcaegen2.services.prh.model.EnvProperties;
+import org.onap.dcaegen2.services.prh.service.HttpClientExecutorService;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.boot.context.properties.EnableConfigurationProperties;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.scheduling.TaskScheduler;
+import org.springframework.scheduling.annotation.EnableScheduling;
+import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
+import reactor.core.publisher.Flux;
+import reactor.core.scheduler.Schedulers;
+
+/**
+ * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 8/9/18
+ */
+@Configuration
+@EnableConfigurationProperties
+@EnableScheduling
+public class CloudConfiguration extends AppConfig {
+
+ private Logger logger = LoggerFactory.getLogger(this.getClass());
+ private HttpClientExecutorService httpClientExecutorService;
+
+ TaskScheduler cloudTaskScheduler;
+
+ @Value("#{systemEnvironment}")
+ private Properties systemEnvironment;
+
+
+ @Autowired
+ public void setThreadPoolTaskScheduler(ThreadPoolTaskScheduler threadPoolTaskScheduler,
+ HttpClientExecutorService httpClientExecutorService) {
+ this.cloudTaskScheduler = threadPoolTaskScheduler;
+ this.httpClientExecutorService = httpClientExecutorService;
+ }
+
+ protected void runTask() {
+ Flux.defer(() -> EnvironmentProcessor.evaluate(systemEnvironment))
+ .subscribeOn(Schedulers.parallel())
+ .subscribe(this::doOnSucces, this::doOnError);
+ }
+
+ private void doOnError(Throwable throwable) {
+ logger.warn("Error in case of processing system environment.%nMore details below:%n ", throwable);
+ }
+
+ private void doOnSucces(EnvProperties envProperties) {
+ logger.info("Fetching PRH configuration from ConfigBindingService/Consul");
+ Flux.just(httpClientExecutorService.callConsulForConfigBindingServiceEndpoint(envProperties))
+ .flatMap(configBindingServiceUri -> httpClientExecutorService.callConfigBindingServiceForPrhConfiguration(envProperties,
+ configBindingServiceUri)).subscribe();
+ }
+}
--- /dev/null
+/*
+ * ============LICENSE_START=======================================================
+ * PNF-REGISTRATION-HANDLER
+ * ================================================================================
+ * Copyright (C) 2018 NOKIA Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.dcaegen2.services.prh.configuration;
+
+import java.util.Optional;
+import java.util.Properties;
+import org.onap.dcaegen2.services.prh.exceptions.EnvironmentLoaderException;
+import org.onap.dcaegen2.services.prh.model.EnvProperties;
+import org.onap.dcaegen2.services.prh.model.ImmutableEnvProperties;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import reactor.core.publisher.Flux;
+
+/**
+ * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 8/10/18
+ */
+class EnvironmentProcessor {
+
+ private static Logger logger = LoggerFactory.getLogger(EnvironmentProcessor.class);
+
+ private EnvironmentProcessor() {
+ }
+
+ static Flux<EnvProperties> evaluate(Properties systemEnvironment) {
+ logger.info("Loading configuration from system environment variables");
+ EnvProperties envProperties;
+ try {
+ envProperties = ImmutableEnvProperties.builder().consulHost(getConsulHost(systemEnvironment))
+ .consulPort(getConsultPort(systemEnvironment)).cbsName(getConfigBindingService(systemEnvironment))
+ .appName(getService(systemEnvironment)).build();
+ } catch (EnvironmentLoaderException e) {
+ return Flux.error(e);
+ }
+ logger.info("Evaluated environment system variables {}", envProperties);
+ return Flux.just(envProperties);
+ }
+
+ private static String getConsulHost(Properties systemEnvironments) throws EnvironmentLoaderException {
+ return Optional.ofNullable(systemEnvironments.getProperty("CONSUL_HOST"))
+ .orElseThrow(() -> new EnvironmentLoaderException("$CONSUL_HOST environment has not been defined"));
+ }
+
+ private static Integer getConsultPort(Properties systemEnvironments) {
+ return Optional.ofNullable(systemEnvironments.getProperty("CONSUL_PORT")).map(Integer::valueOf)
+ .orElseGet(EnvironmentProcessor::getDefaultPortOfConsul);
+ }
+
+ private static String getConfigBindingService(Properties systemEnvironments) throws EnvironmentLoaderException {
+ return Optional.ofNullable(systemEnvironments.getProperty("CONFIG_BINDING_SERVICE"))
+ .orElseThrow(
+ () -> new EnvironmentLoaderException("$CONFIG_BINDING_SERVICE environment has not been defined"));
+ }
+
+ private static String getService(Properties systemEnvironments) throws EnvironmentLoaderException {
+ return Optional.ofNullable(Optional.ofNullable(systemEnvironments.getProperty("HOSTNAME"))
+ .orElse(systemEnvironments.getProperty("SERVICE_NAME")))
+ .orElseThrow(() -> new EnvironmentLoaderException(
+ "Neither $HOSTNAME/$SERVICE_NAME have not been defined as system environment"));
+ }
+
+ private static Integer getDefaultPortOfConsul() {
+ logger.warn("$CONSUL_PORT environment has not been defined");
+ logger.warn("$CONSUL_PORT variable will be set to default port {}", 8500);
+ return 8500;
+ }
+}
+
package org.onap.dcaegen2.services.prh.configuration;
import io.swagger.annotations.ApiOperation;
+import java.time.Duration;
+import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
import javax.annotation.PostConstruct;
import org.onap.dcaegen2.services.prh.tasks.ScheduledTasks;
import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Configuration;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.scheduling.TaskScheduler;
import org.springframework.scheduling.annotation.EnableScheduling;
+import org.springframework.scheduling.concurrent.ConcurrentTaskScheduler;
+import org.springframework.scheduling.support.PeriodicTrigger;
import reactor.core.publisher.Mono;
/**
*/
@Configuration
@EnableScheduling
-public class SchedulerConfig extends PrhAppConfig {
+public class SchedulerConfig extends CloudConfiguration {
- private static final int SCHEDULING_DELAY = 2000;
- private static volatile List<ScheduledFuture> scheduledFutureList = new ArrayList<>();
+ private static final int SCHEDULING_DELAY_FOR_PRH_TASKS = 2000;
+ private static final int SCHEDULING_REQUEST_FOR_CONFIGURATION_DELAY = 1;
+ private static volatile List<ScheduledFuture> scheduledPrgTaskFutureList = new ArrayList<>();
- private final TaskScheduler taskScheduler;
+ private final ConcurrentTaskScheduler taskScheduler;
private final ScheduledTasks scheduledTask;
@Autowired
- public SchedulerConfig(TaskScheduler taskScheduler, ScheduledTasks scheduledTask) {
- this.taskScheduler = taskScheduler;
+ public SchedulerConfig(@Qualifier("concurrentTaskScheduler") ConcurrentTaskScheduler concurrentTaskScheduler,
+ ScheduledTasks scheduledTask) {
+ this.taskScheduler = concurrentTaskScheduler;
this.scheduledTask = scheduledTask;
}
*/
@ApiOperation(value = "Get response on stopping task execution")
public synchronized Mono<ResponseEntity<String>> getResponseFromCancellationOfTasks() {
- scheduledFutureList.forEach(x -> x.cancel(false));
- scheduledFutureList.clear();
+ scheduledPrgTaskFutureList.forEach(x -> x.cancel(false));
+ scheduledPrgTaskFutureList.clear();
return Mono.defer(() ->
Mono.just(new ResponseEntity<>("PRH Service has already been stopped!", HttpStatus.CREATED))
);
*
* @return status of operation execution: true - started, false - not started
*/
+
@PostConstruct
@ApiOperation(value = "Start task if possible")
public synchronized boolean tryToStartTask() {
- if (scheduledFutureList.isEmpty()) {
- scheduledFutureList.add(taskScheduler
- .scheduleWithFixedDelay(scheduledTask::scheduleMainPrhEventTask, SCHEDULING_DELAY));
+ if (scheduledPrgTaskFutureList.isEmpty()) {
+ scheduledPrgTaskFutureList.add(cloudTaskScheduler
+ .scheduleAtFixedRate(super::runTask, Instant.now(),
+ Duration.ofMinutes(SCHEDULING_REQUEST_FOR_CONFIGURATION_DELAY)));
+ scheduledPrgTaskFutureList.add(taskScheduler
+ .scheduleWithFixedDelay(scheduledTask::scheduleMainPrhEventTask, SCHEDULING_DELAY_FOR_PRH_TASKS));
return true;
} else {
return false;
}
-
}
+
}
--- /dev/null
+/*
+ * ============LICENSE_START=======================================================
+ * PNF-REGISTRATION-HANDLER
+ * ================================================================================
+ * Copyright (C) 2018 NOKIA Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.dcaegen2.services.prh.exceptions;
+
+/**
+ * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 8/10/18
+ */
+public class EnvironmentLoaderException extends Exception {
+
+ public EnvironmentLoaderException(String message) {
+ super(message);
+ }
+}
--- /dev/null
+/*
+ * ============LICENSE_START=======================================================
+ * PNF-REGISTRATION-HANDLER
+ * ================================================================================
+ * Copyright (C) 2018 NOKIA Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.dcaegen2.services.prh.model;
+
+import org.immutables.value.Value;
+
+/**
+ * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 8/10/18
+ */
+@Value.Immutable(prehash = true)
+public interface EnvProperties {
+
+ @Value.Parameter
+ String consulHost();
+
+ @Value.Parameter
+ Integer consulPort();
+
+ @Value.Parameter
+ String cbsName();
+
+ @Value.Parameter
+ String appName();
+
+}
--- /dev/null
+/*
+ * ============LICENSE_START=======================================================
+ * PNF-REGISTRATION-HANDLER
+ * ================================================================================
+ * Copyright (C) 2018 NOKIA Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.dcaegen2.services.prh.service;
+
+import org.onap.dcaegen2.services.prh.model.EnvProperties;
+import org.reactivestreams.Publisher;
+import org.springframework.stereotype.Service;
+
+/**
+ * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 8/10/18
+ */
+
+@Service
+public class HttpClientExecutorService {
+
+ public String callConsulForConfigBindingServiceEndpoint(EnvProperties envProperties) {
+ return null;
+ }
+
+ public Publisher<String> callConfigBindingServiceForPrhConfiguration(EnvProperties envProperties,
+ String configBindingServiceUri) {
+
+ return null;
+ }
+
+ private static class HttpGetClient {
+
+ }
+}
package org.onap.dcaegen2.services.prh.tasks;
import org.onap.dcaegen2.services.prh.config.AaiClientConfiguration;
-import org.onap.dcaegen2.services.prh.configuration.AppConfig;
import org.onap.dcaegen2.services.prh.configuration.Config;
import org.onap.dcaegen2.services.prh.exceptions.AaiNotFoundException;
import org.onap.dcaegen2.services.prh.exceptions.DmaapNotFoundException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.stereotype.Component;
import reactor.core.publisher.Mono;
private final Logger logger = LoggerFactory.getLogger(this.getClass());
- private final Config prhAppConfig;
+ private final Config config;
private AaiProducerReactiveHttpClient aaiProducerReactiveHttpClient;
@Autowired
- public AaiProducerTaskImpl(AppConfig prhAppConfig) {
- this.prhAppConfig = prhAppConfig;
+ public AaiProducerTaskImpl(@Qualifier("cloudConfiguration") Config config) {
+ this.config = config;
}
@Override
@Override
protected AaiClientConfiguration resolveConfiguration() {
- return prhAppConfig.getAaiClientConfiguration();
+ return config.getAaiClientConfiguration();
}
@Override
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.stereotype.Component;
import reactor.core.publisher.Mono;
public class DmaapConsumerTaskImpl extends DmaapConsumerTask {
private final Logger logger = LoggerFactory.getLogger(this.getClass());
- private final Config prhAppConfig;
+ private final Config config;
private DmaapConsumerJsonParser dmaapConsumerJsonParser;
private DMaaPConsumerReactiveHttpClient dmaaPConsumerReactiveHttpClient;
@Autowired
- public DmaapConsumerTaskImpl(AppConfig prhAppConfig) {
- this.prhAppConfig = prhAppConfig;
+ public DmaapConsumerTaskImpl(@Qualifier("cloudConfiguration") Config config) {
+ this.config = config;
this.dmaapConsumerJsonParser = new DmaapConsumerJsonParser();
}
DmaapConsumerTaskImpl(AppConfig prhAppConfig, DmaapConsumerJsonParser dmaapConsumerJsonParser) {
- this.prhAppConfig = prhAppConfig;
+ this.config = prhAppConfig;
this.dmaapConsumerJsonParser = dmaapConsumerJsonParser;
}
@Override
void initConfigs() {
- prhAppConfig.initFileStreamReader();
+ config.initFileStreamReader();
}
@Override
protected DmaapConsumerConfiguration resolveConfiguration() {
- return prhAppConfig.getDmaapConsumerConfiguration();
+ return config.getDmaapConsumerConfiguration();
}
@Override
package org.onap.dcaegen2.services.prh.tasks;
import org.onap.dcaegen2.services.prh.config.DmaapPublisherConfiguration;
-import org.onap.dcaegen2.services.prh.configuration.AppConfig;
import org.onap.dcaegen2.services.prh.configuration.Config;
import org.onap.dcaegen2.services.prh.exceptions.DmaapNotFoundException;
import org.onap.dcaegen2.services.prh.model.ConsumerDmaapModel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.stereotype.Component;
import reactor.core.publisher.Mono;
public class DmaapPublisherTaskImpl extends DmaapPublisherTask {
private final Logger logger = LoggerFactory.getLogger(this.getClass());
- private final Config prhAppConfig;
+ private final Config config;
private DMaaPProducerReactiveHttpClient dmaapProducerReactiveHttpClient;
@Autowired
- public DmaapPublisherTaskImpl(AppConfig prhAppConfig) {
- this.prhAppConfig = prhAppConfig;
+ public DmaapPublisherTaskImpl(@Qualifier("cloudConfiguration") Config config) {
+ this.config = config;
}
@Override
@Override
protected DmaapPublisherConfiguration resolveConfiguration() {
- return prhAppConfig.getDmaapPublisherConfiguration();
+ return config.getDmaapPublisherConfiguration();
}
@Override