2 * Copyright © 2018-2019 AT&T Intellectual Property.
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
17 package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.service
19 import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessagePrioritizationStateService
20 import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessageState
21 import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.db.MessagePrioritization
22 import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.db.PrioritizationMessageRepository
23 import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.toFormatedCorrelation
24 import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintProcessorException
25 import org.onap.ccsdk.cds.controllerblueprints.core.logger
26 import org.springframework.data.domain.PageRequest
27 import org.springframework.stereotype.Service
28 import org.springframework.transaction.annotation.Transactional
32 open class MessagePrioritizationStateServiceImpl(
33 private val prioritizationMessageRepository: PrioritizationMessageRepository
34 ) : MessagePrioritizationStateService {
36 private val log = logger(MessagePrioritizationStateServiceImpl::class)
39 override suspend fun saveMessage(message: MessagePrioritization): MessagePrioritization {
40 if (!message.correlationId.isNullOrBlank()) {
41 message.correlationId = message.toFormatedCorrelation()
43 message.updatedDate = Date()
44 return prioritizationMessageRepository.save(message)
47 override suspend fun getMessage(id: String): MessagePrioritization {
48 return prioritizationMessageRepository.findById(id).orElseGet(null)
49 ?: throw BluePrintProcessorException("couldn't find message for id($id)")
52 override suspend fun getMessages(ids: List<String>): List<MessagePrioritization>? {
53 return prioritizationMessageRepository.findAllById(ids)
56 override suspend fun getExpiryEligibleMessages(count: Int): List<MessagePrioritization>? {
57 return prioritizationMessageRepository
58 .findByStateInAndExpiredDate(
59 arrayListOf(MessageState.NEW.name, MessageState.WAIT.name),
60 Date(), PageRequest.of(0, count)
64 override suspend fun getMessageForStatesNotExpiredIn(group: String, states: List<String>, count: Int):
65 List<MessagePrioritization>? {
66 return prioritizationMessageRepository.findByGroupAndStateInAndNotExpiredDate(
68 states, Date(), PageRequest.of(0, count)
72 override suspend fun getMessageForStatesExpired(group: String, states: List<String>, count: Int):
73 List<MessagePrioritization>? {
74 return prioritizationMessageRepository.findByGroupAndStateInAndExpiredDate(
76 states, Date(), PageRequest.of(0, count)
80 override suspend fun getExpiredMessages(group: String, expiryDate: Date, count: Int):
81 List<MessagePrioritization>? {
82 return prioritizationMessageRepository.findByByGroupAndExpiredDate(
84 expiryDate, PageRequest.of(0, count)
88 override suspend fun getCorrelatedMessages(
92 correlationIds: String
93 ): List<MessagePrioritization>? {
94 return if (!types.isNullOrEmpty()) {
95 prioritizationMessageRepository.findByGroupAndTypesAndCorrelationId(group, states, types, correlationIds)
97 prioritizationMessageRepository.findByGroupAndCorrelationId(group, states, correlationIds)
102 override suspend fun updateMessagesState(ids: List<String>, state: String) {
104 val updated = updateMessageState(it, state)
105 log.info("message($it) update to state(${updated.state})")
110 override suspend fun setMessageState(id: String, state: String) {
111 prioritizationMessageRepository.setStateForMessageId(id, state, Date())
115 override suspend fun setMessagesPriority(ids: List<String>, priority: String) {
116 prioritizationMessageRepository.setPriorityForMessageIds(ids, priority, Date())
120 override suspend fun setMessagesState(ids: List<String>, state: String) {
121 prioritizationMessageRepository.setStateForMessageIds(ids, state, Date())
125 override suspend fun setMessageStateANdError(id: String, state: String, error: String) {
126 prioritizationMessageRepository.setStateAndErrorForMessageId(id, state, error, Date())
130 override suspend fun updateMessageState(id: String, state: String): MessagePrioritization {
131 val updateMessage = getMessage(id).apply {
132 this.updatedDate = Date()
135 return saveMessage(updateMessage)
139 override suspend fun setMessageStateAndAggregatedIds(id: String, state: String, aggregatedIds: List<String>) {
140 val groupedIds = aggregatedIds.joinToString(",")
141 prioritizationMessageRepository.setStateAndAggregatedMessageIds(id, state, groupedIds, Date())
144 override suspend fun deleteMessage(id: String) {
145 return prioritizationMessageRepository.deleteById(id)
148 override suspend fun deleteMessageByGroup(group: String) {
149 return prioritizationMessageRepository.deleteGroup(group)
152 override suspend fun deleteMessageStates(group: String, states: List<String>) {
153 return prioritizationMessageRepository.deleteGroupAndStateIn(group, states)
156 override suspend fun deleteExpiredMessage(group: String, retentionDays: Int) {
157 return prioritizationMessageRepository.deleteGroupAndStateIn(
159 arrayListOf(MessageState.EXPIRED.name)