/*
 * Copyright © 2018-2019 AT&T Intellectual Property.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
17 package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.service
19 import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessagePrioritizationStateService
20 import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessageState
21 import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.db.MessagePrioritization
22 import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.db.PrioritizationMessageRepository
23 import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.toFormatedCorrelation
24 import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintProcessorException
25 import org.onap.ccsdk.cds.controllerblueprints.core.logger
26 import org.onap.ccsdk.cds.controllerblueprints.core.utils.addDate
27 import org.onap.ccsdk.cds.controllerblueprints.core.utils.controllerDate
28 import org.springframework.data.domain.PageRequest
29 import org.springframework.stereotype.Service
30 import org.springframework.transaction.annotation.Transactional
/**
 * [MessagePrioritizationStateService] implementation whose operations are carried out
 * through the injected [PrioritizationMessageRepository].
 */
34 open class MessagePrioritizationStateServiceImpl(
35 private val prioritizationMessageRepository: PrioritizationMessageRepository
36 ) : MessagePrioritizationStateService {
// Logger bound to this class; used by the update and delete operations to report their outcome.
38 private val log = logger(MessagePrioritizationStateServiceImpl::class)
/**
 * Saves [message] through the prioritization repository.
 *
 * A non-blank correlation id is first replaced by the value produced by
 * [toFormatedCorrelation]; the updated date is always set to the current time
 * right before the save.
 *
 * @param message message to persist; it is mutated in place.
 * @return the instance returned by the repository `save` call.
 */
override suspend fun saveMessage(message: MessagePrioritization): MessagePrioritization {
    val hasCorrelationId = !message.correlationId.isNullOrBlank()
    if (hasCorrelationId) {
        message.correlationId = message.toFormatedCorrelation()
    }
    message.updatedDate = Date()
    return prioritizationMessageRepository.save(message)
}
/**
 * Fetches the message stored under [id].
 *
 * The empty result is unwrapped with `Optional.orElse(null)` so that the elvis branch
 * can raise the domain exception. `orElseGet(null)` must not be used here: on an empty
 * Optional it dereferences the null supplier and fails with a NullPointerException
 * before the elvis branch is ever reached.
 *
 * @param id identifier passed to the repository `findById` lookup.
 * @return the stored message.
 * @throws BluePrintProcessorException when no message exists for [id].
 */
override suspend fun getMessage(id: String): MessagePrioritization {
    return prioritizationMessageRepository.findById(id).orElse(null)
        ?: throw BluePrintProcessorException("couldn't find message for id($id)")
}
/**
 * Loads the messages for the given identifiers.
 *
 * @param ids identifiers forwarded to the repository `findAllById` call.
 * @return the repository result for those identifiers.
 */
override suspend fun getMessages(ids: List<String>): List<MessagePrioritization>? =
    prioritizationMessageRepository.findAllById(ids)
/**
 * Queries `findByStateInAndExpiredDate` for messages in state NEW or WAIT, using the
 * current time as the date argument.
 *
 * @param count page size; only the first page (index 0) is requested.
 * @return the repository result for that page.
 */
override suspend fun getExpiryEligibleMessages(count: Int): List<MessagePrioritization>? {
    val eligibleStates = arrayListOf(MessageState.NEW.name, MessageState.WAIT.name)
    val now = Date()
    val firstPage = PageRequest.of(0, count)
    return prioritizationMessageRepository.findByStateInAndExpiredDate(eligibleStates, now, firstPage)
}
/**
 * Queries `findByGroupAndStateInAndNotExpiredDate` for [group] and [states], using the
 * current time as the date argument.
 *
 * @param group group value forwarded to the repository query.
 * @param states state names forwarded to the repository query.
 * @param count page size; only the first page (index 0) is requested.
 * @return the repository result for that page.
 */
override suspend fun getMessageForStatesNotExpiredIn(
    group: String,
    states: List<String>,
    count: Int
): List<MessagePrioritization>? {
    val now = Date()
    val firstPage = PageRequest.of(0, count)
    return prioritizationMessageRepository.findByGroupAndStateInAndNotExpiredDate(group, states, now, firstPage)
}
/**
 * Queries `findByGroupAndStateInAndExpiredDate` for [group] and [states], using the
 * current time as the date argument.
 *
 * @param group group value forwarded to the repository query.
 * @param states state names forwarded to the repository query.
 * @param count page size; only the first page (index 0) is requested.
 * @return the repository result for that page.
 */
override suspend fun getMessageForStatesExpired(
    group: String,
    states: List<String>,
    count: Int
): List<MessagePrioritization>? {
    val now = Date()
    val firstPage = PageRequest.of(0, count)
    return prioritizationMessageRepository.findByGroupAndStateInAndExpiredDate(group, states, now, firstPage)
}
/**
 * Queries `findByExpiredDate` with the supplied date.
 *
 * @param expiryDate date forwarded unchanged to the repository query.
 * @param count page size; only the first page (index 0) is requested.
 * @return the repository result for that page.
 */
override suspend fun getExpiredMessages(expiryDate: Date, count: Int): List<MessagePrioritization>? {
    val firstPage = PageRequest.of(0, count)
    return prioritizationMessageRepository.findByExpiredDate(expiryDate, firstPage)
}
/**
 * Queries `findByGroupAndExpiredDate` for [group] with the supplied date.
 *
 * @param group group value forwarded to the repository query.
 * @param expiryDate date forwarded unchanged to the repository query.
 * @param count page size; only the first page (index 0) is requested.
 * @return the repository result for that page.
 */
override suspend fun getExpiredMessages(
    group: String,
    expiryDate: Date,
    count: Int
): List<MessagePrioritization>? {
    val firstPage = PageRequest.of(0, count)
    return prioritizationMessageRepository.findByGroupAndExpiredDate(group, expiryDate, firstPage)
}
/**
 * Fetches correlated messages from the repository.
 *
 * When `types` is neither null nor empty the lookup goes through
 * `findByGroupAndTypesAndCorrelationId`; otherwise `findByGroupAndCorrelationId`
 * is used and `types` is not passed.
 *
 * NOTE(review): the declarations of `group`, `states` and `types` are not visible
 * in the reviewed lines; confirm their types against [MessagePrioritizationStateService].
 *
 * @param correlationIds correlation value forwarded unchanged to the repository query.
 * @return the repository result of whichever query was selected.
 */
96 override suspend fun getCorrelatedMessages(
100 correlationIds: String
101 ): List<MessagePrioritization>? {
// A non-empty type filter selects the type-aware query.
102 return if (!types.isNullOrEmpty()) {
103 prioritizationMessageRepository.findByGroupAndTypesAndCorrelationId(group, states, types, correlationIds)
// No type filter supplied: query by group, states and correlation only.
105 prioritizationMessageRepository.findByGroupAndCorrelationId(group, states, correlationIds)
/**
 * Applies [updateMessageState] to every id in [ids], one after another, and logs the
 * state carried by each returned message.
 *
 * @param ids identifiers of the messages to update.
 * @param state state value handed to [updateMessageState] for each id.
 */
override suspend fun updateMessagesState(ids: List<String>, state: String) {
    for (id in ids) {
        val saved = updateMessageState(id, state)
        log.info("message($id) update to state(${saved.state})")
    }
}
/**
 * Calls the repository `setStateForMessageId` with the current time as the date argument.
 *
 * @param id identifier of the message.
 * @param state state value forwarded to the repository.
 */
override suspend fun setMessageState(id: String, state: String) {
    val now = Date()
    prioritizationMessageRepository.setStateForMessageId(id, state, now)
}
/**
 * Calls the repository `setPriorityForMessageIds` with the current time as the date argument.
 *
 * @param ids identifiers of the messages.
 * @param priority priority value forwarded to the repository.
 */
override suspend fun setMessagesPriority(ids: List<String>, priority: String) {
    val now = Date()
    prioritizationMessageRepository.setPriorityForMessageIds(ids, priority, now)
}
/**
 * Calls the repository `setStateForMessageIds` with the current time as the date argument.
 *
 * @param ids identifiers of the messages.
 * @param state state value forwarded to the repository.
 */
override suspend fun setMessagesState(ids: List<String>, state: String) {
    val now = Date()
    prioritizationMessageRepository.setStateForMessageIds(ids, state, now)
}
/**
 * Calls the repository `setStateAndErrorForMessageId` with the current time as the
 * date argument.
 *
 * @param id identifier of the message.
 * @param state state value forwarded to the repository.
 * @param error error text forwarded to the repository.
 */
override suspend fun setMessageStateANdError(id: String, state: String, error: String) {
    val now = Date()
    prioritizationMessageRepository.setStateAndErrorForMessageId(id, state, error, now)
}
/**
 * Loads the message for [id] via [getMessage], stamps it with the current time and the
 * given [state], and persists it through [saveMessage].
 *
 * @param id identifier of the message to update.
 * @param state state value written onto the loaded message.
 * @return the message returned by [saveMessage].
 */
override suspend fun updateMessageState(id: String, state: String): MessagePrioritization {
    val target = getMessage(id)
    target.updatedDate = Date()
    target.state = state
    return saveMessage(target)
}
/**
 * Joins [aggregatedIds] into one comma-separated string and passes it, together with
 * [state] and the current time, to the repository `setStateAndAggregatedMessageIds`.
 *
 * @param id identifier of the message.
 * @param state state value forwarded to the repository.
 * @param aggregatedIds identifiers to join with "," (no surrounding whitespace).
 */
override suspend fun setMessageStateAndAggregatedIds(id: String, state: String, aggregatedIds: List<String>) {
    val commaSeparatedIds = aggregatedIds.joinToString(separator = ",")
    val now = Date()
    prioritizationMessageRepository.setStateAndAggregatedMessageIds(id, state, commaSeparatedIds, now)
}
/**
 * Deletes the message with the given id through the repository, then logs the deletion.
 *
 * @param id identifier passed to the repository `deleteById` call.
 */
override suspend fun deleteMessage(id: String) {
    prioritizationMessageRepository.deleteById(id).also {
        log.info("Prioritization Messages $id deleted successfully.")
    }
}
/**
 * Deletes the messages with the given ids through the repository, then logs the deletion.
 *
 * @param ids identifiers passed to the repository `deleteByIds` call.
 */
override suspend fun deleteMessages(ids: List<String>) {
    prioritizationMessageRepository.deleteByIds(ids).also {
        log.info("Prioritization Messages $ids deleted successfully.")
    }
}
/**
 * Derives a check date from `controllerDate().addDate(retentionDays)` and hands it to
 * the repository `deleteByExpiryDate`.
 *
 * NOTE(review): the direction and unit of the `addDate` offset are defined by the
 * helper, not here; confirm before changing how [retentionDays] is supplied.
 *
 * @param retentionDays value passed to `addDate` on the controller date.
 */
override suspend fun deleteExpiredMessage(retentionDays: Int) {
    val baseDate = controllerDate()
    val expiryCutoff = baseDate.addDate(retentionDays)
    prioritizationMessageRepository.deleteByExpiryDate(expiryCutoff)
}
/**
 * Deletes the given group through the repository `deleteGroup` call, then logs the deletion.
 *
 * @param group group value passed to the repository.
 */
override suspend fun deleteMessageByGroup(group: String) {
    prioritizationMessageRepository.deleteGroup(group).also {
        log.info("Prioritization Messages group($group) deleted successfully.")
    }
}
/**
 * Calls the repository `deleteGroupAndStateIn` for [group] and [states], then logs the
 * deletion.
 *
 * @param group group value passed to the repository.
 * @param states state names passed to the repository.
 */
override suspend fun deleteMessageStates(group: String, states: List<String>) {
    prioritizationMessageRepository.deleteGroupAndStateIn(group, states).also {
        log.info("Prioritization Messages group($group) with states($states) deleted successfully.")
    }
}