2 * Copyright © 2018-2019 AT&T Intellectual Property.
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
17 package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.service
19 import kotlinx.coroutines.Dispatchers
20 import kotlinx.coroutines.GlobalScope
21 import kotlinx.coroutines.delay
22 import kotlinx.coroutines.launch
23 import kotlinx.coroutines.withContext
24 import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessagePrioritizationService
25 import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.PrioritizationConfiguration
26 import org.onap.ccsdk.cds.controllerblueprints.core.logger
27 import org.springframework.stereotype.Service
// Scheduler service for message prioritization housekeeping. It is constructed with a
// MessagePrioritizationService, which the expiry and clean-up schedulers of this class call into.
// NOTE(review): this view of the file appears to omit some short lines (for example the line that
// closes the constructor parameter list), so members declared on those lines are not described here.
30 open class MessagePrioritizationSchedulerService(
31 private val messagePrioritizationService: MessagePrioritizationService
// Logger for this class, used by the scheduler methods for start-up and failure messages.
33 private val log = logger(MessagePrioritizationSchedulerService::class)
38 /** This is a sample scheduler implementation, used to start scheduling with a configuration at application start-up.
39 @EventListener(ApplicationReadyEvent::class)
40 open fun init() = runBlocking {
41 log.info("Starting PrioritizationListeners...")
42 startScheduling(MessagePrioritizationSample.samplePrioritizationConfiguration())
/**
 * Starts the prioritization schedulers for the supplied configuration.
 *
 * Logs a start-up message and then invokes [expiryScheduler] and [cleanUpScheduler], passing
 * [prioritizationConfiguration] to each.
 *
 * NOTE(review): the statements surrounding the two scheduler calls are not visible in this view.
 * Presumably each call is launched in its own coroutine (the file imports `GlobalScope` and
 * `launch`) so that this function returns without waiting for the schedulers; confirm against
 * the full file.
 *
 * @param prioritizationConfiguration configuration handed unchanged to both schedulers.
 */
46 open suspend fun startScheduling(prioritizationConfiguration: PrioritizationConfiguration) {
47 log.info("Starting Prioritization Scheduler Service...")
// Scheduler that marks expired messages (see expiryScheduler).
49 expiryScheduler(prioritizationConfiguration)
// Scheduler that cleans expired messages (see cleanUpScheduler).
52 cleanUpScheduler(prioritizationConfiguration)
/**
 * Shutdown entry point for the schedulers.
 *
 * The visible body suspends the caller for `shutDownConfiguration.waitMill` taken from
 * [prioritizationConfiguration] (presumably a duration in milliseconds; confirm the property type).
 *
 * NOTE(review): a statement preceding the delay is not visible in this view; presumably it signals
 * the scheduler loops to stop before the wait begins. Confirm against the full file.
 *
 * @param prioritizationConfiguration configuration providing the shutdown wait time.
 */
56 open suspend fun shutdownScheduling(prioritizationConfiguration: PrioritizationConfiguration) {
// Give in-flight scheduler work time to finish before returning.
58 delay(prioritizationConfiguration.shutDownConfiguration.waitMill)
/**
 * Expiry scheduler.
 *
 * Reads `expiryConfiguration` from [prioritizationConfiguration], logs the configured frequency,
 * and then, on [Dispatchers.Default], calls `messagePrioritizationService.updateExpiredMessages`
 * with that configuration followed by a delay of `expiryConfiguration.frequencyMilli`.
 * An [Exception] raised by that work is logged by the visible handler.
 *
 * NOTE(review): the lines that open the loop and the `try` are not visible in this view; presumably
 * the update/delay pair repeats until shutdown is requested. Confirm against the full file.
 * NOTE(review): `catch (e: Exception)` also matches `CancellationException`; if this body runs in a
 * loop, a cancellation during `delay` would be logged as a failure instead of propagating. Consider
 * rethrowing it.
 *
 * @param prioritizationConfiguration configuration providing `expiryConfiguration`.
 */
61 private suspend fun expiryScheduler(
62 prioritizationConfiguration: PrioritizationConfiguration
64 val expiryConfiguration = prioritizationConfiguration.expiryConfiguration
65 log.info("Initializing prioritization expiry scheduler frequency(${expiryConfiguration.frequencyMilli})mSec")
// Run the scheduler work on the default dispatcher.
66 withContext(Dispatchers.Default) {
// Update expired messages, then wait for the configured frequency before the next pass.
69 messagePrioritizationService.updateExpiredMessages(expiryConfiguration)
70 delay(expiryConfiguration.frequencyMilli)
71 } catch (e: Exception) {
// A failed pass is logged here.
72 log.error("failed in prioritization expiry scheduler", e)
/**
 * Clean-up scheduler.
 *
 * Reads `cleanConfiguration` from [prioritizationConfiguration], logs the configured frequency,
 * and then, on [Dispatchers.Default], calls `messagePrioritizationService.cleanExpiredMessage`
 * with that configuration followed by a delay of `cleanConfiguration.frequencyMilli`.
 * An [Exception] raised by that work is logged by the visible handler.
 *
 * NOTE(review): the lines that open the loop and the `try` are not visible in this view; presumably
 * the clean/delay pair repeats until shutdown is requested. Confirm against the full file.
 * NOTE(review): `catch (e: Exception)` also matches `CancellationException`; if this body runs in a
 * loop, a cancellation during `delay` would be logged as a failure instead of propagating. Consider
 * rethrowing it.
 *
 * @param prioritizationConfiguration configuration providing `cleanConfiguration`.
 */
78 private suspend fun cleanUpScheduler(
79 prioritizationConfiguration: PrioritizationConfiguration
81 val cleanConfiguration = prioritizationConfiguration.cleanConfiguration
82 log.info("Initializing prioritization clean scheduler frequency(${cleanConfiguration.frequencyMilli})mSec")
// Run the scheduler work on the default dispatcher.
83 withContext(Dispatchers.Default) {
// Clean expired messages, then wait for the configured frequency before the next pass.
86 messagePrioritizationService.cleanExpiredMessage(cleanConfiguration)
87 delay(cleanConfiguration.frequencyMilli)
88 } catch (e: Exception) {
// A failed pass is logged here.
89 log.error("failed in prioritization clean scheduler", e)