re base code
[sdc.git] / openecomp-be / lib / openecomp-sdc-notification-lib / openecomp-sdc-notification-core / src / main / java / org / openecomp / sdc / notification / dao / impl / NotificationsDaoCassandraImpl.java
1 /*-
2  * ============LICENSE_START=======================================================
3  * SDC
4  * ================================================================================
5  * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=========================================================
19  */
20
21 package org.openecomp.sdc.notification.dao.impl;
22
23 import com.datastax.driver.core.BatchStatement;
24 import com.datastax.driver.core.ResultSet;
25 import com.datastax.driver.core.Statement;
26 import com.datastax.driver.core.utils.UUIDs;
27 import com.datastax.driver.mapping.Mapper;
28 import com.datastax.driver.mapping.Result;
29 import com.datastax.driver.mapping.annotations.Accessor;
30 import com.datastax.driver.mapping.annotations.Query;
31 import org.apache.commons.collections.CollectionUtils;
32 import org.openecomp.core.dao.impl.CassandraBaseDao;
33 import org.openecomp.core.nosqldb.api.NoSqlDb;
34 import org.openecomp.core.nosqldb.factory.NoSqlDbFactory;
35 import org.openecomp.sdc.notification.dao.NotificationsDao;
36 import org.openecomp.sdc.notification.dao.types.NotificationEntity;
37 import org.openecomp.sdc.notification.dtos.NotificationsStatus;
38
39 import java.util.*;
40 import java.util.stream.Collectors;
41
42 import static org.openecomp.core.nosqldb.impl.cassandra.CassandraSessionFactory.getSession;
43
44 //import org.openecomp.sdc.notification.dao.types.LastSeenNotificationEntity;
45 //import java.util.Optional;
46
47 public class NotificationsDaoCassandraImpl extends CassandraBaseDao<NotificationEntity>
48     implements NotificationsDao {
49
    // Shared NoSQL handle, created once when this class is initialized.
    private static final NoSqlDb noSqlDb = NoSqlDbFactory.getInstance().createInterface();
    // Object mapper for NotificationEntity, obtained from noSqlDb's mapping manager.
    private static final Mapper<NotificationEntity> mapper =
        noSqlDb.getMappingManager().mapper(NotificationEntity.class);
    // Accessor backing the hand-written CQL queries declared in NotificationsAccessor.
    private static final NotificationsAccessor accessor =
        noSqlDb.getMappingManager().createAccessor(NotificationsAccessor.class);
55
56     @Override
57     protected Mapper<NotificationEntity> getMapper() {
58         return mapper;
59     }
60
61     @Override
62     protected Object[] getKeys(NotificationEntity entity) {
63         return new Object[]{entity.getOwnerId(), entity.getEventId()};
64     }
65
66     @Override
67     public List<NotificationEntity> list(NotificationEntity entity) {
68         return accessor.list(entity.getOwnerId()).all();
69     }
70
71     @Override
72     public List<NotificationEntity> getNotificationsByOwnerId(String ownerId, int limit) {
73         return accessor.getNotifications(ownerId, limit).all();
74     }
75
76     @Override
77     public List<NotificationEntity> getNewNotificationsByOwnerId(String ownerId, UUID eventId) {
78         return getNewNotificationsByOwnerId(ownerId, eventId,
79             DEFAULT_LIMIT_OF_RESULTS_FOR_OWNER_NOTIFICATIONS);
80     }
81
82     @Override
83     public List<NotificationEntity> getNewNotificationsByOwnerId(String ownerId, UUID eventId, int limit) {
84         if (Objects.isNull(eventId)) {
85             return getNotificationsByOwnerId(ownerId, limit);
86         }
87         return accessor.getNewNotifications(ownerId, eventId, limit).all();
88     }
89
90     @Override
91     public void markNotificationAsRead(String ownerId, Collection<UUID> eventIds) {
92         eventIds.forEach(eventId -> accessor.markAsRead(ownerId, eventId));
93     }
94
95     @Override
96     public NotificationsStatus getNotificationsStatus(String ownerId, UUID lastScannedEventId, int numOfRecordsToReturn) {
97         NotificationsStatusImpl notificationsStatus = new NotificationsStatusImpl();
98         List<NotificationEntity> entities = accessor.getNotifications(ownerId, numOfRecordsToReturn).all();
99         if (CollectionUtils.isNotEmpty(entities)) {
100             long lastSeen = UUIDs.unixTimestamp(lastScannedEventId);
101             populateNewNotifications(notificationsStatus, entities, lastSeen);
102             UUID firstScannedEventId = entities.get(0).getEventId();
103                 notificationsStatus.setLastScanned(firstScannedEventId);
104                 notificationsStatus.setNumOfNotSeenNotifications(accessor.getNewNotificationsCount(ownerId, lastScannedEventId, firstScannedEventId).one().getLong(0));
105         }
106         return notificationsStatus;
107     }
108
109     private void populateNewNotifications(NotificationsStatusImpl notificationsStatus, List<NotificationEntity> entities, long lastSeen) {
110         for (NotificationEntity entity : entities) {
111             UUID eventId = entity.getEventId();
112             notificationsStatus.addNotification(entity);
113             if (UUIDs.unixTimestamp(eventId) > lastSeen) {
114                 notificationsStatus.addNewNotificationUUID(eventId);
115             }
116         }
117     }
118
119     @Override
120     public NotificationsStatus getNotificationsStatus(String ownerId, UUID lastSeenNotification, int numOfRecordsToReturn, UUID prevLastScannedEventId) {
121         NotificationsStatusImpl notificationsStatus = new NotificationsStatusImpl();
122         List<NotificationEntity> entities = accessor.getPrevNotifications(ownerId, prevLastScannedEventId, numOfRecordsToReturn).all();
123         if (CollectionUtils.isNotEmpty(entities)) {
124                 long lastSeen = UUIDs.unixTimestamp(lastSeenNotification);
125             populateNewNotifications(notificationsStatus, entities, lastSeen);
126         }
127         return notificationsStatus;
128     }
129
130 /*
131     @Override
132     public NotificationsStatus getNotificationsStatus(String ownerId,
133                                                       LastSeenNotificationEntity lastSeenNotification,
134                                                       int numOfRecordsToReturn) {
135
136         List<NotificationEntity> notificationEntities =
137             fetchNewNotifications(lastSeenNotification, numOfRecordsToReturn);
138         NotificationsStatusImpl notificationsStatus = new NotificationsStatusImpl();
139         if (CollectionUtils.isEmpty(notificationEntities)) {
140             return notificationsStatus;
141         }
142
143         notificationEntities.forEach(notification -> {
144             if (isNewNotification(lastSeenNotification, notification)) {
145                 notificationsStatus.addNewNotificationUUID(notification.getEventId());
146             }
147             notificationsStatus.addNotification(notification);
148         });
149
150         Optional<NotificationEntity> latestNotification = notificationEntities.stream().findFirst();
151         latestNotification.ifPresent(e -> notificationsStatus.setLastScanned(e.getEventId()));
152         return notificationsStatus;
153     }
154
155     private List<NotificationEntity> fetchNewNotifications(
156         LastSeenNotificationEntity lastSeenNotification, int numOfRecordsToReturn) {
157         String ownerId = lastSeenNotification.getOwnerId();
158         UUID lastEventId = lastSeenNotification.getLastEventId();
159         List<NotificationEntity> newNotificationsByOwnerId =
160             getNewNotificationsByOwnerId(ownerId, lastEventId);
161         newNotificationsByOwnerId = fetchMoreIfNeeded(ownerId, newNotificationsByOwnerId,
162             numOfRecordsToReturn, lastEventId);
163         return newNotificationsByOwnerId;
164     }
165
166     private boolean isNewNotification(LastSeenNotificationEntity lastSeenNotification,
167                                       NotificationEntity notification) {
168         return Objects.isNull(lastSeenNotification.getLastEventId()) ||
169             UUIDs.unixTimestamp(notification.getEventId()) >
170                 UUIDs.unixTimestamp(lastSeenNotification.getLastEventId());
171     }
172 */
173
174     @Override
175     public void createBatch(List<NotificationEntity> notificationEntities) {
176         BatchStatement batch = new BatchStatement();
177         List<Statement> statements = notificationEntities.stream()
178             .map(mapper::saveQuery)
179             .collect(Collectors.toList());
180         batch.addAll(statements);
181         getSession().execute(batch);
182     }
183
    /**
     * Hand-written CQL queries against the {@code notifications} table.
     * Method parameters are bound to the {@code ?} markers in declaration order.
     */
    @Accessor
    interface NotificationsAccessor {

        // All notifications of one owner.
        @Query("select * from notifications where owner_id=?")
        Result<NotificationEntity> list(String ownerId);

        // At most `limit` notifications of one owner.
        @Query("select * from notifications where owner_id=? limit ?")
        Result<NotificationEntity> getNotifications(String ownerId, int limit);

        // Notifications with an event id strictly greater than the given one.
        @Query("select * from notifications where owner_id=? and event_id > ? limit ?")
        Result<NotificationEntity> getNewNotifications(String ownerId, UUID lastScannedEventId, int limit);

        // Notifications with an event id strictly lower than the given one.
        @Query("select * from notifications where owner_id=? and event_id < ? limit ?")
        Result<NotificationEntity> getPrevNotifications(String ownerId, UUID prevLastScannedEventId, int limit);

        // Number of notifications in the range (lastScannedEventId, firstScannedEventId].
        @Query("select count(*) from notifications where owner_id=? and event_id > ? and event_id <= ?")
        ResultSet getNewNotificationsCount(String ownerId, UUID lastScannedEventId, UUID firstScannedEventId);

        // Sets the read flag on a single notification.
        @Query("update notifications set read=true where owner_id=? and event_id=?")
        ResultSet markAsRead(String ownerId, UUID eventId);
    }
205
206     private class NotificationsStatusImpl implements NotificationsStatus {
207
208         private List<NotificationEntity> notifications = new ArrayList<>();
209         private List<UUID> newEntries = new ArrayList<>();
210         private UUID lastScanned;
211         private UUID endOfPage;
212         private long numOfNotSeenNotifications = 0;
213
214         void addNotification(NotificationEntity notification) {
215             notifications.add(notification);
216             endOfPage = notification.getEventId();
217         }
218
219         void addNewNotificationUUID(UUID notificationUuid) {
220             newEntries.add(notificationUuid);
221         }
222
223         @Override
224         public List<NotificationEntity> getNotifications() {
225             return Collections.unmodifiableList(notifications);
226         }
227
228         @Override
229         public List<UUID> getNewEntries() {
230             return Collections.unmodifiableList(newEntries);
231         }
232
233         @Override
234         public UUID getLastScanned() {
235             return lastScanned;
236         }
237
238         void setLastScanned(UUID lastScanned) {
239             this.lastScanned = lastScanned;
240         }
241
242         @Override
243         public UUID getEndOfPage() {
244             return endOfPage;
245         }
246
247         @Override
248         public long getNumOfNotSeenNotifications() {
249             return numOfNotSeenNotifications;
250         }
251
252         void setNumOfNotSeenNotifications(long numOfNotSeenNotifications) {
253             this.numOfNotSeenNotifications = numOfNotSeenNotifications;
254         }
255     }
256
257 /*
258     private List<NotificationEntity> fetchMoreIfNeeded(String ownerId,
259                                                        List<NotificationEntity> notificationEntities,
260                                                        int numOfRecordsToReturn, UUID lastEventId) {
261
262         if (numOfRecordsToReturn <= notificationEntities.size() || Objects.isNull(lastEventId)) {
263             return notificationEntities;
264         }
265
266         int multiplier = 2;
267         while (numOfRecordsToReturn > notificationEntities.size()) {
268
269             int bring = notificationEntities.size() +
270                 (numOfRecordsToReturn - notificationEntities.size()) * multiplier;
271             notificationEntities = getNotificationsByOwnerId(ownerId, bring);
272
273             if (notificationEntities.size() < bring) {
274                 return notificationEntities;
275             }
276             multiplier++;
277         }
278         return notificationEntities;
279     }
280 */
281
282 }