Add new modules: Resource Lock and Doorman
[ccsdk/features.git] / lib / doorman / src / main / java / org / onap / ccsdk / features / lib / doorman / dao / MessageDaoImpl.java
1 package org.onap.ccsdk.features.lib.doorman.dao;
2
3 import java.sql.Connection;
4 import java.sql.PreparedStatement;
5 import java.sql.ResultSet;
6 import java.sql.SQLException;
7 import java.sql.Statement;
8 import java.sql.Timestamp;
9 import java.sql.Types;
10 import java.util.ArrayList;
11 import java.util.Date;
12 import java.util.List;
13 import java.util.Map;
14 import javax.sql.DataSource;
15 import org.onap.ccsdk.features.lib.doorman.data.ActionStatus;
16 import org.onap.ccsdk.features.lib.doorman.data.Message;
17 import org.onap.ccsdk.features.lib.doorman.data.MessageAction;
18 import org.onap.ccsdk.features.lib.doorman.data.MessageActionValue;
19 import org.onap.ccsdk.features.lib.doorman.data.MessageData;
20 import org.onap.ccsdk.features.lib.doorman.data.MessageStatus;
21 import org.onap.ccsdk.features.lib.doorman.data.MessageStatusValue;
22 import org.onap.ccsdk.features.lib.doorman.data.Queue;
23 import org.onap.ccsdk.features.lib.doorman.util.JsonUtil;
24
25 public class MessageDaoImpl implements MessageDao {
26
27         private DataSource dataSource;
28
29         @Override
30         public long addArrivedMessage(String extMessageId, MessageData request, Queue queue, Date timestamp) {
31                 try (Connection con = dataSource.getConnection()) {
32                         try {
33                                 con.setAutoCommit(false);
34                                 long id = 0;
35                                 String sql = "INSERT INTO message (ext_message_id, request_param, request_body, arrived_timestamp, queue_type, queue_id) VALUES (?, ?, ?, ?, ?, ?)";
36                                 try (PreparedStatement ps = con.prepareStatement(sql, Statement.RETURN_GENERATED_KEYS)) {
37                                         ps.setString(1, extMessageId);
38                                         ps.setString(2, JsonUtil.dataToJson(request.param));
39                                         ps.setString(3, request.body);
40                                         ps.setTimestamp(4, new Timestamp(timestamp.getTime()));
41                                         if (queue != null) {
42                                                 ps.setString(5, queue.type);
43                                                 ps.setString(6, queue.id);
44                                         } else {
45                                                 ps.setNull(5, Types.VARCHAR);
46                                                 ps.setNull(6, Types.VARCHAR);
47                                         }
48                                         ps.executeUpdate();
49                                         try (ResultSet rs = ps.getGeneratedKeys()) {
50                                                 rs.next();
51                                                 id = rs.getLong(1);
52                                         }
53                                 }
54                                 con.commit();
55                                 return id;
56                         } catch (SQLException ex) {
57                                 con.rollback();
58                                 throw ex;
59                         }
60                 } catch (SQLException e) {
61                         throw new RuntimeException("Error inserting message to DB: " + e.getMessage(), e);
62                 }
63         }
64
65         @Override
66         public void updateMessageStarted(long messageId, Date timestamp) {
67                 updateMessageStatus("started_timestamp", messageId, null, timestamp);
68         }
69
70         @Override
71         public void updateMessageCompleted(long messageId, String resolution, Date timestamp) {
72                 updateMessageStatus("completed_timestamp", messageId, resolution, timestamp);
73         }
74
75         private void updateMessageStatus(String timestampColumn, long messageId, String resolution, Date timestamp) {
76                 try (Connection con = dataSource.getConnection()) {
77                         try {
78                                 con.setAutoCommit(false);
79                                 String sql = "UPDATE message SET " + timestampColumn + " = ? WHERE message_id = ?";
80                                 try (PreparedStatement ps = con.prepareStatement(sql)) {
81                                         ps.setTimestamp(1, new Timestamp(timestamp.getTime()));
82                                         ps.setLong(2, messageId);
83                                         ps.executeUpdate();
84                                 }
85                                 con.commit();
86                         } catch (SQLException ex) {
87                                 con.rollback();
88                                 throw ex;
89                         }
90                 } catch (SQLException e) {
91                         throw new RuntimeException("Error updating message status in DB: " + e.getMessage(), e);
92                 }
93         }
94
95         @Override
96         public void updateMessageResponse(long messageId, Date timestamp, MessageData response) {
97                 try (Connection con = dataSource.getConnection()) {
98                         try {
99                                 con.setAutoCommit(false);
100                                 String sql = "UPDATE message SET response_timestamp = ?, response_param = ?, response_body = ? WHERE message_id = ?";
101                                 try (PreparedStatement ps = con.prepareStatement(sql)) {
102                                         ps.setTimestamp(1, new Timestamp(timestamp.getTime()));
103                                         ps.setString(2, JsonUtil.dataToJson(response.param));
104                                         ps.setString(3, response.body);
105                                         ps.setLong(4, messageId);
106                                         ps.executeUpdate();
107                                 }
108                                 con.commit();
109                         } catch (SQLException ex) {
110                                 con.rollback();
111                                 throw ex;
112                         }
113                 } catch (SQLException e) {
114                         throw new RuntimeException("Error updating message response in DB: " + e.getMessage(), e);
115                 }
116         }
117
118         @Override
119         public void addStatus(long messageId, MessageStatus status) {
120                 try (Connection con = dataSource.getConnection()) {
121                         try {
122                                 con.setAutoCommit(false);
123                                 String sql = "INSERT INTO message_status (message_id, status, status_timestamp) VALUES (?, ?, ?)";
124                                 try (PreparedStatement ps = con.prepareStatement(sql)) {
125                                         ps.setLong(1, messageId);
126                                         ps.setString(2, status.status.toString());
127                                         ps.setTimestamp(3, new Timestamp(status.timestamp.getTime()));
128                                         ps.executeUpdate();
129                                 }
130                                 con.commit();
131                         } catch (SQLException ex) {
132                                 con.rollback();
133                                 throw ex;
134                         }
135                 } catch (SQLException e) {
136                         throw new RuntimeException("Error inserting message status to DB: " + e.getMessage(), e);
137                 }
138         }
139
140         @Override
141         public void addAction(long messageId, MessageAction action) {
142                 try (Connection con = dataSource.getConnection()) {
143                         try {
144                                 con.setAutoCommit(false);
145                                 String sql = "INSERT INTO message_action (message_id, action, action_status, resolution, action_timestamp, done_timestamp, hold_time, response_param, response_body) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)";
146                                 try (PreparedStatement ps = con.prepareStatement(sql)) {
147                                         ps.setLong(1, messageId);
148                                         ps.setString(2, action.action.toString());
149                                         ps.setString(3, action.actionStatus.toString());
150                                         ps.setString(4, action.resolution);
151                                         ps.setTimestamp(5, new Timestamp(action.timestamp.getTime()));
152                                         if (action.doneTimestamp != null) {
153                                                 ps.setTimestamp(6, new Timestamp(action.doneTimestamp.getTime()));
154                                         } else {
155                                                 ps.setNull(6, Types.TIMESTAMP);
156                                         }
157                                         ps.setInt(7, action.holdTime);
158                                         if (action.returnResponse != null) {
159                                                 ps.setString(8, JsonUtil.dataToJson(action.returnResponse.param));
160                                                 ps.setString(9, action.returnResponse.body);
161                                         } else {
162                                                 ps.setNull(8, Types.VARCHAR);
163                                                 ps.setNull(9, Types.VARCHAR);
164                                         }
165                                         ps.executeUpdate();
166                                 }
167                                 con.commit();
168                         } catch (SQLException ex) {
169                                 con.rollback();
170                                 throw ex;
171                         }
172                 } catch (SQLException e) {
173                         throw new RuntimeException("Error inserting message action to DB: " + e.getMessage(), e);
174                 }
175         }
176
177         @Override
178         public void updateActionDone(long actionId, Date now) {
179                 try (Connection con = dataSource.getConnection()) {
180                         try {
181                                 con.setAutoCommit(false);
182                                 String sql = "UPDATE message_action SET action_status = ?, done_timestamp = ? WHERE message_action_id = ?";
183                                 try (PreparedStatement ps = con.prepareStatement(sql)) {
184                                         ps.setString(1, ActionStatus.DONE.toString());
185                                         ps.setTimestamp(2, new Timestamp(now.getTime()));
186                                         ps.setLong(3, actionId);
187                                         ps.executeUpdate();
188                                 }
189                                 con.commit();
190                         } catch (SQLException ex) {
191                                 con.rollback();
192                                 throw ex;
193                         }
194                 } catch (SQLException e) {
195                         throw new RuntimeException("Error updating action in DB: " + e.getMessage(), e);
196                 }
197         }
198
199         @SuppressWarnings("unchecked")
200         @Override
201         public List<Message> readMessageQueue(Queue queue) {
202                 List<Message> messageList = new ArrayList<>();
203                 try (Connection con = dataSource.getConnection()) {
204                         String msql = "SELECT * FROM message WHERE queue_type = ? AND queue_id = ?";
205                         String ssql = "SELECT * FROM message_status WHERE message_id = ? ORDER BY message_status_id DESC";
206                         String asql = "SELECT * FROM message_action WHERE message_id = ? ORDER BY message_action_id DESC";
207                         try (PreparedStatement mps = con.prepareStatement(msql); PreparedStatement sps = con.prepareStatement(ssql); PreparedStatement aps = con.prepareStatement(asql)) {
208                                 mps.setString(1, queue.type);
209                                 mps.setString(2, queue.id);
210                                 try (ResultSet mrs = mps.executeQuery()) {
211                                         while (mrs.next()) {
212                                                 Message m = new Message();
213                                                 m.messageId = mrs.getLong("message_id");
214                                                 m.extMessageId = mrs.getString("ext_message_id");
215                                                 m.request = new MessageData();
216                                                 m.request.param = (Map<String, Object>) JsonUtil.jsonToData(mrs.getString("request_param"));
217                                                 m.request.body = mrs.getString("request_body");
218                                                 m.response = new MessageData();
219                                                 m.response.param = (Map<String, Object>) JsonUtil.jsonToData(mrs.getString("response_param"));
220                                                 m.response.body = mrs.getString("response_body");
221                                                 m.queue = new Queue();
222                                                 m.queue.type = mrs.getString("queue_type");
223                                                 m.queue.id = mrs.getString("queue_id");
224                                                 m.arrivedTimestamp = mrs.getTimestamp("arrived_timestamp");
225                                                 m.startedTimestamp = mrs.getTimestamp("started_timestamp");
226                                                 m.completedTimestamp = mrs.getTimestamp("completed_timestamp");
227                                                 m.responseTimestamp = mrs.getTimestamp("response_timestamp");
228                                                 m.statusHistory = new ArrayList<>();
229                                                 m.actionHistory = new ArrayList<>();
230                                                 messageList.add(m);
231
232                                                 sps.setLong(1, m.messageId);
233                                                 try (ResultSet srs = sps.executeQuery()) {
234                                                         while (srs.next()) {
235                                                                 MessageStatus s = new MessageStatus();
236                                                                 s.status = MessageStatusValue.valueOf(srs.getString("status"));
237                                                                 s.timestamp = srs.getTimestamp("status_timestamp");
238                                                                 m.statusHistory.add(s);
239                                                         }
240                                                 }
241
242                                                 aps.setLong(1, m.messageId);
243                                                 try (ResultSet ars = aps.executeQuery()) {
244                                                         while (ars.next()) {
245                                                                 MessageAction a = new MessageAction();
246                                                                 a.actionId = ars.getLong("message_action_id");
247                                                                 a.action = MessageActionValue.valueOf(ars.getString("action"));
248                                                                 a.actionStatus = ActionStatus.valueOf(ars.getString("action_status"));
249                                                                 a.timestamp = ars.getTimestamp("action_timestamp");
250                                                                 a.doneTimestamp = ars.getTimestamp("done_timestamp");
251                                                                 a.holdTime = ars.getInt("hold_time");
252                                                                 a.returnResponse = new MessageData();
253                                                                 a.returnResponse.param = (Map<String, Object>) JsonUtil.jsonToData(ars.getString("response_param"));
254                                                                 a.returnResponse.body = ars.getString("response_body");
255                                                                 if (a.returnResponse.param == null && a.returnResponse.body == null) {
256                                                                         a.returnResponse = null;
257                                                                 }
258                                                                 a.resolution = ars.getString("resolution");
259                                                                 m.actionHistory.add(a);
260                                                         }
261                                                 }
262                                         }
263                                 }
264                         }
265                 } catch (SQLException e) {
266                         throw new RuntimeException("Error reading message action from DB: " + e.getMessage(), e);
267                 }
268                 return messageList;
269         }
270
271         @SuppressWarnings("unchecked")
272         @Override
273         public MessageAction getNextAction(long messageId) {
274                 try (Connection con = dataSource.getConnection()) {
275                         String sql = "SELECT * FROM message_action WHERE message_id = ? ORDER BY action_timestamp DESC";
276                         try (PreparedStatement ps = con.prepareStatement(sql)) {
277                                 ps.setLong(1, messageId);
278                                 try (ResultSet rs = ps.executeQuery()) {
279                                         if (rs.next()) {
280                                                 MessageAction a = new MessageAction();
281                                                 a.actionId = rs.getLong("message_action_id");
282                                                 a.action = MessageActionValue.valueOf(rs.getString("action"));
283                                                 a.actionStatus = ActionStatus.valueOf(rs.getString("action_status"));
284                                                 a.timestamp = rs.getTimestamp("action_timestamp");
285                                                 a.doneTimestamp = rs.getTimestamp("done_timestamp");
286                                                 a.holdTime = rs.getInt("hold_time");
287                                                 a.returnResponse = new MessageData();
288                                                 a.returnResponse.param = (Map<String, Object>) JsonUtil.jsonToData(rs.getString("response_param"));
289                                                 a.returnResponse.body = rs.getString("response_body");
290                                                 if (a.returnResponse.param == null && a.returnResponse.body == null) {
291                                                         a.returnResponse = null;
292                                                 }
293                                                 a.resolution = rs.getString("resolution");
294                                                 return a;
295                                         }
296                                         return null;
297                                 }
298                         }
299                 } catch (SQLException e) {
300                         throw new RuntimeException("Error reading message action from DB: " + e.getMessage(), e);
301                 }
302         }
303
304         public void setDataSource(DataSource dataSource) {
305                 this.dataSource = dataSource;
306         }
307 }