Add collaboration feature
[sdc.git] / openecomp-be / lib / openecomp-sdc-notification-lib / openecomp-sdc-notification-core / src / main / java / org / openecomp / sdc / notification / dao / impl / NotificationsDaoCassandraImpl.java
1 /*-
2  * ============LICENSE_START=======================================================
3  * SDC
4  * ================================================================================
5  * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=========================================================
19  */
20
21 package org.openecomp.sdc.notification.dao.impl;
22
23 import com.datastax.driver.core.BatchStatement;
24 import com.datastax.driver.core.ResultSet;
25 import com.datastax.driver.core.Statement;
26 import com.datastax.driver.core.utils.UUIDs;
27 import com.datastax.driver.mapping.Mapper;
28 import com.datastax.driver.mapping.Result;
29 import com.datastax.driver.mapping.annotations.Accessor;
30 import com.datastax.driver.mapping.annotations.Query;
31 import org.apache.commons.collections.CollectionUtils;
32 import org.openecomp.core.dao.impl.CassandraBaseDao;
33 import org.openecomp.core.nosqldb.api.NoSqlDb;
34 import org.openecomp.core.nosqldb.factory.NoSqlDbFactory;
35 import org.openecomp.sdc.notification.dao.NotificationsDao;
36 import org.openecomp.sdc.notification.dao.types.NotificationEntity;
37 import org.openecomp.sdc.notification.dtos.NotificationsStatus;
38
39 import java.util.ArrayList;
40 import java.util.Collection;
41 import java.util.Collections;
42 import java.util.List;
43 import java.util.Objects;
44 import java.util.UUID;
45 import java.util.stream.Collectors;
46
47 import static org.openecomp.core.nosqldb.impl.cassandra.CassandraSessionFactory.getSession;
48
49 //import org.openecomp.sdc.notification.dao.types.LastSeenNotificationEntity;
50 //import java.util.Optional;
51
52 public class NotificationsDaoCassandraImpl extends CassandraBaseDao<NotificationEntity>
53     implements NotificationsDao {
54
55     private static final NoSqlDb noSqlDb = NoSqlDbFactory.getInstance().createInterface();
56     private static final Mapper<NotificationEntity> mapper =
57         noSqlDb.getMappingManager().mapper(NotificationEntity.class);
58     private static final NotificationsAccessor accessor =
59         noSqlDb.getMappingManager().createAccessor(NotificationsAccessor.class);
60
61     @Override
62     protected Mapper<NotificationEntity> getMapper() {
63         return mapper;
64     }
65
66     @Override
67     protected Object[] getKeys(NotificationEntity entity) {
68         return new Object[]{entity.getOwnerId(), entity.getEventId()};
69     }
70
71     @Override
72     public List<NotificationEntity> list(NotificationEntity entity) {
73         return accessor.list(entity.getOwnerId()).all();
74     }
75
76     @Override
77     public List<NotificationEntity> getNotificationsByOwnerId(String ownerId, int limit) {
78         return accessor.getNotifications(ownerId, limit).all();
79     }
80
81     @Override
82     public List<NotificationEntity> getNewNotificationsByOwnerId(String ownerId, UUID eventId) {
83         return getNewNotificationsByOwnerId(ownerId, eventId,
84             DEFAULT_LIMIT_OF_RESULTS_FOR_OWNER_NOTIFICATIONS);
85     }
86
87     @Override
88     public List<NotificationEntity> getNewNotificationsByOwnerId(String ownerId, UUID eventId, int limit) {
89         if (Objects.isNull(eventId)) {
90             return getNotificationsByOwnerId(ownerId, limit);
91         }
92         return accessor.getNewNotifications(ownerId, eventId, limit).all();
93     }
94
95     @Override
96     public void markNotificationAsRead(String ownerId, Collection<UUID> eventIds) {
97         eventIds.forEach(eventId -> accessor.markAsRead(ownerId, eventId));
98     }
99
100     @Override
101     public NotificationsStatus getNotificationsStatus(String ownerId, UUID lastScannedEventId, int numOfRecordsToReturn) {
102         NotificationsStatusImpl notificationsStatus = new NotificationsStatusImpl();
103         List<NotificationEntity> entities = accessor.getNotifications(ownerId, numOfRecordsToReturn).all();
104         if (CollectionUtils.isNotEmpty(entities)) {
105             long lastSeen = UUIDs.unixTimestamp(lastScannedEventId);
106             populateNewNotifications(notificationsStatus, entities, lastSeen);
107             UUID firstScannedEventId = entities.get(0).getEventId();
108                 notificationsStatus.setLastScanned(firstScannedEventId);
109                 notificationsStatus.setNumOfNotSeenNotifications(accessor.getNewNotificationsCount(ownerId, lastScannedEventId, firstScannedEventId).one().getLong(0));
110         }
111         return notificationsStatus;
112     }
113
114     private void populateNewNotifications(NotificationsStatusImpl notificationsStatus, List<NotificationEntity> entities, long lastSeen) {
115         for (NotificationEntity entity : entities) {
116             UUID eventId = entity.getEventId();
117             notificationsStatus.addNotification(entity);
118             if (UUIDs.unixTimestamp(eventId) > lastSeen) {
119                 notificationsStatus.addNewNotificationUUID(eventId);
120             }
121         }
122     }
123
124     @Override
125     public NotificationsStatus getNotificationsStatus(String ownerId, UUID lastSeenNotification, int numOfRecordsToReturn, UUID prevLastScannedEventId) {
126         NotificationsStatusImpl notificationsStatus = new NotificationsStatusImpl();
127         List<NotificationEntity> entities = accessor.getPrevNotifications(ownerId, prevLastScannedEventId, numOfRecordsToReturn).all();
128         if (CollectionUtils.isNotEmpty(entities)) {
129                 long lastSeen = UUIDs.unixTimestamp(lastSeenNotification);
130             populateNewNotifications(notificationsStatus, entities, lastSeen);
131         }
132         return notificationsStatus;
133     }
134
135 /*
136     @Override
137     public NotificationsStatus getNotificationsStatus(String ownerId,
138                                                       LastSeenNotificationEntity lastSeenNotification,
139                                                       int numOfRecordsToReturn) {
140
141         List<NotificationEntity> notificationEntities =
142             fetchNewNotifications(lastSeenNotification, numOfRecordsToReturn);
143         NotificationsStatusImpl notificationsStatus = new NotificationsStatusImpl();
144         if (CollectionUtils.isEmpty(notificationEntities)) {
145             return notificationsStatus;
146         }
147
148         notificationEntities.forEach(notification -> {
149             if (isNewNotification(lastSeenNotification, notification)) {
150                 notificationsStatus.addNewNotificationUUID(notification.getEventId());
151             }
152             notificationsStatus.addNotification(notification);
153         });
154
155         Optional<NotificationEntity> latestNotification = notificationEntities.stream().findFirst();
156         latestNotification.ifPresent(e -> notificationsStatus.setLastScanned(e.getEventId()));
157         return notificationsStatus;
158     }
159
160     private List<NotificationEntity> fetchNewNotifications(
161         LastSeenNotificationEntity lastSeenNotification, int numOfRecordsToReturn) {
162         String ownerId = lastSeenNotification.getOwnerId();
163         UUID lastEventId = lastSeenNotification.getLastEventId();
164         List<NotificationEntity> newNotificationsByOwnerId =
165             getNewNotificationsByOwnerId(ownerId, lastEventId);
166         newNotificationsByOwnerId = fetchMoreIfNeeded(ownerId, newNotificationsByOwnerId,
167             numOfRecordsToReturn, lastEventId);
168         return newNotificationsByOwnerId;
169     }
170
171     private boolean isNewNotification(LastSeenNotificationEntity lastSeenNotification,
172                                       NotificationEntity notification) {
173         return Objects.isNull(lastSeenNotification.getLastEventId()) ||
174             UUIDs.unixTimestamp(notification.getEventId()) >
175                 UUIDs.unixTimestamp(lastSeenNotification.getLastEventId());
176     }
177 */
178
179     @Override
180     public void createBatch(List<NotificationEntity> notificationEntities) {
181         BatchStatement batch = new BatchStatement();
182         List<Statement> statements = notificationEntities.stream()
183             .map(mapper::saveQuery)
184             .collect(Collectors.toList());
185         batch.addAll(statements);
186         getSession().execute(batch);
187     }
188
189     @Accessor
190     interface NotificationsAccessor {
191
192         @Query("select * from notifications where owner_id=?")
193         Result<NotificationEntity> list(String ownerId);
194
195         @Query("select * from notifications where owner_id=? limit ?")
196         Result<NotificationEntity> getNotifications(String ownerId, int limit);
197
198         @Query("select * from notifications where owner_id=? and event_id > ? limit ?")
199         Result<NotificationEntity> getNewNotifications(String ownerId, UUID lastScannedEventId, int limit);
200
201         @Query("select * from notifications where owner_id=? and event_id < ? limit ?")
202         Result<NotificationEntity> getPrevNotifications(String ownerId, UUID prevLastScannedEventId, int limit);
203
204         @Query("select count(*) from notifications where owner_id=? and event_id > ? and event_id <= ?")
205         ResultSet getNewNotificationsCount(String ownerId, UUID lastScannedEventId, UUID firstScannedEventId);
206
207         @Query("update notifications set read=true where owner_id=? and event_id=?")
208         ResultSet markAsRead(String ownerId, UUID eventId);
209     }
210
211     private class NotificationsStatusImpl implements NotificationsStatus {
212
213         private List<NotificationEntity> notifications = new ArrayList<>();
214         private List<UUID> newEntries = new ArrayList<>();
215         private UUID lastScanned;
216         private UUID endOfPage;
217         private long numOfNotSeenNotifications = 0;
218
219         void addNotification(NotificationEntity notification) {
220             notifications.add(notification);
221             endOfPage = notification.getEventId();
222         }
223
224         void addNewNotificationUUID(UUID notificationUuid) {
225             newEntries.add(notificationUuid);
226         }
227
228         @Override
229         public List<NotificationEntity> getNotifications() {
230             return Collections.unmodifiableList(notifications);
231         }
232
233         @Override
234         public List<UUID> getNewEntries() {
235             return Collections.unmodifiableList(newEntries);
236         }
237
238         @Override
239         public UUID getLastScanned() {
240             return lastScanned;
241         }
242
243         void setLastScanned(UUID lastScanned) {
244             this.lastScanned = lastScanned;
245         }
246
247         @Override
248         public UUID getEndOfPage() {
249             return endOfPage;
250         }
251
252         @Override
253         public long getNumOfNotSeenNotifications() {
254             return numOfNotSeenNotifications;
255         }
256
257         void setNumOfNotSeenNotifications(long numOfNotSeenNotifications) {
258             this.numOfNotSeenNotifications = numOfNotSeenNotifications;
259         }
260     }
261
262 /*
263     private List<NotificationEntity> fetchMoreIfNeeded(String ownerId,
264                                                        List<NotificationEntity> notificationEntities,
265                                                        int numOfRecordsToReturn, UUID lastEventId) {
266
267         if (numOfRecordsToReturn <= notificationEntities.size() || Objects.isNull(lastEventId)) {
268             return notificationEntities;
269         }
270
271         int multiplier = 2;
272         while (numOfRecordsToReturn > notificationEntities.size()) {
273
274             int bring = notificationEntities.size() +
275                 (numOfRecordsToReturn - notificationEntities.size()) * multiplier;
276             notificationEntities = getNotificationsByOwnerId(ownerId, bring);
277
278             if (notificationEntities.size() < bring) {
279                 return notificationEntities;
280             }
281             multiplier++;
282         }
283         return notificationEntities;
284     }
285 */
286
287 }