2f08c1c3412fca22313fca27b7a58aca4a56909c
[ccsdk/cds.git] / ms / blueprintsprocessor / functions / message-prioritizaion / src / main / kotlin / org / onap / ccsdk / cds / blueprintsprocessor / functions / message / prioritization / service / MessagePrioritizationSchedulerService.kt
1 /*
2  * Copyright © 2018-2019 AT&T Intellectual Property.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *     http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.service
18
19 import kotlinx.coroutines.Dispatchers
20 import kotlinx.coroutines.GlobalScope
21 import kotlinx.coroutines.delay
22 import kotlinx.coroutines.launch
23 import kotlinx.coroutines.withContext
24 import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessagePrioritizationService
25 import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.PrioritizationConfiguration
26 import org.onap.ccsdk.cds.controllerblueprints.core.logger
27 import org.springframework.stereotype.Service
28
29 @Service
30 open class MessagePrioritizationSchedulerService(
31     private val messagePrioritizationService: MessagePrioritizationService
32 ) {
33     private val log = logger(MessagePrioritizationSchedulerService::class)
34
35     @Volatile
36     var keepGoing = true
37
38     /** This is sample scheduler implementation used during starting application with configuration.
39     @EventListener(ApplicationReadyEvent::class)
40     open fun init() = runBlocking {
41         log.info("Starting PrioritizationListeners...")
42         startScheduling(MessagePrioritizationSample.samplePrioritizationConfiguration())
43     }
44     */
45
46     open suspend fun startScheduling() {
47         val prioritizationConfiguration = messagePrioritizationService.getConfiguration()
48
49         log.info("Starting Prioritization Scheduler Service...")
50         GlobalScope.launch {
51             expiryScheduler(prioritizationConfiguration)
52         }
53         GlobalScope.launch {
54             cleanUpScheduler(prioritizationConfiguration)
55         }
56     }
57
58     open suspend fun shutdownScheduling(prioritizationConfiguration: PrioritizationConfiguration) {
59         keepGoing = false
60         delay(prioritizationConfiguration.shutDownConfiguration.waitMill)
61     }
62
63     private suspend fun expiryScheduler(
64         prioritizationConfiguration: PrioritizationConfiguration
65     ) {
66         val expiryConfiguration = prioritizationConfiguration.expiryConfiguration
67         log.info("Initializing prioritization expiry scheduler frequency(${expiryConfiguration.frequencyMilli})mSec")
68         withContext(Dispatchers.Default) {
69             while (keepGoing) {
70                 try {
71                     messagePrioritizationService.updateExpiredMessages()
72                     delay(expiryConfiguration.frequencyMilli)
73                 } catch (e: Exception) {
74                     log.error("failed in prioritization expiry scheduler", e)
75                 }
76             }
77         }
78     }
79
80     private suspend fun cleanUpScheduler(
81         prioritizationConfiguration: PrioritizationConfiguration
82     ) {
83         val cleanConfiguration = prioritizationConfiguration.cleanConfiguration
84         log.info("Initializing prioritization clean scheduler frequency(${cleanConfiguration.frequencyMilli})mSec")
85         withContext(Dispatchers.Default) {
86             while (keepGoing) {
87                 try {
88                     messagePrioritizationService.cleanExpiredMessage()
89                     delay(cleanConfiguration.frequencyMilli)
90                 } catch (e: Exception) {
91                     log.error("failed in prioritization clean scheduler", e)
92                 }
93             }
94         }
95     }
96 }