2 * Copyright © 2018-2019 AT&T Intellectual Property.
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
17 package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.service
19 import kotlinx.coroutines.Dispatchers
20 import kotlinx.coroutines.GlobalScope
21 import kotlinx.coroutines.delay
22 import kotlinx.coroutines.launch
23 import kotlinx.coroutines.withContext
24 import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessagePrioritizationService
25 import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.PrioritizationConfiguration
26 import org.onap.ccsdk.cds.controllerblueprints.core.logger
27 import org.springframework.stereotype.Service
30 open class MessagePrioritizationSchedulerService(
31 private val messagePrioritizationService: MessagePrioritizationService
33 private val log = logger(MessagePrioritizationSchedulerService::class)
38 /** This is sample scheduler implementation used during starting application with configuration.
39 @EventListener(ApplicationReadyEvent::class)
40 open fun init() = runBlocking {
41 log.info("Starting PrioritizationListeners...")
42 startScheduling(MessagePrioritizationSample.samplePrioritizationConfiguration())
46 open suspend fun startScheduling() {
47 val prioritizationConfiguration = messagePrioritizationService.getConfiguration()
49 log.info("Starting Prioritization Scheduler Service...")
51 expiryScheduler(prioritizationConfiguration)
54 cleanUpScheduler(prioritizationConfiguration)
58 open suspend fun shutdownScheduling(prioritizationConfiguration: PrioritizationConfiguration) {
60 delay(prioritizationConfiguration.shutDownConfiguration.waitMill)
63 private suspend fun expiryScheduler(
64 prioritizationConfiguration: PrioritizationConfiguration
66 val expiryConfiguration = prioritizationConfiguration.expiryConfiguration
67 log.info("Initializing prioritization expiry scheduler frequency(${expiryConfiguration.frequencyMilli})mSec")
68 withContext(Dispatchers.Default) {
71 messagePrioritizationService.updateExpiredMessages()
72 delay(expiryConfiguration.frequencyMilli)
73 } catch (e: Exception) {
74 log.error("failed in prioritization expiry scheduler", e)
80 private suspend fun cleanUpScheduler(
81 prioritizationConfiguration: PrioritizationConfiguration
83 val cleanConfiguration = prioritizationConfiguration.cleanConfiguration
84 log.info("Initializing prioritization clean scheduler frequency(${cleanConfiguration.frequencyMilli})mSec")
85 withContext(Dispatchers.Default) {
88 messagePrioritizationService.cleanExpiredMessage()
89 delay(cleanConfiguration.frequencyMilli)
90 } catch (e: Exception) {
91 log.error("failed in prioritization clean scheduler", e)