2 * Copyright © 2018-2019 AT&T Intellectual Property.
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
17 package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.service
19 import kotlinx.coroutines.Dispatchers
20 import kotlinx.coroutines.GlobalScope
21 import kotlinx.coroutines.delay
22 import kotlinx.coroutines.launch
23 import kotlinx.coroutines.withContext
24 import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessagePrioritizationService
25 import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.PrioritizationConfiguration
26 import org.onap.ccsdk.cds.controllerblueprints.core.logger
27 import org.springframework.stereotype.Service
// Scheduler service for message prioritization housekeeping: it passes the configuration obtained
// from MessagePrioritizationService to an expiry scheduler and a clean-up scheduler.
30 open class MessagePrioritizationSchedulerService(
// Service used to read the prioritization configuration and to update/clean expired messages.
31 private val messagePrioritizationService: MessagePrioritizationService
// Logger bound to this class.
34 private val log = logger(MessagePrioritizationSchedulerService::class)
39 /** This is a sample scheduler implementation, used when starting the application with configuration.
40 @EventListener(ApplicationReadyEvent::class)
41 open fun init() = runBlocking {
42 log.info("Starting PrioritizationListeners...")
43 startScheduling(MessagePrioritizationSample.samplePrioritizationConfiguration())
// Starts the prioritization schedulers.
// Reads the current prioritization configuration from messagePrioritizationService, logs the start-up,
// and hands the same configuration to expiryScheduler and cleanUpScheduler.
// NOTE(review): confirm how the two schedulers are started (separate coroutines vs. sequential calls);
// GlobalScope and launch are imported by this file.
47 open suspend fun startScheduling() {
48 val prioritizationConfiguration = messagePrioritizationService.getConfiguration()
50 log.info("Starting Prioritization Scheduler Service...")
// Scheduler that updates expired messages.
52 expiryScheduler(prioritizationConfiguration)
// Scheduler that cleans expired messages.
55 cleanUpScheduler(prioritizationConfiguration)
// Shutdown hook for the schedulers.
// Re-reads the prioritization configuration and suspends the caller for the configured shutdown wait.
59 open suspend fun shutdownScheduling() {
61 val prioritizationConfiguration = messagePrioritizationService.getConfiguration()
// Suspend (non-blocking) for shutDownConfiguration.waitMill - presumably milliseconds; TODO confirm.
62 delay(prioritizationConfiguration.shutDownConfiguration.waitMill)
// Expiry scheduler: asks messagePrioritizationService to update expired messages and then waits for
// expiryConfiguration.frequencyMilli before continuing. Failures are logged, not propagated.
// prioritizationConfiguration: supplies the expiryConfiguration (frequencyMilli) used here.
65 private suspend fun expiryScheduler(
66 prioritizationConfiguration: PrioritizationConfiguration
68 val expiryConfiguration = prioritizationConfiguration.expiryConfiguration
69 log.info("Initializing prioritization expiry scheduler frequency(${expiryConfiguration.frequencyMilli})mSec")
// Run the scheduler body on the Default dispatcher.
70 withContext(Dispatchers.Default) {
73 messagePrioritizationService.updateExpiredMessages()
// Wait for the configured frequency before the next step.
74 delay(expiryConfiguration.frequencyMilli)
// NOTE(review): catching Exception here also catches the CancellationException that delay() throws
// on cancellation; confirm whether cancellation should be rethrown instead of only logged.
75 } catch (e: Exception) {
76 log.error("failed in prioritization expiry scheduler", e)
// Clean-up scheduler: asks messagePrioritizationService to clean expired messages and then waits for
// cleanConfiguration.frequencyMilli before continuing. Failures are logged, not propagated.
// prioritizationConfiguration: supplies the cleanConfiguration (frequencyMilli) used here.
82 private suspend fun cleanUpScheduler(
83 prioritizationConfiguration: PrioritizationConfiguration
85 val cleanConfiguration = prioritizationConfiguration.cleanConfiguration
86 log.info("Initializing prioritization clean scheduler frequency(${cleanConfiguration.frequencyMilli})mSec")
// Run the scheduler body on the Default dispatcher.
87 withContext(Dispatchers.Default) {
90 messagePrioritizationService.cleanExpiredMessage()
// Wait for the configured frequency before the next step.
91 delay(cleanConfiguration.frequencyMilli)
// NOTE(review): catching Exception here also catches the CancellationException that delay() throws
// on cancellation; confirm whether cancellation should be rethrown instead of only logged.
92 } catch (e: Exception) {
93 log.error("failed in prioritization clean scheduler", e)