2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
6 * ================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 * ============LICENSE_END=========================================================
21 package org.openecomp.sdc.notification.dao.impl;
23 import com.datastax.driver.core.BatchStatement;
24 import com.datastax.driver.core.ResultSet;
25 import com.datastax.driver.core.Statement;
26 import com.datastax.driver.core.utils.UUIDs;
27 import com.datastax.driver.mapping.Mapper;
28 import com.datastax.driver.mapping.Result;
29 import com.datastax.driver.mapping.annotations.Accessor;
30 import com.datastax.driver.mapping.annotations.Query;
31 import org.apache.commons.collections.CollectionUtils;
32 import org.openecomp.core.dao.impl.CassandraBaseDao;
33 import org.openecomp.core.nosqldb.api.NoSqlDb;
34 import org.openecomp.core.nosqldb.factory.NoSqlDbFactory;
35 import org.openecomp.sdc.notification.dao.NotificationsDao;
36 import org.openecomp.sdc.notification.dao.types.NotificationEntity;
37 import org.openecomp.sdc.notification.dtos.NotificationsStatus;
40 import java.util.stream.Collectors;
42 import static org.openecomp.core.nosqldb.impl.cassandra.CassandraSessionFactory.getSession;
44 //import org.openecomp.sdc.notification.dao.types.LastSeenNotificationEntity;
45 //import java.util.Optional;
47 public class NotificationsDaoCassandraImpl extends CassandraBaseDao<NotificationEntity>
48 implements NotificationsDao {
// NoSQL handle obtained once from the factory; the mapper and accessor below are both derived from it.
50 private static final NoSqlDb noSqlDb = NoSqlDbFactory.getInstance().createInterface();
// Object mapper for NotificationEntity; createBatch uses it to build the save statements.
51 private static final Mapper<NotificationEntity> mapper =
52 noSqlDb.getMappingManager().mapper(NotificationEntity.class);
// Accessor created from the NotificationsAccessor interface declared in this class; runs its @Query statements.
53 private static final NotificationsAccessor accessor =
54 noSqlDb.getMappingManager().createAccessor(NotificationsAccessor.class);
57 protected Mapper<NotificationEntity> getMapper() {
62 protected Object[] getKeys(NotificationEntity entity) {
63 return new Object[]{entity.getOwnerId(), entity.getEventId()};
67 public List<NotificationEntity> list(NotificationEntity entity) {
68 return accessor.list(entity.getOwnerId()).all();
72 public List<NotificationEntity> getNotificationsByOwnerId(String ownerId, int limit) {
73 return accessor.getNotifications(ownerId, limit).all();
77 public List<NotificationEntity> getNewNotificationsByOwnerId(String ownerId, UUID eventId) {
78 return getNewNotificationsByOwnerId(ownerId, eventId,
79 DEFAULT_LIMIT_OF_RESULTS_FOR_OWNER_NOTIFICATIONS);
83 public List<NotificationEntity> getNewNotificationsByOwnerId(String ownerId, UUID eventId, int limit) {
84 if (Objects.isNull(eventId)) {
85 return getNotificationsByOwnerId(ownerId, limit);
87 return accessor.getNewNotifications(ownerId, eventId, limit).all();
91 public void markNotificationAsRead(String ownerId, Collection<UUID> eventIds) {
92 eventIds.forEach(eventId -> accessor.markAsRead(ownerId, eventId));
96 public NotificationsStatus getNotificationsStatus(String ownerId, UUID lastScannedEventId, int numOfRecordsToReturn) {
97 NotificationsStatusImpl notificationsStatus = new NotificationsStatusImpl();
98 List<NotificationEntity> entities = accessor.getNotifications(ownerId, numOfRecordsToReturn).all();
99 if (CollectionUtils.isNotEmpty(entities)) {
100 long lastSeen = UUIDs.unixTimestamp(lastScannedEventId);
101 populateNewNotifications(notificationsStatus, entities, lastSeen);
102 UUID firstScannedEventId = entities.get(0).getEventId();
103 notificationsStatus.setLastScanned(firstScannedEventId);
104 notificationsStatus.setNumOfNotSeenNotifications(accessor.getNewNotificationsCount(ownerId, lastScannedEventId, firstScannedEventId).one().getLong(0));
106 return notificationsStatus;
109 private void populateNewNotifications(NotificationsStatusImpl notificationsStatus, List<NotificationEntity> entities, long lastSeen) {
110 for (NotificationEntity entity : entities) {
111 UUID eventId = entity.getEventId();
112 notificationsStatus.addNotification(entity);
113 if (UUIDs.unixTimestamp(eventId) > lastSeen) {
114 notificationsStatus.addNewNotificationUUID(eventId);
120 public NotificationsStatus getNotificationsStatus(String ownerId, UUID lastSeenNotification, int numOfRecordsToReturn, UUID prevLastScannedEventId) {
121 NotificationsStatusImpl notificationsStatus = new NotificationsStatusImpl();
122 List<NotificationEntity> entities = accessor.getPrevNotifications(ownerId, prevLastScannedEventId, numOfRecordsToReturn).all();
123 if (CollectionUtils.isNotEmpty(entities)) {
124 long lastSeen = UUIDs.unixTimestamp(lastSeenNotification);
125 populateNewNotifications(notificationsStatus, entities, lastSeen);
127 return notificationsStatus;
// Builds a NotificationsStatus from a LastSeenNotificationEntity: fetches notifications via
// fetchNewNotifications, flags those that isNewNotification accepts, adds every fetched
// notification to the status, and records the first fetched event id as "last scanned".
// The ownerId parameter is not used here; fetchNewNotifications reads the owner from
// lastSeenNotification instead.
// NOTE(review): this method uses LastSeenNotificationEntity and Optional, whose imports are
// commented out at the top of the file - presumably this overload is disabled; confirm before
// relying on it.
132 public NotificationsStatus getNotificationsStatus(String ownerId,
133 LastSeenNotificationEntity lastSeenNotification,
134 int numOfRecordsToReturn) {
136 List<NotificationEntity> notificationEntities =
137 fetchNewNotifications(lastSeenNotification, numOfRecordsToReturn);
138 NotificationsStatusImpl notificationsStatus = new NotificationsStatusImpl();
// Nothing fetched: hand back the empty status without a "last scanned" id.
139 if (CollectionUtils.isEmpty(notificationEntities)) {
140 return notificationsStatus;
143 notificationEntities.forEach(notification -> {
144 if (isNewNotification(lastSeenNotification, notification)) {
145 notificationsStatus.addNewNotificationUUID(notification.getEventId());
147 notificationsStatus.addNotification(notification);
// The first element of the fetched list supplies the "last scanned" event id.
150 Optional<NotificationEntity> latestNotification = notificationEntities.stream().findFirst();
151 latestNotification.ifPresent(e -> notificationsStatus.setLastScanned(e.getEventId()));
152 return notificationsStatus;
// Fetches the notifications newer than the last seen event of the owner stored in
// lastSeenNotification (default limit), then passes the result through fetchMoreIfNeeded
// together with numOfRecordsToReturn.
// NOTE(review): depends on LastSeenNotificationEntity, whose import is commented out at the
// top of the file - presumably disabled together with its caller; confirm.
155 private List<NotificationEntity> fetchNewNotifications(
156 LastSeenNotificationEntity lastSeenNotification, int numOfRecordsToReturn) {
157 String ownerId = lastSeenNotification.getOwnerId();
158 UUID lastEventId = lastSeenNotification.getLastEventId();
159 List<NotificationEntity> newNotificationsByOwnerId =
160 getNewNotificationsByOwnerId(ownerId, lastEventId);
161 newNotificationsByOwnerId = fetchMoreIfNeeded(ownerId, newNotificationsByOwnerId,
162 numOfRecordsToReturn, lastEventId);
163 return newNotificationsByOwnerId;
// Decides whether a notification is new relative to the last seen event: true when no last
// event id is recorded, or when the notification's event-id timestamp is strictly later than
// the last event id's timestamp.
// NOTE(review): depends on LastSeenNotificationEntity, whose import is commented out at the
// top of the file - presumably disabled together with its caller; confirm.
166 private boolean isNewNotification(LastSeenNotificationEntity lastSeenNotification,
167 NotificationEntity notification) {
168 return Objects.isNull(lastSeenNotification.getLastEventId()) ||
169 UUIDs.unixTimestamp(notification.getEventId()) >
170 UUIDs.unixTimestamp(lastSeenNotification.getLastEventId());
175 public void createBatch(List<NotificationEntity> notificationEntities) {
176 BatchStatement batch = new BatchStatement();
177 List<Statement> statements = notificationEntities.stream()
178 .map(mapper::saveQuery)
179 .collect(Collectors.toList());
180 batch.addAll(statements);
181 getSession().execute(batch);
185 interface NotificationsAccessor {
187 @Query("select * from notifications where owner_id=?")
188 Result<NotificationEntity> list(String ownerId);
190 @Query("select * from notifications where owner_id=? limit ?")
191 Result<NotificationEntity> getNotifications(String ownerId, int limit);
193 @Query("select * from notifications where owner_id=? and event_id > ? limit ?")
194 Result<NotificationEntity> getNewNotifications(String ownerId, UUID lastScannedEventId, int limit);
196 @Query("select * from notifications where owner_id=? and event_id < ? limit ?")
197 Result<NotificationEntity> getPrevNotifications(String ownerId, UUID prevLastScannedEventId, int limit);
199 @Query("select count(*) from notifications where owner_id=? and event_id > ? and event_id <= ?")
200 ResultSet getNewNotificationsCount(String ownerId, UUID lastScannedEventId, UUID firstScannedEventId);
202 @Query("update notifications set read=true where owner_id=? and event_id=?")
203 ResultSet markAsRead(String ownerId, UUID eventId);
206 private class NotificationsStatusImpl implements NotificationsStatus {
208 private List<NotificationEntity> notifications = new ArrayList<>();
209 private List<UUID> newEntries = new ArrayList<>();
210 private UUID lastScanned;
211 private UUID endOfPage;
212 private long numOfNotSeenNotifications = 0;
214 void addNotification(NotificationEntity notification) {
215 notifications.add(notification);
216 endOfPage = notification.getEventId();
219 void addNewNotificationUUID(UUID notificationUuid) {
220 newEntries.add(notificationUuid);
224 public List<NotificationEntity> getNotifications() {
225 return Collections.unmodifiableList(notifications);
229 public List<UUID> getNewEntries() {
230 return Collections.unmodifiableList(newEntries);
234 public UUID getLastScanned() {
238 void setLastScanned(UUID lastScanned) {
239 this.lastScanned = lastScanned;
243 public UUID getEndOfPage() {
248 public long getNumOfNotSeenNotifications() {
249 return numOfNotSeenNotifications;
252 void setNumOfNotSeenNotifications(long numOfNotSeenNotifications) {
253 this.numOfNotSeenNotifications = numOfNotSeenNotifications;
// Tops up a list of notifications when it holds fewer than numOfRecordsToReturn entries by
// re-querying the owner's notifications with a larger limit.
// NOTE(review): `multiplier` is not declared in any line visible here, and this helper is only
// reached from fetchNewNotifications, which looks disabled - confirm both against the full file.
258 private List<NotificationEntity> fetchMoreIfNeeded(String ownerId,
259 List<NotificationEntity> notificationEntities,
260 int numOfRecordsToReturn, UUID lastEventId) {
// Enough rows already, or no last event id to measure against: return the input unchanged.
262 if (numOfRecordsToReturn <= notificationEntities.size() || Objects.isNull(lastEventId)) {
263 return notificationEntities;
267 while (numOfRecordsToReturn > notificationEntities.size()) {
// New limit = rows already held + (rows still missing * multiplier).
269 int bring = notificationEntities.size() +
270 (numOfRecordsToReturn - notificationEntities.size()) * multiplier;
// The re-query is by owner and limit only; lastEventId is not part of it.
271 notificationEntities = getNotificationsByOwnerId(ownerId, bring);
// Fewer rows than requested came back, so stop and return what was fetched.
273 if (notificationEntities.size() < bring) {
274 return notificationEntities;
278 return notificationEntities;