2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
6 * ================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 * ============LICENSE_END=========================================================
21 package org.openecomp.sdc.notification.dao.impl;
23 import com.datastax.driver.core.BatchStatement;
24 import com.datastax.driver.core.ResultSet;
25 import com.datastax.driver.core.Statement;
26 import com.datastax.driver.core.utils.UUIDs;
27 import com.datastax.driver.mapping.Mapper;
28 import com.datastax.driver.mapping.Result;
29 import com.datastax.driver.mapping.annotations.Accessor;
30 import com.datastax.driver.mapping.annotations.Query;
31 import org.apache.commons.collections.CollectionUtils;
32 import org.openecomp.core.dao.impl.CassandraBaseDao;
33 import org.openecomp.core.nosqldb.api.NoSqlDb;
34 import org.openecomp.core.nosqldb.factory.NoSqlDbFactory;
35 import org.openecomp.sdc.notification.dao.NotificationsDao;
36 import org.openecomp.sdc.notification.dao.types.NotificationEntity;
37 import org.openecomp.sdc.notification.dtos.NotificationsStatus;
39 import java.util.ArrayList;
40 import java.util.Collection;
41 import java.util.Collections;
42 import java.util.List;
43 import java.util.Objects;
44 import java.util.UUID;
45 import java.util.stream.Collectors;
47 import static org.openecomp.core.nosqldb.impl.cassandra.CassandraSessionFactory.getSession;
49 //import org.openecomp.sdc.notification.dao.types.LastSeenNotificationEntity;
50 //import java.util.Optional;
52 public class NotificationsDaoCassandraImpl extends CassandraBaseDao<NotificationEntity>
53 implements NotificationsDao {
55 private static final NoSqlDb noSqlDb = NoSqlDbFactory.getInstance().createInterface();
56 private static final Mapper<NotificationEntity> mapper =
57 noSqlDb.getMappingManager().mapper(NotificationEntity.class);
58 private static final NotificationsAccessor accessor =
59 noSqlDb.getMappingManager().createAccessor(NotificationsAccessor.class);
62 protected Mapper<NotificationEntity> getMapper() {
67 protected Object[] getKeys(NotificationEntity entity) {
68 return new Object[]{entity.getOwnerId(), entity.getEventId()};
72 public List<NotificationEntity> list(NotificationEntity entity) {
73 return accessor.list(entity.getOwnerId()).all();
77 public List<NotificationEntity> getNotificationsByOwnerId(String ownerId, int limit) {
78 return accessor.getNotifications(ownerId, limit).all();
82 public List<NotificationEntity> getNewNotificationsByOwnerId(String ownerId, UUID eventId) {
83 return getNewNotificationsByOwnerId(ownerId, eventId,
84 DEFAULT_LIMIT_OF_RESULTS_FOR_OWNER_NOTIFICATIONS);
88 public List<NotificationEntity> getNewNotificationsByOwnerId(String ownerId, UUID eventId, int limit) {
89 if (Objects.isNull(eventId)) {
90 return getNotificationsByOwnerId(ownerId, limit);
92 return accessor.getNewNotifications(ownerId, eventId, limit).all();
96 public void markNotificationAsRead(String ownerId, Collection<UUID> eventIds) {
97 eventIds.forEach(eventId -> accessor.markAsRead(ownerId, eventId));
101 public NotificationsStatus getNotificationsStatus(String ownerId, UUID lastScannedEventId, int numOfRecordsToReturn) {
102 NotificationsStatusImpl notificationsStatus = new NotificationsStatusImpl();
103 List<NotificationEntity> entities = accessor.getNotifications(ownerId, numOfRecordsToReturn).all();
104 if (CollectionUtils.isNotEmpty(entities)) {
105 long lastSeen = UUIDs.unixTimestamp(lastScannedEventId);
106 populateNewNotifications(notificationsStatus, entities, lastSeen);
107 UUID firstScannedEventId = entities.get(0).getEventId();
108 notificationsStatus.setLastScanned(firstScannedEventId);
109 notificationsStatus.setNumOfNotSeenNotifications(accessor.getNewNotificationsCount(ownerId, lastScannedEventId, firstScannedEventId).one().getLong(0));
111 return notificationsStatus;
114 private void populateNewNotifications(NotificationsStatusImpl notificationsStatus, List<NotificationEntity> entities, long lastSeen) {
115 for (NotificationEntity entity : entities) {
116 UUID eventId = entity.getEventId();
117 notificationsStatus.addNotification(entity);
118 if (UUIDs.unixTimestamp(eventId) > lastSeen) {
119 notificationsStatus.addNewNotificationUUID(eventId);
125 public NotificationsStatus getNotificationsStatus(String ownerId, UUID lastSeenNotification, int numOfRecordsToReturn, UUID prevLastScannedEventId) {
126 NotificationsStatusImpl notificationsStatus = new NotificationsStatusImpl();
127 List<NotificationEntity> entities = accessor.getPrevNotifications(ownerId, prevLastScannedEventId, numOfRecordsToReturn).all();
128 if (CollectionUtils.isNotEmpty(entities)) {
129 long lastSeen = UUIDs.unixTimestamp(lastSeenNotification);
130 populateNewNotifications(notificationsStatus, entities, lastSeen);
132 return notificationsStatus;
137 public NotificationsStatus getNotificationsStatus(String ownerId,
138 LastSeenNotificationEntity lastSeenNotification,
139 int numOfRecordsToReturn) {
141 List<NotificationEntity> notificationEntities =
142 fetchNewNotifications(lastSeenNotification, numOfRecordsToReturn);
143 NotificationsStatusImpl notificationsStatus = new NotificationsStatusImpl();
144 if (CollectionUtils.isEmpty(notificationEntities)) {
145 return notificationsStatus;
148 notificationEntities.forEach(notification -> {
149 if (isNewNotification(lastSeenNotification, notification)) {
150 notificationsStatus.addNewNotificationUUID(notification.getEventId());
152 notificationsStatus.addNotification(notification);
155 Optional<NotificationEntity> latestNotification = notificationEntities.stream().findFirst();
156 latestNotification.ifPresent(e -> notificationsStatus.setLastScanned(e.getEventId()));
157 return notificationsStatus;
160 private List<NotificationEntity> fetchNewNotifications(
161 LastSeenNotificationEntity lastSeenNotification, int numOfRecordsToReturn) {
162 String ownerId = lastSeenNotification.getOwnerId();
163 UUID lastEventId = lastSeenNotification.getLastEventId();
164 List<NotificationEntity> newNotificationsByOwnerId =
165 getNewNotificationsByOwnerId(ownerId, lastEventId);
166 newNotificationsByOwnerId = fetchMoreIfNeeded(ownerId, newNotificationsByOwnerId,
167 numOfRecordsToReturn, lastEventId);
168 return newNotificationsByOwnerId;
171 private boolean isNewNotification(LastSeenNotificationEntity lastSeenNotification,
172 NotificationEntity notification) {
173 return Objects.isNull(lastSeenNotification.getLastEventId()) ||
174 UUIDs.unixTimestamp(notification.getEventId()) >
175 UUIDs.unixTimestamp(lastSeenNotification.getLastEventId());
180 public void createBatch(List<NotificationEntity> notificationEntities) {
181 BatchStatement batch = new BatchStatement();
182 List<Statement> statements = notificationEntities.stream()
183 .map(mapper::saveQuery)
184 .collect(Collectors.toList());
185 batch.addAll(statements);
186 getSession().execute(batch);
190 interface NotificationsAccessor {
192 @Query("select * from notifications where owner_id=?")
193 Result<NotificationEntity> list(String ownerId);
195 @Query("select * from notifications where owner_id=? limit ?")
196 Result<NotificationEntity> getNotifications(String ownerId, int limit);
198 @Query("select * from notifications where owner_id=? and event_id > ? limit ?")
199 Result<NotificationEntity> getNewNotifications(String ownerId, UUID lastScannedEventId, int limit);
201 @Query("select * from notifications where owner_id=? and event_id < ? limit ?")
202 Result<NotificationEntity> getPrevNotifications(String ownerId, UUID prevLastScannedEventId, int limit);
204 @Query("select count(*) from notifications where owner_id=? and event_id > ? and event_id <= ?")
205 ResultSet getNewNotificationsCount(String ownerId, UUID lastScannedEventId, UUID firstScannedEventId);
207 @Query("update notifications set read=true where owner_id=? and event_id=?")
208 ResultSet markAsRead(String ownerId, UUID eventId);
211 private class NotificationsStatusImpl implements NotificationsStatus {
213 private List<NotificationEntity> notifications = new ArrayList<>();
214 private List<UUID> newEntries = new ArrayList<>();
215 private UUID lastScanned;
216 private UUID endOfPage;
217 private long numOfNotSeenNotifications = 0;
219 void addNotification(NotificationEntity notification) {
220 notifications.add(notification);
221 endOfPage = notification.getEventId();
224 void addNewNotificationUUID(UUID notificationUuid) {
225 newEntries.add(notificationUuid);
229 public List<NotificationEntity> getNotifications() {
230 return Collections.unmodifiableList(notifications);
234 public List<UUID> getNewEntries() {
235 return Collections.unmodifiableList(newEntries);
239 public UUID getLastScanned() {
243 void setLastScanned(UUID lastScanned) {
244 this.lastScanned = lastScanned;
248 public UUID getEndOfPage() {
253 public long getNumOfNotSeenNotifications() {
254 return numOfNotSeenNotifications;
257 void setNumOfNotSeenNotifications(long numOfNotSeenNotifications) {
258 this.numOfNotSeenNotifications = numOfNotSeenNotifications;
263 private List<NotificationEntity> fetchMoreIfNeeded(String ownerId,
264 List<NotificationEntity> notificationEntities,
265 int numOfRecordsToReturn, UUID lastEventId) {
267 if (numOfRecordsToReturn <= notificationEntities.size() || Objects.isNull(lastEventId)) {
268 return notificationEntities;
272 while (numOfRecordsToReturn > notificationEntities.size()) {
274 int bring = notificationEntities.size() +
275 (numOfRecordsToReturn - notificationEntities.size()) * multiplier;
276 notificationEntities = getNotificationsByOwnerId(ownerId, bring);
278 if (notificationEntities.size() < bring) {
279 return notificationEntities;
283 return notificationEntities;