Prioritization expiry and clean scheduler service
[ccsdk/cds.git] / ms / blueprintsprocessor / functions / message-prioritizaion / src / main / kotlin / org / onap / ccsdk / cds / blueprintsprocessor / functions / message / prioritization / service / MessagePrioritizationSchedulerService.kt
1 /*
2  * Copyright © 2018-2019 AT&T Intellectual Property.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *     http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.service
18
19 import kotlinx.coroutines.Dispatchers
20 import kotlinx.coroutines.GlobalScope
21 import kotlinx.coroutines.delay
22 import kotlinx.coroutines.launch
23 import kotlinx.coroutines.withContext
24 import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessagePrioritizationService
25 import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.PrioritizationConfiguration
26 import org.onap.ccsdk.cds.controllerblueprints.core.logger
27 import org.springframework.stereotype.Service
28
29 @Service
30 open class MessagePrioritizationSchedulerService(
31     private val messagePrioritizationService: MessagePrioritizationService
32 ) {
33     private val log = logger(MessagePrioritizationSchedulerService::class)
34
35     @Volatile
36     var keepGoing = true
37
38     /** This is sample scheduler implementation used during starting application with configuration.
39     @EventListener(ApplicationReadyEvent::class)
40     open fun init() = runBlocking {
41         log.info("Starting PrioritizationListeners...")
42         startScheduling(MessagePrioritizationSample.samplePrioritizationConfiguration())
43     }
44     */
45
46     open suspend fun startScheduling(prioritizationConfiguration: PrioritizationConfiguration) {
47         log.info("Starting Prioritization Scheduler Service...")
48         GlobalScope.launch {
49             expiryScheduler(prioritizationConfiguration)
50         }
51         GlobalScope.launch {
52             cleanUpScheduler(prioritizationConfiguration)
53         }
54     }
55
56     open suspend fun shutdownScheduling(prioritizationConfiguration: PrioritizationConfiguration) {
57         keepGoing = false
58         delay(prioritizationConfiguration.shutDownConfiguration.waitMill)
59     }
60
61     private suspend fun expiryScheduler(
62         prioritizationConfiguration: PrioritizationConfiguration
63     ) {
64         val expiryConfiguration = prioritizationConfiguration.expiryConfiguration
65         log.info("Initializing prioritization expiry scheduler frequency(${expiryConfiguration.frequencyMilli})mSec")
66         withContext(Dispatchers.Default) {
67             while (keepGoing) {
68                 try {
69                     messagePrioritizationService.updateExpiredMessages(expiryConfiguration)
70                     delay(expiryConfiguration.frequencyMilli)
71                 } catch (e: Exception) {
72                     log.error("failed in prioritization expiry scheduler", e)
73                 }
74             }
75         }
76     }
77
78     private suspend fun cleanUpScheduler(
79         prioritizationConfiguration: PrioritizationConfiguration
80     ) {
81         val cleanConfiguration = prioritizationConfiguration.cleanConfiguration
82         log.info("Initializing prioritization clean scheduler frequency(${cleanConfiguration.frequencyMilli})mSec")
83         withContext(Dispatchers.Default) {
84             while (keepGoing) {
85                 try {
86                     messagePrioritizationService.cleanExpiredMessage(cleanConfiguration)
87                     delay(cleanConfiguration.frequencyMilli)
88                 } catch (e: Exception) {
89                     log.error("failed in prioritization clean scheduler", e)
90                 }
91             }
92         }
93     }
94 }