/*
 *  Copyright © 2018-2019 AT&T Intellectual Property.
 *
 *  Licensed under the Apache License, Version 2.0 (the "License");
 *  you may not use this file except in compliance with the License.
 *  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 *  Unless required by applicable law or agreed to in writing, software
 *  distributed under the License is distributed on an "AS IS" BASIS,
 *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 *  See the License for the specific language governing permissions and
 *  limitations under the License.
 */
17 package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.service
19 import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessageState
20 import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.db.MessagePrioritization
21 import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.db.PrioritizationMessageRepository
22 import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.toFormatedCorrelation
23 import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintProcessorException
24 import org.onap.ccsdk.cds.controllerblueprints.core.logger
25 import org.springframework.data.domain.PageRequest
26 import org.springframework.stereotype.Service
27 import org.springframework.transaction.annotation.Transactional
/**
 * Persistence-facing operations on [MessagePrioritization] records: saving, lookup,
 * state transitions and deletion.
 *
 * Query methods take a `count` that caps how many records are requested and may
 * return `null`, as declared by their nullable return types.
 */
interface MessagePrioritizationStateService {

    /** Stores [message] and returns the stored instance. */
    suspend fun saveMessage(message: MessagePrioritization): MessagePrioritization

    /** Returns the message identified by [id]. */
    suspend fun getMessage(id: String): MessagePrioritization

    /** Returns at most [count] messages that are eligible to be expired. */
    suspend fun getExpiryEligibleMessages(count: Int): List<MessagePrioritization>?

    /** Returns at most [count] not-yet-expired messages of [group] whose state is one of [states]. */
    suspend fun getMessageForStatesNotExpiredIn(
        group: String,
        states: List<String>,
        count: Int
    ): List<MessagePrioritization>?

    /** Returns at most [count] expired messages of [group] whose state is one of [states]. */
    suspend fun getMessageForStatesExpired(
        group: String,
        states: List<String>,
        count: Int
    ): List<MessagePrioritization>?

    /** Returns at most [count] messages of [group] selected by [expiryDate]. */
    suspend fun getExpiredMessages(
        group: String,
        expiryDate: Date,
        count: Int
    ): List<MessagePrioritization>?

    /**
     * Returns the messages of [group] in one of [states] that match [correlationIds],
     * optionally narrowed to the given [types].
     */
    suspend fun getCorrelatedMessages(
        group: String,
        states: List<String>,
        types: List<String>?,
        correlationIds: String
    ): List<MessagePrioritization>?

    /** Moves every message listed in [ids] to [state]. */
    suspend fun updateMessagesState(ids: List<String>, state: String)

    /** Moves the message identified by [id] to [state] and returns the updated message. */
    suspend fun updateMessageState(id: String, state: String): MessagePrioritization

    /** Sets the state of the message identified by [id] to [state]. */
    suspend fun setMessageState(id: String, state: String)

    /** Sets the state of every message listed in [ids] to [state]. */
    suspend fun setMessagesState(ids: List<String>, state: String)

    /**
     * Moves the message identified by [id] to [state], records [groupedIds] on it and
     * returns the updated message.
     */
    suspend fun updateMessageStateAndGroupedIds(
        id: String,
        state: String,
        groupedIds: List<String>
    ): MessagePrioritization

    /** Deletes the message identified by [id]. */
    suspend fun deleteMessage(id: String)

    /** Deletes the messages belonging to [group]. */
    suspend fun deleteMessageByGroup(group: String)

    /** Deletes the messages of [group] whose state is one of [states]. */
    suspend fun deleteMessageStates(group: String, states: List<String>)

    /** Deletes expired messages of [group]; [retentionDays] is the requested retention period. */
    suspend fun deleteExpiredMessage(group: String, retentionDays: Int)
}
/**
 * [MessagePrioritizationStateService] implementation that delegates every operation
 * to the injected [PrioritizationMessageRepository].
 *
 * NOTE(review): this file imports Spring's `Service` and `Transactional`, but no
 * annotation is visible on this class or its methods in the reviewed text — confirm
 * whether stereotype/transaction annotations are expected here.
 */
open class MessagePrioritizationStateServiceImpl(
    private val prioritizationMessageRepository: PrioritizationMessageRepository
) : MessagePrioritizationStateService {

    private val log = logger(MessagePrioritizationStateServiceImpl::class)

    /**
     * Saves [message] through the repository.
     *
     * A non-blank correlation id is first replaced by the result of
     * `toFormatedCorrelation()`, and `updatedDate` is always set to the current time.
     *
     * @return the instance returned by the repository's `save`.
     */
    override suspend fun saveMessage(message: MessagePrioritization): MessagePrioritization {
        if (!message.correlationId.isNullOrBlank()) {
            message.correlationId = message.toFormatedCorrelation()
        }
        message.updatedDate = Date()
        return prioritizationMessageRepository.save(message)
    }

    /**
     * Looks up the message with the given [id].
     *
     * @throws BluePrintProcessorException when the repository has no message for [id].
     */
    override suspend fun getMessage(id: String): MessagePrioritization =
        // orElseGet(null) would dereference a null supplier on an empty Optional and
        // raise NullPointerException; orElseThrow reports the missing message as intended.
        prioritizationMessageRepository.findById(id)
            .orElseThrow { BluePrintProcessorException("couldn't find message for id($id)") }

    /**
     * Queries the first page (size [count]) of messages in state NEW or WAIT,
     * passing the current time as the expiry reference date.
     */
    override suspend fun getExpiryEligibleMessages(count: Int): List<MessagePrioritization>? {
        val eligibleStates = arrayListOf(MessageState.NEW.name, MessageState.WAIT.name)
        return prioritizationMessageRepository
            .findByStateInAndExpiredDate(eligibleStates, Date(), PageRequest.of(0, count))
    }

    /**
     * Queries the first page (size [count]) of messages of [group] in one of [states]
     * via the repository's "not expired" finder, using the current time as reference.
     */
    override suspend fun getMessageForStatesNotExpiredIn(
        group: String,
        states: List<String>,
        count: Int
    ): List<MessagePrioritization>? {
        return prioritizationMessageRepository
            .findByGroupAndStateInAndNotExpiredDate(group, states, Date(), PageRequest.of(0, count))
    }

    /**
     * Queries the first page (size [count]) of messages of [group] in one of [states]
     * via the repository's "expired" finder, using the current time as reference.
     */
    override suspend fun getMessageForStatesExpired(
        group: String,
        states: List<String>,
        count: Int
    ): List<MessagePrioritization>? {
        return prioritizationMessageRepository
            .findByGroupAndStateInAndExpiredDate(group, states, Date(), PageRequest.of(0, count))
    }

    /**
     * Queries the first page (size [count]) of messages of [group] selected by
     * the caller-supplied [expiryDate].
     */
    override suspend fun getExpiredMessages(
        group: String,
        expiryDate: Date,
        count: Int
    ): List<MessagePrioritization>? {
        return prioritizationMessageRepository
            .findByByGroupAndExpiredDate(group, expiryDate, PageRequest.of(0, count))
    }

    /**
     * Finds messages of [group] in one of [states] matching [correlationIds].
     * The type-aware repository query is used only when [types] is neither null nor empty.
     */
    override suspend fun getCorrelatedMessages(
        group: String,
        states: List<String>,
        types: List<String>?,
        correlationIds: String
    ): List<MessagePrioritization>? {
        return if (types.isNullOrEmpty()) {
            prioritizationMessageRepository.findByGroupAndCorrelationId(group, states, correlationIds)
        } else {
            prioritizationMessageRepository.findByGroupAndTypesAndCorrelationId(group, states, types, correlationIds)
        }
    }

    /**
     * Applies [updateMessageState] to each id in [ids], in order, logging every update.
     * A failure for one id propagates and stops the iteration.
     */
    override suspend fun updateMessagesState(ids: List<String>, state: String) {
        for (id in ids) {
            val updated = updateMessageState(id, state)
            log.info("message($id) update to state(${updated.state})")
        }
    }

    /** Delegates to the repository's single-id status update for [id]. */
    override suspend fun setMessageState(id: String, state: String) {
        prioritizationMessageRepository.setStatusForMessageId(id, state)
    }

    /** Delegates to the repository's multi-id status update for [ids]. */
    override suspend fun setMessagesState(ids: List<String>, state: String) {
        prioritizationMessageRepository.setStatusForMessageIds(ids, state)
    }

    /**
     * Loads the message for [id], sets its state to [state] and saves it.
     *
     * @return the saved message.
     * @throws BluePrintProcessorException when no message exists for [id].
     */
    override suspend fun updateMessageState(id: String, state: String): MessagePrioritization {
        val updateMessage = getMessage(id).apply {
            this.updatedDate = Date()
            // Without this assignment the requested state is never persisted.
            this.state = state
        }
        return saveMessage(updateMessage)
    }

    /**
     * Loads the message for [id], sets its state to [state], stores [groupedMessageIds]
     * as a comma-joined string in `aggregatedMessageIds`, and saves it.
     *
     * @return the saved message.
     * @throws BluePrintProcessorException when no message exists for [id].
     */
    override suspend fun updateMessageStateAndGroupedIds(
        id: String,
        state: String,
        groupedMessageIds: List<String>
    ): MessagePrioritization {
        val groupedIds = groupedMessageIds.joinToString(",")
        val updateMessage = getMessage(id).apply {
            this.updatedDate = Date()
            // Without this assignment the requested state is never persisted.
            this.state = state
            this.aggregatedMessageIds = groupedIds
        }
        return saveMessage(updateMessage)
    }

    /** Deletes the message with the given [id] through the repository. */
    override suspend fun deleteMessage(id: String) {
        prioritizationMessageRepository.deleteById(id)
    }

    /** Delegates to the repository's group delete for [group]. */
    override suspend fun deleteMessageByGroup(group: String) {
        prioritizationMessageRepository.deleteGroup(group)
    }

    /** Delegates to the repository's group-and-state delete for [group] and [states]. */
    override suspend fun deleteMessageStates(group: String, states: List<String>) {
        prioritizationMessageRepository.deleteGroupAndStateIn(group, states)
    }

    /**
     * Deletes the messages of [group] that are in state EXPIRED.
     *
     * NOTE(review): [retentionDays] is currently not used — every EXPIRED message of the
     * group is passed to the delete query regardless of age. Confirm whether a retention
     * cut-off was intended.
     */
    override suspend fun deleteExpiredMessage(group: String, retentionDays: Int) {
        prioritizationMessageRepository.deleteGroupAndStateIn(
            group,
            arrayListOf(MessageState.EXPIRED.name)
        )
    }
}