1 package org.onap.ccsdk.features.lib.doorman.dao;
3 import java.sql.Connection;
4 import java.sql.PreparedStatement;
5 import java.sql.ResultSet;
6 import java.sql.SQLException;
7 import java.sql.Statement;
8 import java.sql.Timestamp;
10 import java.util.ArrayList;
11 import java.util.Date;
12 import java.util.List;
14 import javax.sql.DataSource;
15 import org.onap.ccsdk.features.lib.doorman.data.ActionStatus;
16 import org.onap.ccsdk.features.lib.doorman.data.Message;
17 import org.onap.ccsdk.features.lib.doorman.data.MessageAction;
18 import org.onap.ccsdk.features.lib.doorman.data.MessageActionValue;
19 import org.onap.ccsdk.features.lib.doorman.data.MessageData;
20 import org.onap.ccsdk.features.lib.doorman.data.MessageStatus;
21 import org.onap.ccsdk.features.lib.doorman.data.MessageStatusValue;
22 import org.onap.ccsdk.features.lib.doorman.data.Queue;
23 import org.onap.ccsdk.features.lib.doorman.util.JsonUtil;
25 public class MessageDaoImpl implements MessageDao {
27 private DataSource dataSource;
30 public long addArrivedMessage(String extMessageId, MessageData request, Queue queue, Date timestamp) {
31 try (Connection con = dataSource.getConnection()) {
33 con.setAutoCommit(false);
35 String sql = "INSERT INTO message (ext_message_id, request_param, request_body, arrived_timestamp, queue_type, queue_id) VALUES (?, ?, ?, ?, ?, ?)";
36 try (PreparedStatement ps = con.prepareStatement(sql, Statement.RETURN_GENERATED_KEYS)) {
37 ps.setString(1, extMessageId);
38 ps.setString(2, JsonUtil.dataToJson(request.param));
39 ps.setString(3, request.body);
40 ps.setTimestamp(4, new Timestamp(timestamp.getTime()));
42 ps.setString(5, queue.type);
43 ps.setString(6, queue.id);
45 ps.setNull(5, Types.VARCHAR);
46 ps.setNull(6, Types.VARCHAR);
49 try (ResultSet rs = ps.getGeneratedKeys()) {
56 } catch (SQLException ex) {
60 } catch (SQLException e) {
61 throw new RuntimeException("Error inserting message to DB: " + e.getMessage(), e);
66 public void updateMessageStarted(long messageId, Date timestamp) {
67 updateMessageStatus("started_timestamp", messageId, null, timestamp);
71 public void updateMessageCompleted(long messageId, String resolution, Date timestamp) {
72 updateMessageStatus("completed_timestamp", messageId, resolution, timestamp);
75 private void updateMessageStatus(String timestampColumn, long messageId, String resolution, Date timestamp) {
76 try (Connection con = dataSource.getConnection()) {
78 con.setAutoCommit(false);
79 String sql = "UPDATE message SET " + timestampColumn + " = ? WHERE message_id = ?";
80 try (PreparedStatement ps = con.prepareStatement(sql)) {
81 ps.setTimestamp(1, new Timestamp(timestamp.getTime()));
82 ps.setLong(2, messageId);
86 } catch (SQLException ex) {
90 } catch (SQLException e) {
91 throw new RuntimeException("Error updating message status in DB: " + e.getMessage(), e);
96 public void updateMessageResponse(long messageId, Date timestamp, MessageData response) {
97 try (Connection con = dataSource.getConnection()) {
99 con.setAutoCommit(false);
100 String sql = "UPDATE message SET response_timestamp = ?, response_param = ?, response_body = ? WHERE message_id = ?";
101 try (PreparedStatement ps = con.prepareStatement(sql)) {
102 ps.setTimestamp(1, new Timestamp(timestamp.getTime()));
103 ps.setString(2, JsonUtil.dataToJson(response.param));
104 ps.setString(3, response.body);
105 ps.setLong(4, messageId);
109 } catch (SQLException ex) {
113 } catch (SQLException e) {
114 throw new RuntimeException("Error updating message response in DB: " + e.getMessage(), e);
119 public void addStatus(long messageId, MessageStatus status) {
120 try (Connection con = dataSource.getConnection()) {
122 con.setAutoCommit(false);
123 String sql = "INSERT INTO message_status (message_id, status, status_timestamp) VALUES (?, ?, ?)";
124 try (PreparedStatement ps = con.prepareStatement(sql)) {
125 ps.setLong(1, messageId);
126 ps.setString(2, status.status.toString());
127 ps.setTimestamp(3, new Timestamp(status.timestamp.getTime()));
131 } catch (SQLException ex) {
135 } catch (SQLException e) {
136 throw new RuntimeException("Error inserting message status to DB: " + e.getMessage(), e);
141 public void addAction(long messageId, MessageAction action) {
142 try (Connection con = dataSource.getConnection()) {
144 con.setAutoCommit(false);
145 String sql = "INSERT INTO message_action (message_id, action, action_status, resolution, action_timestamp, done_timestamp, hold_time, response_param, response_body) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)";
146 try (PreparedStatement ps = con.prepareStatement(sql)) {
147 ps.setLong(1, messageId);
148 ps.setString(2, action.action.toString());
149 ps.setString(3, action.actionStatus.toString());
150 ps.setString(4, action.resolution);
151 ps.setTimestamp(5, new Timestamp(action.timestamp.getTime()));
152 if (action.doneTimestamp != null) {
153 ps.setTimestamp(6, new Timestamp(action.doneTimestamp.getTime()));
155 ps.setNull(6, Types.TIMESTAMP);
157 ps.setInt(7, action.holdTime);
158 if (action.returnResponse != null) {
159 ps.setString(8, JsonUtil.dataToJson(action.returnResponse.param));
160 ps.setString(9, action.returnResponse.body);
162 ps.setNull(8, Types.VARCHAR);
163 ps.setNull(9, Types.VARCHAR);
168 } catch (SQLException ex) {
172 } catch (SQLException e) {
173 throw new RuntimeException("Error inserting message action to DB: " + e.getMessage(), e);
178 public void updateActionDone(long actionId, Date now) {
179 try (Connection con = dataSource.getConnection()) {
181 con.setAutoCommit(false);
182 String sql = "UPDATE message_action SET action_status = ?, done_timestamp = ? WHERE message_action_id = ?";
183 try (PreparedStatement ps = con.prepareStatement(sql)) {
184 ps.setString(1, ActionStatus.DONE.toString());
185 ps.setTimestamp(2, new Timestamp(now.getTime()));
186 ps.setLong(3, actionId);
190 } catch (SQLException ex) {
194 } catch (SQLException e) {
195 throw new RuntimeException("Error updating action in DB: " + e.getMessage(), e);
199 @SuppressWarnings("unchecked")
201 public List<Message> readMessageQueue(Queue queue) {
202 List<Message> messageList = new ArrayList<>();
203 try (Connection con = dataSource.getConnection()) {
204 String msql = "SELECT * FROM message WHERE queue_type = ? AND queue_id = ?";
205 String ssql = "SELECT * FROM message_status WHERE message_id = ? ORDER BY message_status_id DESC";
206 String asql = "SELECT * FROM message_action WHERE message_id = ? ORDER BY message_action_id DESC";
207 try (PreparedStatement mps = con.prepareStatement(msql); PreparedStatement sps = con.prepareStatement(ssql); PreparedStatement aps = con.prepareStatement(asql)) {
208 mps.setString(1, queue.type);
209 mps.setString(2, queue.id);
210 try (ResultSet mrs = mps.executeQuery()) {
212 Message m = new Message();
213 m.messageId = mrs.getLong("message_id");
214 m.extMessageId = mrs.getString("ext_message_id");
215 m.request = new MessageData();
216 m.request.param = (Map<String, Object>) JsonUtil.jsonToData(mrs.getString("request_param"));
217 m.request.body = mrs.getString("request_body");
218 m.response = new MessageData();
219 m.response.param = (Map<String, Object>) JsonUtil.jsonToData(mrs.getString("response_param"));
220 m.response.body = mrs.getString("response_body");
221 m.queue = new Queue();
222 m.queue.type = mrs.getString("queue_type");
223 m.queue.id = mrs.getString("queue_id");
224 m.arrivedTimestamp = mrs.getTimestamp("arrived_timestamp");
225 m.startedTimestamp = mrs.getTimestamp("started_timestamp");
226 m.completedTimestamp = mrs.getTimestamp("completed_timestamp");
227 m.responseTimestamp = mrs.getTimestamp("response_timestamp");
228 m.statusHistory = new ArrayList<>();
229 m.actionHistory = new ArrayList<>();
232 sps.setLong(1, m.messageId);
233 try (ResultSet srs = sps.executeQuery()) {
235 MessageStatus s = new MessageStatus();
236 s.status = MessageStatusValue.valueOf(srs.getString("status"));
237 s.timestamp = srs.getTimestamp("status_timestamp");
238 m.statusHistory.add(s);
242 aps.setLong(1, m.messageId);
243 try (ResultSet ars = aps.executeQuery()) {
245 MessageAction a = new MessageAction();
246 a.actionId = ars.getLong("message_action_id");
247 a.action = MessageActionValue.valueOf(ars.getString("action"));
248 a.actionStatus = ActionStatus.valueOf(ars.getString("action_status"));
249 a.timestamp = ars.getTimestamp("action_timestamp");
250 a.doneTimestamp = ars.getTimestamp("done_timestamp");
251 a.holdTime = ars.getInt("hold_time");
252 a.returnResponse = new MessageData();
253 a.returnResponse.param = (Map<String, Object>) JsonUtil.jsonToData(ars.getString("response_param"));
254 a.returnResponse.body = ars.getString("response_body");
255 if (a.returnResponse.param == null && a.returnResponse.body == null) {
256 a.returnResponse = null;
258 a.resolution = ars.getString("resolution");
259 m.actionHistory.add(a);
265 } catch (SQLException e) {
266 throw new RuntimeException("Error reading message action from DB: " + e.getMessage(), e);
271 @SuppressWarnings("unchecked")
273 public MessageAction getNextAction(long messageId) {
274 try (Connection con = dataSource.getConnection()) {
275 String sql = "SELECT * FROM message_action WHERE message_id = ? ORDER BY action_timestamp DESC";
276 try (PreparedStatement ps = con.prepareStatement(sql)) {
277 ps.setLong(1, messageId);
278 try (ResultSet rs = ps.executeQuery()) {
280 MessageAction a = new MessageAction();
281 a.actionId = rs.getLong("message_action_id");
282 a.action = MessageActionValue.valueOf(rs.getString("action"));
283 a.actionStatus = ActionStatus.valueOf(rs.getString("action_status"));
284 a.timestamp = rs.getTimestamp("action_timestamp");
285 a.doneTimestamp = rs.getTimestamp("done_timestamp");
286 a.holdTime = rs.getInt("hold_time");
287 a.returnResponse = new MessageData();
288 a.returnResponse.param = (Map<String, Object>) JsonUtil.jsonToData(rs.getString("response_param"));
289 a.returnResponse.body = rs.getString("response_body");
290 if (a.returnResponse.param == null && a.returnResponse.body == null) {
291 a.returnResponse = null;
293 a.resolution = rs.getString("resolution");
299 } catch (SQLException e) {
300 throw new RuntimeException("Error reading message action from DB: " + e.getMessage(), e);
304 public void setDataSource(DataSource dataSource) {
305 this.dataSource = dataSource;