Include correlationId in group lock.
[ccsdk/cds.git] / ms / blueprintsprocessor / functions / message-prioritizaion / src / main / kotlin / org / onap / ccsdk / cds / blueprintsprocessor / functions / message / prioritization / service / MessagePrioritizationStateService.kt
1 /*
2  * Copyright © 2018-2019 AT&T Intellectual Property.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *     http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.service
18
19 import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessageState
20 import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.db.MessagePrioritization
21 import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.db.PrioritizationMessageRepository
22 import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.toFormatedCorrelation
23 import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintProcessorException
24 import org.onap.ccsdk.cds.controllerblueprints.core.logger
25 import org.springframework.data.domain.PageRequest
26 import org.springframework.stereotype.Service
27 import org.springframework.transaction.annotation.Transactional
28 import java.util.Date
29
/**
 * Storage and state-transition operations for [MessagePrioritization] records.
 *
 * All operations are suspending. Query operations that return a nullable list may
 * yield `null`; callers have to handle that case.
 */
interface MessagePrioritizationStateService {

    /** Stores [message] and returns the stored instance. */
    suspend fun saveMessage(message: MessagePrioritization): MessagePrioritization

    /**
     * Looks up a single message by [id].
     *
     * The return type is non-null, so an implementation has to fail rather than
     * return `null` when no message matches.
     */
    suspend fun getMessage(id: String): MessagePrioritization

    /** Looks up the messages identified by [ids]. */
    suspend fun getMessages(ids: List<String>): List<MessagePrioritization>?

    /** Returns messages that are candidates for expiry, limited by [count]. */
    suspend fun getExpiryEligibleMessages(count: Int): List<MessagePrioritization>?

    /**
     * Returns messages of [group] whose state is one of [states] and which have not
     * expired yet, limited by [count].
     */
    suspend fun getMessageForStatesNotExpiredIn(
        group: String,
        states: List<String>,
        count: Int
    ): List<MessagePrioritization>?

    /**
     * Returns messages of [group] whose state is one of [states] and which have
     * already expired, limited by [count].
     */
    suspend fun getMessageForStatesExpired(
        group: String,
        states: List<String>,
        count: Int
    ): List<MessagePrioritization>?

    /**
     * Returns messages of [group] that count as expired with respect to [expiryDate],
     * limited by [count].
     */
    suspend fun getExpiredMessages(
        group: String,
        expiryDate: Date,
        count: Int
    ): List<MessagePrioritization>?

    /**
     * Returns messages of [group] in one of [states] that match [correlationIds].
     *
     * @param types optional message types to narrow the match.
     */
    suspend fun getCorrelatedMessages(
        group: String,
        states: List<String>,
        types: List<String>?,
        correlationIds: String
    ): List<MessagePrioritization>?

    /** Moves every message in [ids] to [state]. */
    suspend fun updateMessagesState(ids: List<String>, state: String)

    /** Moves the message [id] to [state] and returns the updated message. */
    suspend fun updateMessageState(id: String, state: String): MessagePrioritization

    /** Sets the state of message [id] to [state] without returning the message. */
    suspend fun setMessageState(id: String, state: String)

    /** Sets [priority] on every message in [ids]. */
    suspend fun setMessagesPriority(ids: List<String>, priority: String)

    /** Sets [state] on every message in [ids]. */
    suspend fun setMessagesState(ids: List<String>, state: String)

    /** Sets both [state] and the [error] text on message [id]. */
    suspend fun setMessageStateANdError(id: String, state: String, error: String)

    /** Sets [state] on message [id] and records [aggregatedIds] against it. */
    suspend fun setMessageStateAndAggregatedIds(
        id: String,
        state: String,
        aggregatedIds: List<String>
    )

    /** Deletes the message [id]. */
    suspend fun deleteMessage(id: String)

    /** Deletes every message that belongs to [group]. */
    suspend fun deleteMessageByGroup(group: String)

    /** Deletes the messages of [group] whose state is one of [states]. */
    suspend fun deleteMessageStates(group: String, states: List<String>)

    /** Deletes expired messages of [group]; [retentionDays] is the requested retention window. */
    suspend fun deleteExpiredMessage(group: String, retentionDays: Int)
}
77
/**
 * [MessagePrioritizationStateService] implementation that delegates every operation
 * to the injected [PrioritizationMessageRepository].
 */
@Service
open class MessagePrioritizationStateServiceImpl(
    private val prioritizationMessageRepository: PrioritizationMessageRepository
) : MessagePrioritizationStateService {

    private val log = logger(MessagePrioritizationStateServiceImpl::class)

    /**
     * Persists [message].
     *
     * A non-blank correlation id is replaced by the result of `toFormatedCorrelation()`
     * and `updatedDate` is set to the current time before saving.
     *
     * @return the entity returned by the repository.
     */
    @Transactional
    override suspend fun saveMessage(message: MessagePrioritization): MessagePrioritization {
        val hasCorrelation = !message.correlationId.isNullOrBlank()
        if (hasCorrelation) {
            message.correlationId = message.toFormatedCorrelation()
        }
        message.updatedDate = Date()
        return prioritizationMessageRepository.save(message)
    }

    /**
     * Loads the message with the given [id].
     *
     * @throws BluePrintProcessorException when the repository has no message for [id].
     */
    override suspend fun getMessage(id: String): MessagePrioritization {
        // orElse(null) instead of orElseGet(null): with a null supplier an empty Optional
        // raises NullPointerException, which would mask the intended exception below.
        val found: MessagePrioritization? = prioritizationMessageRepository.findById(id).orElse(null)
        return found ?: throw BluePrintProcessorException("couldn't find message for id($id)")
    }

    /** Loads all messages whose id is contained in [ids]. */
    override suspend fun getMessages(ids: List<String>): List<MessagePrioritization>? =
        prioritizationMessageRepository.findAllById(ids)

    /**
     * Queries the first page of at most [count] messages in state NEW or WAIT,
     * evaluated against the current time.
     */
    override suspend fun getExpiryEligibleMessages(count: Int): List<MessagePrioritization>? {
        val eligibleStates = arrayListOf(MessageState.NEW.name, MessageState.WAIT.name)
        return prioritizationMessageRepository
            .findByStateInAndExpiredDate(eligibleStates, Date(), PageRequest.of(0, count))
    }

    /**
     * Queries the first page of at most [count] messages of [group] in one of [states],
     * using the repository's "not expired" query with the current time.
     */
    override suspend fun getMessageForStatesNotExpiredIn(
        group: String,
        states: List<String>,
        count: Int
    ): List<MessagePrioritization>? =
        prioritizationMessageRepository.findByGroupAndStateInAndNotExpiredDate(
            group, states, Date(), PageRequest.of(0, count)
        )

    /**
     * Queries the first page of at most [count] messages of [group] in one of [states],
     * using the repository's "expired" query with the current time.
     */
    override suspend fun getMessageForStatesExpired(
        group: String,
        states: List<String>,
        count: Int
    ): List<MessagePrioritization>? =
        prioritizationMessageRepository.findByGroupAndStateInAndExpiredDate(
            group, states, Date(), PageRequest.of(0, count)
        )

    /**
     * Queries the first page of at most [count] messages of [group], passing
     * [expiryDate] to the repository's expired-date query.
     */
    override suspend fun getExpiredMessages(
        group: String,
        expiryDate: Date,
        count: Int
    ): List<MessagePrioritization>? =
        prioritizationMessageRepository.findByByGroupAndExpiredDate(
            group, expiryDate, PageRequest.of(0, count)
        )

    /**
     * Finds messages of [group] in one of [states] for [correlationIds].
     *
     * The type-aware repository query is used only when [types] is neither null nor
     * empty; otherwise the query without a type filter is used.
     */
    override suspend fun getCorrelatedMessages(
        group: String,
        states: List<String>,
        types: List<String>?,
        correlationIds: String
    ): List<MessagePrioritization>? {
        return if (!types.isNullOrEmpty()) {
            prioritizationMessageRepository
                .findByGroupAndTypesAndCorrelationId(group, states, types, correlationIds)
        } else {
            prioritizationMessageRepository
                .findByGroupAndCorrelationId(group, states, correlationIds)
        }
    }

    /**
     * Applies [updateMessageState] to each id in [ids], in order, logging the
     * resulting state of every message.
     */
    @Transactional
    override suspend fun updateMessagesState(ids: List<String>, state: String) {
        for (id in ids) {
            val updated = updateMessageState(id, state)
            log.info("message($id) update to state(${updated.state})")
        }
    }

    /** Sets [state] on message [id] through the repository, stamped with the current time. */
    @Transactional
    override suspend fun setMessageState(id: String, state: String) {
        prioritizationMessageRepository.setStateForMessageId(id, state, Date())
    }

    /** Sets [priority] on all messages in [ids] through the repository, stamped with the current time. */
    @Transactional
    override suspend fun setMessagesPriority(ids: List<String>, priority: String) {
        prioritizationMessageRepository.setPriorityForMessageIds(ids, priority, Date())
    }

    /** Sets [state] on all messages in [ids] through the repository, stamped with the current time. */
    @Transactional
    override suspend fun setMessagesState(ids: List<String>, state: String) {
        prioritizationMessageRepository.setStateForMessageIds(ids, state, Date())
    }

    /** Sets [state] and [error] on message [id] through the repository, stamped with the current time. */
    @Transactional
    override suspend fun setMessageStateANdError(id: String, state: String, error: String) {
        prioritizationMessageRepository.setStateAndErrorForMessageId(id, state, error, Date())
    }

    /**
     * Loads message [id], assigns [state] and a fresh update date, and saves it
     * via [saveMessage].
     *
     * @return the saved message.
     * @throws BluePrintProcessorException when no message exists for [id].
     */
    @Transactional
    override suspend fun updateMessageState(id: String, state: String): MessagePrioritization {
        val message = getMessage(id)
        message.updatedDate = Date()
        message.state = state
        return saveMessage(message)
    }

    /**
     * Sets [state] on message [id] and stores [aggregatedIds] as one comma-separated
     * string, stamped with the current time.
     */
    @Transactional
    override suspend fun setMessageStateAndAggregatedIds(id: String, state: String, aggregatedIds: List<String>) {
        prioritizationMessageRepository.setStateAndAggregatedMessageIds(
            id, state, aggregatedIds.joinToString(","), Date()
        )
    }

    /** Deletes the message with the given [id]. */
    override suspend fun deleteMessage(id: String) {
        prioritizationMessageRepository.deleteById(id)
    }

    /** Deletes all messages of [group]. */
    override suspend fun deleteMessageByGroup(group: String) {
        prioritizationMessageRepository.deleteGroup(group)
    }

    /** Deletes the messages of [group] whose state is one of [states]. */
    override suspend fun deleteMessageStates(group: String, states: List<String>) {
        prioritizationMessageRepository.deleteGroupAndStateIn(group, states)
    }

    /**
     * Deletes the messages of [group] that are in state EXPIRED.
     *
     * NOTE(review): [retentionDays] is not consulted here, so every EXPIRED message of
     * the group is removed regardless of age. Confirm whether a retention filter is intended.
     */
    override suspend fun deleteExpiredMessage(group: String, retentionDays: Int) {
        val expiredOnly = arrayListOf(MessageState.EXPIRED.name)
        prioritizationMessageRepository.deleteGroupAndStateIn(group, expiredOnly)
    }
}