Include correlationId in group lock.
[ccsdk/cds.git] / ms / blueprintsprocessor / functions / message-prioritizaion / src / main / kotlin / org / onap / ccsdk / cds / blueprintsprocessor / functions / message / prioritization / service / MessagePrioritizationStateServiceImpl.kt
1 /*
2  * Copyright © 2018-2019 AT&T Intellectual Property.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *     http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.service
18
19 import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessagePrioritizationStateService
20 import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessageState
21 import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.db.MessagePrioritization
22 import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.db.PrioritizationMessageRepository
23 import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.toFormatedCorrelation
24 import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintProcessorException
25 import org.onap.ccsdk.cds.controllerblueprints.core.logger
26 import org.springframework.data.domain.PageRequest
27 import org.springframework.stereotype.Service
28 import org.springframework.transaction.annotation.Transactional
29 import java.util.Date
30
/**
 * Repository-backed implementation of [MessagePrioritizationStateService].
 *
 * Every operation delegates to the injected [PrioritizationMessageRepository]. Methods that
 * take a date for "now" create it with `Date()` at call time.
 */
@Service
open class MessagePrioritizationStateServiceImpl(
    private val prioritizationMessageRepository: PrioritizationMessageRepository
) : MessagePrioritizationStateService {

    private val log = logger(MessagePrioritizationStateServiceImpl::class)

    /**
     * Persists [message] after refreshing its `updatedDate`.
     *
     * When the message carries a non-blank correlation id, it is first replaced with the
     * value returned by `toFormatedCorrelation()`.
     *
     * @return the entity returned by the repository `save` call.
     */
    @Transactional
    override suspend fun saveMessage(message: MessagePrioritization): MessagePrioritization {
        if (!message.correlationId.isNullOrBlank()) {
            message.correlationId = message.toFormatedCorrelation()
        }
        message.updatedDate = Date()
        return prioritizationMessageRepository.save(message)
    }

    /**
     * Loads the message stored under [id].
     *
     * @throws BluePrintProcessorException when the repository has no message for [id].
     */
    override suspend fun getMessage(id: String): MessagePrioritization {
        // orElse(null), not orElseGet(null): orElseGet expects a Supplier, and a null Supplier
        // makes an empty Optional throw NullPointerException before the elvis branch can run.
        return prioritizationMessageRepository.findById(id).orElse(null)
            ?: throw BluePrintProcessorException("couldn't find message for id($id)")
    }

    /** Returns the repository's `findAllById` result for [ids]. */
    override suspend fun getMessages(ids: List<String>): List<MessagePrioritization>? {
        return prioritizationMessageRepository.findAllById(ids)
    }

    /**
     * Queries `findByStateInAndExpiredDate` for messages in state NEW or WAIT, passing the
     * current time and requesting the first page of at most [count] entries.
     */
    override suspend fun getExpiryEligibleMessages(count: Int): List<MessagePrioritization>? {
        val candidateStates = arrayListOf(MessageState.NEW.name, MessageState.WAIT.name)
        return prioritizationMessageRepository
            .findByStateInAndExpiredDate(candidateStates, Date(), PageRequest.of(0, count))
    }

    /**
     * Queries `findByGroupAndStateInAndNotExpiredDate` for [group] and [states], passing the
     * current time and requesting the first page of at most [count] entries.
     */
    override suspend fun getMessageForStatesNotExpiredIn(group: String, states: List<String>, count: Int):
        List<MessagePrioritization>? {
        return prioritizationMessageRepository.findByGroupAndStateInAndNotExpiredDate(
            group,
            states, Date(), PageRequest.of(0, count)
        )
    }

    /**
     * Queries `findByGroupAndStateInAndExpiredDate` for [group] and [states], passing the
     * current time and requesting the first page of at most [count] entries.
     */
    override suspend fun getMessageForStatesExpired(group: String, states: List<String>, count: Int):
        List<MessagePrioritization>? {
        return prioritizationMessageRepository.findByGroupAndStateInAndExpiredDate(
            group,
            states, Date(), PageRequest.of(0, count)
        )
    }

    /**
     * Queries `findByByGroupAndExpiredDate` for [group] with the caller-supplied [expiryDate],
     * requesting the first page of at most [count] entries.
     */
    override suspend fun getExpiredMessages(group: String, expiryDate: Date, count: Int):
        List<MessagePrioritization>? {
        return prioritizationMessageRepository.findByByGroupAndExpiredDate(
            group,
            expiryDate, PageRequest.of(0, count)
        )
    }

    /**
     * Looks up messages of [group] in the given [states] by [correlationIds].
     *
     * The type-filtered repository query is used only when [types] is neither null nor empty;
     * otherwise the lookup is done without a type filter.
     */
    override suspend fun getCorrelatedMessages(
        group: String,
        states: List<String>,
        types: List<String>?,
        correlationIds: String
    ): List<MessagePrioritization>? {
        return if (types.isNullOrEmpty()) {
            prioritizationMessageRepository.findByGroupAndCorrelationId(group, states, correlationIds)
        } else {
            prioritizationMessageRepository.findByGroupAndTypesAndCorrelationId(group, states, types, correlationIds)
        }
    }

    /**
     * Applies [state] to each message in [ids], one at a time, via [updateMessageState], and
     * logs the resulting state of every message.
     */
    @Transactional
    override suspend fun updateMessagesState(ids: List<String>, state: String) {
        ids.forEach { id ->
            val updated = updateMessageState(id, state)
            log.info("message($id) update to state(${updated.state})")
        }
    }

    /** Calls `setStateForMessageId` with [id], [state] and the current time. */
    @Transactional
    override suspend fun setMessageState(id: String, state: String) {
        prioritizationMessageRepository.setStateForMessageId(id, state, Date())
    }

    /** Calls `setPriorityForMessageIds` with [ids], [priority] and the current time. */
    @Transactional
    override suspend fun setMessagesPriority(ids: List<String>, priority: String) {
        prioritizationMessageRepository.setPriorityForMessageIds(ids, priority, Date())
    }

    /** Calls `setStateForMessageIds` with [ids], [state] and the current time. */
    @Transactional
    override suspend fun setMessagesState(ids: List<String>, state: String) {
        prioritizationMessageRepository.setStateForMessageIds(ids, state, Date())
    }

    /** Calls `setStateAndErrorForMessageId` with [id], [state], [error] and the current time. */
    @Transactional
    override suspend fun setMessageStateANdError(id: String, state: String, error: String) {
        prioritizationMessageRepository.setStateAndErrorForMessageId(id, state, error, Date())
    }

    /**
     * Loads the message for [id], sets its state to [state] and saves it through [saveMessage].
     *
     * @return the saved entity.
     * @throws BluePrintProcessorException when no message exists for [id] (raised by [getMessage]).
     */
    @Transactional
    override suspend fun updateMessageState(id: String, state: String): MessagePrioritization {
        val updateMessage = getMessage(id).apply {
            this.updatedDate = Date()
            this.state = state
        }
        return saveMessage(updateMessage)
    }

    /**
     * Calls `setStateAndAggregatedMessageIds` with [id], [state], the current time and
     * [aggregatedIds] joined into a single comma-separated string.
     */
    @Transactional
    override suspend fun setMessageStateAndAggregatedIds(id: String, state: String, aggregatedIds: List<String>) {
        val groupedIds = aggregatedIds.joinToString(",")
        prioritizationMessageRepository.setStateAndAggregatedMessageIds(id, state, groupedIds, Date())
    }

    /** Deletes the message stored under [id] via the repository's `deleteById`. */
    override suspend fun deleteMessage(id: String) {
        prioritizationMessageRepository.deleteById(id)
    }

    /** Calls the repository's `deleteGroup` for [group]. */
    override suspend fun deleteMessageByGroup(group: String) {
        prioritizationMessageRepository.deleteGroup(group)
    }

    /** Calls the repository's `deleteGroupAndStateIn` for [group] and [states]. */
    override suspend fun deleteMessageStates(group: String, states: List<String>) {
        prioritizationMessageRepository.deleteGroupAndStateIn(group, states)
    }

    /**
     * Calls the repository's `deleteGroupAndStateIn` for [group] with the single state EXPIRED.
     *
     * NOTE(review): [retentionDays] is currently not used by this implementation; confirm
     * whether a retention-based filter was intended.
     */
    override suspend fun deleteExpiredMessage(group: String, retentionDays: Int) {
        prioritizationMessageRepository.deleteGroupAndStateIn(
            group,
            arrayListOf(MessageState.EXPIRED.name)
        )
    }
}