6138fa9d3907ce0110fdda2cc44892527e1d613d
[ccsdk/cds.git] /
1 /*
2  * Copyright © 2018-2019 AT&T Intellectual Property.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *     http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.service
18
19 import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessageState
20 import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.db.MessagePrioritization
21 import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.db.PrioritizationMessageRepository
22 import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.toFormatedCorrelation
23 import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintProcessorException
24 import org.onap.ccsdk.cds.controllerblueprints.core.logger
25 import org.springframework.data.domain.PageRequest
26 import org.springframework.stereotype.Service
27 import org.springframework.transaction.annotation.Transactional
28 import java.util.*
29
/**
 * Operations for storing, querying, updating and deleting [MessagePrioritization] records.
 *
 * All operations are `suspend` functions. Query operations that return a list are declared
 * nullable, so callers must be prepared for a `null` result.
 */
interface MessagePrioritizationStateService {

    /** Stores [message] and returns the stored instance. */
    suspend fun saveMessage(message: MessagePrioritization): MessagePrioritization

    /**
     * Returns the message identified by [id].
     *
     * The return type is non-null, so an implementation has to fail (rather than return `null`)
     * when no such message exists.
     */
    suspend fun getMessage(id: String): MessagePrioritization

    /** Returns the messages identified by [ids]. */
    suspend fun getMessages(ids: List<String>): List<MessagePrioritization>?

    /** Returns messages that are candidates for expiry; [count] is the requested page size. */
    suspend fun getExpiryEligibleMessages(count: Int): List<MessagePrioritization>?

    /**
     * Returns messages of [group] whose state is one of [states] and which have not expired;
     * [count] is the requested page size.
     */
    suspend fun getMessageForStatesNotExpiredIn(
        group: String,
        states: List<String>,
        count: Int
    ): List<MessagePrioritization>?

    /**
     * Returns messages of [group] whose state is one of [states] and which have expired;
     * [count] is the requested page size.
     */
    suspend fun getMessageForStatesExpired(
        group: String,
        states: List<String>,
        count: Int
    ): List<MessagePrioritization>?

    /**
     * Returns messages of [group] that are expired relative to [expiryDate];
     * [count] is the requested page size.
     */
    suspend fun getExpiredMessages(group: String, expiryDate: Date, count: Int): List<MessagePrioritization>?

    /**
     * Returns messages of [group] in one of [states] that match [correlationIds].
     *
     * [types] is optional: the implementation in this file only filters by type when the list
     * is neither `null` nor empty.
     */
    suspend fun getCorrelatedMessages(
        group: String,
        states: List<String>,
        types: List<String>?,
        correlationIds: String
    ): List<MessagePrioritization>?

    /** Moves every message in [ids] to [state]. */
    suspend fun updateMessagesState(ids: List<String>, state: String)

    /** Moves the message identified by [id] to [state] and returns the updated message. */
    suspend fun updateMessageState(id: String, state: String): MessagePrioritization

    /** Sets the state of the message identified by [id] to [state] without returning it. */
    suspend fun setMessageState(id: String, state: String)

    /** Sets the priority of every message in [ids] to [priority]. */
    suspend fun setMessagesPriority(ids: List<String>, priority: String)

    /** Sets the state of every message in [ids] to [state]. */
    suspend fun setMessagesState(ids: List<String>, state: String)

    /**
     * Sets both the state and the error text of the message identified by [id].
     *
     * The unusual capitalisation of the name is part of the published interface.
     */
    suspend fun setMessageStateANdError(id: String, state: String, error: String)

    /** Sets the state of the message identified by [id] and records [aggregatedIds] on it. */
    suspend fun setMessageStateAndAggregatedIds(id: String, state: String, aggregatedIds: List<String>)

    /** Deletes the message identified by [id]. */
    suspend fun deleteMessage(id: String)

    /** Deletes the messages belonging to [group]. */
    suspend fun deleteMessageByGroup(group: String)

    /** Deletes the messages of [group] whose state is one of [states]. */
    suspend fun deleteMessageStates(group: String, states: List<String>)

    /** Deletes expired messages of [group]. */
    suspend fun deleteExpiredMessage(group: String, retentionDays: Int)
}
73
/**
 * [MessagePrioritizationStateService] registered as a Spring `@Service`; every operation is
 * delegated to [PrioritizationMessageRepository].
 */
@Service
open class MessagePrioritizationStateServiceImpl(
    private val prioritizationMessageRepository: PrioritizationMessageRepository
) : MessagePrioritizationStateService {

    private val log = logger(MessagePrioritizationStateServiceImpl::class)

    /**
     * Saves [message] after refreshing its derived fields.
     *
     * A non-blank correlation id is replaced with the result of `toFormatedCorrelation()`, and
     * `updatedDate` is always set to the current time before the repository save.
     *
     * @return the instance returned by the repository save.
     */
    @Transactional
    override suspend fun saveMessage(message: MessagePrioritization): MessagePrioritization {
        if (!message.correlationId.isNullOrBlank()) {
            message.correlationId = message.toFormatedCorrelation()
        }
        message.updatedDate = Date()
        return prioritizationMessageRepository.save(message)
    }

    /**
     * Loads the message identified by [id].
     *
     * @throws BluePrintProcessorException when the repository has no message for [id].
     */
    override suspend fun getMessage(id: String): MessagePrioritization {
        // orElse(null), not orElseGet(null): orElseGet invokes its supplier on an empty Optional,
        // so a null supplier raises NullPointerException and the elvis branch is never reached.
        return prioritizationMessageRepository.findById(id).orElse(null)
            ?: throw BluePrintProcessorException("couldn't find message for id($id)")
    }

    /** Loads the messages identified by [ids] through the repository's `findAllById`. */
    override suspend fun getMessages(ids: List<String>): List<MessagePrioritization>? =
        prioritizationMessageRepository.findAllById(ids)

    /**
     * Queries messages in state `NEW` or `WAIT` against the current time, requesting the first
     * page of size [count].
     */
    override suspend fun getExpiryEligibleMessages(count: Int): List<MessagePrioritization>? {
        val eligibleStates = arrayListOf(MessageState.NEW.name, MessageState.WAIT.name)
        return prioritizationMessageRepository
            .findByStateInAndExpiredDate(eligibleStates, Date(), PageRequest.of(0, count))
    }

    /**
     * Queries messages of [group] in one of [states] using the repository's "not expired" query,
     * evaluated against the current time; requests the first page of size [count].
     */
    override suspend fun getMessageForStatesNotExpiredIn(
        group: String,
        states: List<String>,
        count: Int
    ): List<MessagePrioritization>? =
        prioritizationMessageRepository
            .findByGroupAndStateInAndNotExpiredDate(group, states, Date(), PageRequest.of(0, count))

    /**
     * Queries messages of [group] in one of [states] using the repository's "expired" query,
     * evaluated against the current time; requests the first page of size [count].
     */
    override suspend fun getMessageForStatesExpired(
        group: String,
        states: List<String>,
        count: Int
    ): List<MessagePrioritization>? =
        prioritizationMessageRepository
            .findByGroupAndStateInAndExpiredDate(group, states, Date(), PageRequest.of(0, count))

    /**
     * Queries messages of [group] using the repository's expired-date query with the caller
     * supplied [expiryDate]; requests the first page of size [count].
     */
    override suspend fun getExpiredMessages(
        group: String,
        expiryDate: Date,
        count: Int
    ): List<MessagePrioritization>? =
        prioritizationMessageRepository
            .findByByGroupAndExpiredDate(group, expiryDate, PageRequest.of(0, count))

    /**
     * Queries messages of [group] in one of [states] matching [correlationIds].
     *
     * The type-aware repository query is used only when [types] is neither `null` nor empty;
     * otherwise the query without a type filter is used.
     */
    override suspend fun getCorrelatedMessages(
        group: String,
        states: List<String>,
        types: List<String>?,
        correlationIds: String
    ): List<MessagePrioritization>? {
        if (types.isNullOrEmpty()) {
            return prioritizationMessageRepository.findByGroupAndCorrelationId(group, states, correlationIds)
        }
        return prioritizationMessageRepository
            .findByGroupAndTypesAndCorrelationId(group, states, types, correlationIds)
    }

    /**
     * Applies [updateMessageState] to each id in [ids], in order, logging every update.
     *
     * An id that cannot be loaded aborts the loop with the exception raised by [getMessage].
     */
    @Transactional
    override suspend fun updateMessagesState(ids: List<String>, state: String) {
        for (id in ids) {
            val updated = updateMessageState(id, state)
            log.info("message($id) update to state(${updated.state})")
        }
    }

    /** Sets the state of message [id] through a repository update, passing the current time. */
    @Transactional
    override suspend fun setMessageState(id: String, state: String) {
        prioritizationMessageRepository.setStateForMessageId(id, state, Date())
    }

    /** Sets the priority of the messages in [ids] through a repository update, passing the current time. */
    @Transactional
    override suspend fun setMessagesPriority(ids: List<String>, priority: String) {
        prioritizationMessageRepository.setPriorityForMessageIds(ids, priority, Date())
    }

    /** Sets the state of the messages in [ids] through a repository update, passing the current time. */
    @Transactional
    override suspend fun setMessagesState(ids: List<String>, state: String) {
        prioritizationMessageRepository.setStateForMessageIds(ids, state, Date())
    }

    /** Sets state and error text of message [id] through a repository update, passing the current time. */
    @Transactional
    override suspend fun setMessageStateANdError(id: String, state: String, error: String) {
        prioritizationMessageRepository.setStateAndErrorForMessageId(id, state, error, Date())
    }

    /**
     * Loads message [id], assigns [state] and a fresh `updatedDate`, then stores it via [saveMessage].
     *
     * @return the message as returned by [saveMessage].
     * @throws BluePrintProcessorException when no message exists for [id].
     */
    @Transactional
    override suspend fun updateMessageState(id: String, state: String): MessagePrioritization {
        val message = getMessage(id)
        message.updatedDate = Date()
        message.state = state
        return saveMessage(message)
    }

    /**
     * Sets the state of message [id] and stores [aggregatedIds] on it as a single
     * comma-separated string, passing the current time to the repository update.
     */
    @Transactional
    override suspend fun setMessageStateAndAggregatedIds(id: String, state: String, aggregatedIds: List<String>) {
        val joinedIds = aggregatedIds.joinToString(",")
        prioritizationMessageRepository.setStateAndAggregatedMessageIds(id, state, joinedIds, Date())
    }

    /** Deletes message [id] through the repository's `deleteById`. */
    override suspend fun deleteMessage(id: String) {
        prioritizationMessageRepository.deleteById(id)
    }

    /** Deletes the messages of [group] through the repository's `deleteGroup`. */
    override suspend fun deleteMessageByGroup(group: String) {
        prioritizationMessageRepository.deleteGroup(group)
    }

    /** Deletes the messages of [group] whose state is one of [states]. */
    override suspend fun deleteMessageStates(group: String, states: List<String>) {
        prioritizationMessageRepository.deleteGroupAndStateIn(group, states)
    }

    /**
     * Deletes the messages of [group] that are in state `EXPIRED`.
     *
     * NOTE(review): [retentionDays] is not used here, so every `EXPIRED` message of the group is
     * removed regardless of age. Confirm whether a retention cut-off was intended.
     */
    override suspend fun deleteExpiredMessage(group: String, retentionDays: Int) {
        prioritizationMessageRepository.deleteGroupAndStateIn(group, arrayListOf(MessageState.EXPIRED.name))
    }
}
190     }
191 }