2 * Copyright © 2018-2019 AT&T Intellectual Property.
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
17 package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.service
19 import kotlinx.coroutines.Dispatchers
20 import kotlinx.coroutines.GlobalScope
21 import kotlinx.coroutines.delay
22 import kotlinx.coroutines.launch
23 import kotlinx.coroutines.withContext
24 import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessagePrioritizationService
25 import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.PrioritizationConfiguration
26 import org.onap.ccsdk.cds.controllerblueprints.core.logger
27 import org.springframework.stereotype.Service
30 open class MessagePrioritizationSchedulerService(
31 private val messagePrioritizationService: MessagePrioritizationService
33 private val log = logger(MessagePrioritizationSchedulerService::class)
38 /** This is a sample scheduler implementation, used when starting the application with a configuration.
39 @EventListener(ApplicationReadyEvent::class)
40 open fun init() = runBlocking {
41 log.info("Starting PrioritizationListeners...")
42 startScheduling(MessagePrioritizationSample.samplePrioritizationConfiguration())
/**
 * Starts the prioritization schedulers.
 *
 * Reads the current configuration from [messagePrioritizationService], logs a start-up
 * message, and passes that same configuration to both the expiry scheduler and the
 * clean-up scheduler.
 */
46 open suspend fun startScheduling() {
47 val prioritizationConfiguration = messagePrioritizationService.getConfiguration()
49 log.info("Starting Prioritization Scheduler Service...")
// NOTE(review): the statements that wrap the two calls below are not visible in this view.
// The file imports GlobalScope and launch, so presumably each scheduler is started in its
// own coroutine rather than called inline - confirm against the full file.
51 expiryScheduler(prioritizationConfiguration)
54 cleanUpScheduler(prioritizationConfiguration)
/**
 * Shutdown hook for the schedulers.
 *
 * The visible code reads the current configuration from [messagePrioritizationService]
 * and suspends for `shutDownConfiguration.waitMill` (presumably milliseconds - confirm
 * the property type).
 */
58 open suspend fun shutdownScheduling() {
// NOTE(review): a statement before the configuration read is not visible in this view.
// Nothing shown here signals the schedulers to stop; presumably that happens on the
// missing line - confirm against the full file.
60 val prioritizationConfiguration = messagePrioritizationService.getConfiguration()
61 delay(prioritizationConfiguration.shutDownConfiguration.waitMill)
/**
 * Expiry scheduler: asks the prioritization service to update expired messages.
 *
 * Logs the configured frequency, then, on [Dispatchers.Default], calls
 * `messagePrioritizationService.updateExpiredMessages()` and suspends for
 * `expiryConfiguration.frequencyMilli`. An [Exception] raised by that work is caught
 * and logged.
 *
 * @param prioritizationConfiguration configuration whose `expiryConfiguration` supplies
 * the delay used by this scheduler.
 */
64 private suspend fun expiryScheduler(
65 prioritizationConfiguration: PrioritizationConfiguration
67 val expiryConfiguration = prioritizationConfiguration.expiryConfiguration
68 log.info("Initializing prioritization expiry scheduler frequency(${expiryConfiguration.frequencyMilli})mSec")
69 withContext(Dispatchers.Default) {
// NOTE(review): the opening `try` (and presumably an enclosing loop that repeats this
// work) fall on lines not visible in this view - confirm against the full file.
72 messagePrioritizationService.updateExpiredMessages()
73 delay(expiryConfiguration.frequencyMilli)
// NOTE(review): catching Exception also catches the CancellationException that delay()
// throws when the coroutine is cancelled; confirm that swallowing cancellation here is
// intended.
74 } catch (e: Exception) {
75 log.error("failed in prioritization expiry scheduler", e)
/**
 * Clean-up scheduler: asks the prioritization service to clean expired messages.
 *
 * Logs the configured frequency, then, on [Dispatchers.Default], calls
 * `messagePrioritizationService.cleanExpiredMessage()` and suspends for
 * `cleanConfiguration.frequencyMilli`. An [Exception] raised by that work is caught
 * and logged.
 *
 * @param prioritizationConfiguration configuration whose `cleanConfiguration` supplies
 * the delay used by this scheduler.
 */
81 private suspend fun cleanUpScheduler(
82 prioritizationConfiguration: PrioritizationConfiguration
84 val cleanConfiguration = prioritizationConfiguration.cleanConfiguration
85 log.info("Initializing prioritization clean scheduler frequency(${cleanConfiguration.frequencyMilli})mSec")
86 withContext(Dispatchers.Default) {
// NOTE(review): the opening `try` (and presumably an enclosing loop that repeats this
// work) fall on lines not visible in this view - confirm against the full file.
89 messagePrioritizationService.cleanExpiredMessage()
90 delay(cleanConfiguration.frequencyMilli)
// NOTE(review): catching Exception also catches the CancellationException that delay()
// throws when the coroutine is cancelled; confirm that swallowing cancellation here is
// intended.
91 } catch (e: Exception) {
92 log.error("failed in prioritization clean scheduler", e)