e4369fc20ef41584b623cacc4e64bfbd5f81bcf2
[ccsdk/cds.git] /
1 /*
2  * Copyright © 2018-2019 AT&T Intellectual Property.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *     http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.service
18
19 import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessageState
20 import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.db.MessagePrioritization
21 import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.db.PrioritizationMessageRepository
22 import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.toFormatedCorrelation
23 import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintProcessorException
24 import org.onap.ccsdk.cds.controllerblueprints.core.logger
25 import org.springframework.data.domain.PageRequest
26 import org.springframework.stereotype.Service
27 import org.springframework.transaction.annotation.Transactional
28 import java.util.*
29
/**
 * Contract for storing, querying, updating and deleting [MessagePrioritization] records.
 *
 * All operations are suspending. Query operations that return a list declare a nullable
 * result, so callers must handle `null` as well as an empty list.
 */
interface MessagePrioritizationStateService {

    /** Persists [message] and returns the stored instance. */
    suspend fun saveMessage(message: MessagePrioritization): MessagePrioritization

    /**
     * Returns the message identified by [id]. The return type is non-null, so an
     * implementation has to signal a missing message with an exception.
     */
    suspend fun getMessage(id: String): MessagePrioritization

    /** Returns at most [count] messages that are candidates for expiry, across all groups. */
    suspend fun getExpiryEligibleMessages(count: Int): List<MessagePrioritization>?

    /** Returns at most [count] not-yet-expired messages of [group] whose state is one of [states]. */
    suspend fun getMessageForStatesNotExpiredIn(
        group: String,
        states: List<String>,
        count: Int
    ): List<MessagePrioritization>?

    /** Returns at most [count] expired messages of [group] whose state is one of [states]. */
    suspend fun getMessageForStatesExpired(
        group: String,
        states: List<String>,
        count: Int
    ): List<MessagePrioritization>?

    /** Returns at most [count] messages of [group] selected by comparing against [expiryDate]. */
    suspend fun getExpiredMessages(
        group: String,
        expiryDate: Date,
        count: Int
    ): List<MessagePrioritization>?

    /**
     * Returns the messages of [group] in one of [states] that match [correlationIds].
     * [types] is optional; when supplied it further restricts the lookup.
     */
    suspend fun getCorrelatedMessages(
        group: String,
        states: List<String>,
        types: List<String>?,
        correlationIds: String
    ): List<MessagePrioritization>?

    /** Moves every message listed in [ids] to [state]. */
    suspend fun updateMessagesState(ids: List<String>, state: String)

    /** Moves the message [id] to [state] and returns the updated message. */
    suspend fun updateMessageState(id: String, state: String): MessagePrioritization

    /** Sets the state of the message [id] to [state] without returning the message. */
    suspend fun setMessageState(id: String, state: String)

    /** Sets the state of all messages in [ids] to [state] without returning them. */
    suspend fun setMessagesState(ids: List<String>, state: String)

    /**
     * Moves the message [id] to [state], records [groupedIds] on it and returns the
     * updated message.
     */
    suspend fun updateMessageStateAndGroupedIds(
        id: String,
        state: String,
        groupedIds: List<String>
    ): MessagePrioritization

    /** Deletes the message identified by [id]. */
    suspend fun deleteMessage(id: String)

    /** Deletes the messages belonging to [group]. */
    suspend fun deleteMessageByGroup(group: String)

    /** Deletes the messages of [group] whose state is one of [states]. */
    suspend fun deleteMessageStates(group: String, states: List<String>)

    /** Deletes expired messages of [group]; [retentionDays] is the requested retention period. */
    suspend fun deleteExpiredMessage(group: String, retentionDays: Int)
}
64
/**
 * Spring [Service] implementation of [MessagePrioritizationStateService] that delegates
 * every operation to the injected [PrioritizationMessageRepository].
 */
@Service
open class MessagePrioritizationStateServiceImpl(
        private val prioritizationMessageRepository: PrioritizationMessageRepository) : MessagePrioritizationStateService {

    private val log = logger(MessagePrioritizationStateServiceImpl::class)

    /**
     * Saves [message] through the repository.
     *
     * Before saving, a non-blank correlation id is replaced by the result of
     * [toFormatedCorrelation] and `updatedDate` is set to the current time.
     *
     * @return the instance returned by the repository's `save`.
     */
    @Transactional
    override suspend fun saveMessage(message: MessagePrioritization): MessagePrioritization {
        if (!message.correlationId.isNullOrBlank()) {
            message.correlationId = message.toFormatedCorrelation()
        }
        message.updatedDate = Date()
        return prioritizationMessageRepository.save(message)
    }

    /**
     * Looks up the message stored under [id].
     *
     * @throws BluePrintProcessorException when the repository has no message for [id].
     */
    override suspend fun getMessage(id: String): MessagePrioritization {
        // orElseGet(null) would dereference the null supplier on an empty Optional and fail
        // with a NullPointerException; orElseThrow raises the intended domain exception.
        return prioritizationMessageRepository.findById(id)
                .orElseThrow { BluePrintProcessorException("couldn't find message for id($id)") }
    }

    /**
     * Queries the first page (size [count]) of messages in state NEW or WAIT, passing the
     * current time as the expiry reference to the repository.
     */
    override suspend fun getExpiryEligibleMessages(count: Int): List<MessagePrioritization>? {
        return prioritizationMessageRepository
                .findByStateInAndExpiredDate(arrayListOf(MessageState.NEW.name, MessageState.WAIT.name),
                        Date(), PageRequest.of(0, count))
    }

    /**
     * Queries the first page (size [count]) of messages of [group] in one of [states] using the
     * repository's "not expired" query with the current time as reference.
     */
    override suspend fun getMessageForStatesNotExpiredIn(group: String, states: List<String>, count: Int)
            : List<MessagePrioritization>? {
        return prioritizationMessageRepository.findByGroupAndStateInAndNotExpiredDate(group,
                states, Date(), PageRequest.of(0, count))
    }

    /**
     * Queries the first page (size [count]) of messages of [group] in one of [states] using the
     * repository's "expired" query with the current time as reference.
     */
    override suspend fun getMessageForStatesExpired(group: String, states: List<String>, count: Int)
            : List<MessagePrioritization>? {
        return prioritizationMessageRepository.findByGroupAndStateInAndExpiredDate(group,
                states, Date(), PageRequest.of(0, count))
    }

    /**
     * Queries the first page (size [count]) of messages of [group], passing [expiryDate] as the
     * expiry reference to the repository.
     */
    override suspend fun getExpiredMessages(group: String, expiryDate: Date, count: Int)
            : List<MessagePrioritization>? {
        return prioritizationMessageRepository.findByByGroupAndExpiredDate(group,
                expiryDate, PageRequest.of(0, count))
    }

    /**
     * Finds messages of [group] in one of [states] for [correlationIds].
     *
     * When [types] is null or empty the type-agnostic repository query is used; otherwise the
     * lookup is additionally restricted by [types].
     */
    override suspend fun getCorrelatedMessages(group: String, states: List<String>, types: List<String>?,
                                               correlationIds: String): List<MessagePrioritization>? {
        return if (!types.isNullOrEmpty()) {
            prioritizationMessageRepository.findByGroupAndTypesAndCorrelationId(group, states, types, correlationIds)
        } else {
            prioritizationMessageRepository.findByGroupAndCorrelationId(group, states, correlationIds)
        }
    }

    /**
     * Applies [updateMessageState] to each id in [ids], one after another, logging the state of
     * each updated message. An exception for one id stops processing of the remaining ids.
     */
    override suspend fun updateMessagesState(ids: List<String>, state: String) {
        ids.forEach {
            val updated = updateMessageState(it, state)
            log.info("message($it) update to state(${updated.state})")
        }
    }

    /** Sets the state of message [id] to [state] via the repository's status update. */
    @Transactional
    override suspend fun setMessageState(id: String, state: String) {
        prioritizationMessageRepository.setStatusForMessageId(id, state)
    }

    /** Sets the state of all messages in [ids] to [state] via the repository's bulk status update. */
    @Transactional
    override suspend fun setMessagesState(ids: List<String>, state: String) {
        prioritizationMessageRepository.setStatusForMessageIds(ids, state)
    }

    /**
     * Loads message [id], sets its state to [state] and saves it through [saveMessage].
     *
     * @throws BluePrintProcessorException when no message exists for [id].
     */
    @Transactional
    override suspend fun updateMessageState(id: String, state: String): MessagePrioritization {
        val updateMessage = getMessage(id).apply {
            this.updatedDate = Date()
            this.state = state
        }
        return saveMessage(updateMessage)
    }

    /**
     * Loads message [id], sets its state to [state], stores [groupedMessageIds] as a
     * comma-separated string in `aggregatedMessageIds` and saves it through [saveMessage].
     *
     * NOTE(review): the parameter is named `groupedIds` in the interface; the differing name is
     * kept here so existing named-argument callers of this class keep compiling.
     *
     * @throws BluePrintProcessorException when no message exists for [id].
     */
    override suspend fun updateMessageStateAndGroupedIds(id: String, state: String, groupedMessageIds: List<String>)
            : MessagePrioritization {

        val groupedIds = groupedMessageIds.joinToString(",")
        val updateMessage = getMessage(id).apply {
            this.updatedDate = Date()
            this.state = state
            this.aggregatedMessageIds = groupedIds
        }
        return saveMessage(updateMessage)
    }

    /** Deletes the message stored under [id]. */
    override suspend fun deleteMessage(id: String) {
        prioritizationMessageRepository.deleteById(id)
    }

    /** Deletes the messages of [group] via the repository's group delete. */
    override suspend fun deleteMessageByGroup(group: String) {
        prioritizationMessageRepository.deleteGroup(group)
    }

    /** Deletes the messages of [group] whose state is one of [states]. */
    override suspend fun deleteMessageStates(group: String, states: List<String>) {
        prioritizationMessageRepository.deleteGroupAndStateIn(group, states)
    }

    /**
     * Deletes the messages of [group] that are in state EXPIRED.
     *
     * NOTE(review): [retentionDays] is currently not used; every EXPIRED message of the group
     * is deleted regardless of age. Confirm whether a retention cut-off should be applied.
     */
    override suspend fun deleteExpiredMessage(group: String, retentionDays: Int) {
        prioritizationMessageRepository.deleteGroupAndStateIn(group,
                arrayListOf(MessageState.EXPIRED.name))
    }
}