[AAF-21] Updated Copyright Headers for AAF
[aaf/authz.git] / authz-cass / src / main / java / com / att / dao / CassDAOImpl.java
/*******************************************************************************
 * ============LICENSE_START====================================================
 * * org.onap.aaf
 * * ===========================================================================
 * * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
 * * ===========================================================================
 * * Licensed under the Apache License, Version 2.0 (the "License");
 * * you may not use this file except in compliance with the License.
 * * You may obtain a copy of the License at
 * *
 * *      http://www.apache.org/licenses/LICENSE-2.0
 * *
 * * Unless required by applicable law or agreed to in writing, software
 * * distributed under the License is distributed on an "AS IS" BASIS,
 * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * * See the License for the specific language governing permissions and
 * * limitations under the License.
 * * ============LICENSE_END====================================================
 * *
 * * ECOMP is a trademark and service mark of AT&T Intellectual Property.
 * *
 ******************************************************************************/
package com.att.dao;

import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.lang.reflect.Field;
import java.nio.ByteBuffer;
import java.util.List;

import com.att.authz.env.AuthzTrans;
import com.att.authz.layer.Result;
import com.att.dao.aaf.cass.Status;
import com.att.inno.env.TransStore;
import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.ConsistencyLevel;
import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.ResultSetFuture;

/**
 * CassDAOImpl
 *
 * Handles the essentials of interaction with the Cassandra DataStore for all Cassandra DAOs:
 * generating the CRUD PreparedStatements, executing them, and resolving Consistency Levels.
 *
 * @param <TRANS>
 * @param <DATA>
 */
public class CassDAOImpl<TRANS extends TransStore,DATA> extends AbsCassDAO<TRANS, DATA> implements DAO<TRANS,DATA> {
    public static final String USER_NAME = "__USER_NAME__";
    protected static final String CREATE_SP = "CREATE ";
    protected static final String UPDATE_SP = "UPDATE ";
    protected static final String DELETE_SP = "DELETE ";
    protected static final String SELECT_SP = "SELECT ";

    protected final String C_TEXT = getClass().getSimpleName() + " CREATE";
    protected final String R_TEXT = getClass().getSimpleName() + " READ";
    protected final String U_TEXT = getClass().getSimpleName() + " UPDATE";
    protected final String D_TEXT = getClass().getSimpleName() + " DELETE";
    private String table;

    protected final ConsistencyLevel readConsistency, writeConsistency;

    // Settable only by CachedDAO
    protected Cached<?, ?> cache;

    /**
     * A Constructor from the originating Cluster.  This DAO will open the Session as needed,
     * and shut down the Session when "close()" is called.
     *
     * @param cluster
     * @param keyspace
     * @param dataClass
     */
    public CassDAOImpl(TRANS trans, String name, Cluster cluster, String keyspace, Class<DATA> dataClass, String table, ConsistencyLevel read, ConsistencyLevel write) {
        super(trans, name, cluster, keyspace, dataClass);
        this.table = table;
        readConsistency = read;
        writeConsistency = write;
    }

    /**
     * A Constructor to share a Session with other DAOs.
     *
     * This constructor gets the Session and Cluster information from the calling DAO, and won't
     * touch the Session on closure.
     *
     * @param aDao
     * @param dataClass
     */
    public CassDAOImpl(TRANS trans, String name, AbsCassDAO<TRANS,?> aDao, Class<DATA> dataClass, String table, ConsistencyLevel read, ConsistencyLevel write) {
        super(trans, name, aDao, dataClass);
        this.table = table;
        readConsistency = read;
        writeConsistency = write;
    }

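    // Illustrative only (hypothetical subclass, Data and Loader names): a concrete DAO
    // typically chains one of the constructors above, then registers its CRUD statements:
    //
    //   public MyDAO(AuthzTrans trans, Cluster cluster, String keyspace) {
    //       super(trans, "MyDAO", cluster, keyspace, Data.class, TABLE,
    //             readConsistency(trans, TABLE), writeConsistency(trans, TABLE));
    //       setCRUD(trans, TABLE, Data.class, new DataLoader(KEYLIMIT));
    //   }
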
    protected PSInfo createPS;
    protected PSInfo readPS;
    protected PSInfo updatePS;
    protected PSInfo deletePS;
    private boolean async = false;

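    // When async is set, create/update/delete are submitted via execAsync(); the returned
    // Result then only reflects that the statement was accepted for execution, not its
    // final outcome on the cluster.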
    public void async(boolean bool) {
        async = bool;
    }

    public final String[] setCRUD(TRANS trans, String table, Class<?> dc, Loader<DATA> loader) {
        return setCRUD(trans, table, dc, loader, -1);
    }

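    // Builds the four PreparedStatements (INSERT, SELECT, UPDATE, DELETE) from the declared
    // fields of the Data class: the first "keylimit" fields form the WHERE clause, the rest
    // form the UPDATE SET clause.  Returns {field list, '?' list, SET clause, WHERE clause}.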
    public final String[] setCRUD(TRANS trans, String table, Class<?> dc, Loader<DATA> loader, int max) {
        Field[] fields = dc.getDeclaredFields();
        int end = max >= 0 && max < fields.length ? max : fields.length;
        // get keylimit from a non-null Loader
        int keylimit = loader.keylimit();

        StringBuilder sbfc = new StringBuilder();
        StringBuilder sbq = new StringBuilder();
        StringBuilder sbwc = new StringBuilder();
        StringBuilder sbup = new StringBuilder();

        if(keylimit>0) {
            for(int i=0; i<end; ++i) {
                if(i>0) {
                    sbfc.append(',');
                    sbq.append(',');
                    if(i<keylimit) {
                        sbwc.append(" AND ");
                    }
                }
                sbfc.append(fields[i].getName());
                sbq.append('?');
                if(i>=keylimit) {
                    if(i>keylimit) {
                        sbup.append(',');
                    }
                    sbup.append(fields[i].getName());
                    sbup.append("=?");
                }
                if(i<keylimit) {
                    sbwc.append(fields[i].getName());
                    sbwc.append("=?");
                }
            }

            createPS = new PSInfo(trans, "INSERT INTO " + table + " (" + sbfc + ") VALUES (" + sbq + ");", loader, writeConsistency);

            readPS = new PSInfo(trans, "SELECT " + sbfc + " FROM " + table + " WHERE " + sbwc + ';', loader, readConsistency);

            // Note: UPDATEs can't compile if there are no fields besides keys... Use "Insert"
            if(sbup.length()==0) {
                updatePS = createPS; // the same as an insert
            } else {
                updatePS = new PSInfo(trans, "UPDATE " + table + " SET " + sbup + " WHERE " + sbwc + ';', loader, writeConsistency);
            }

            deletePS = new PSInfo(trans, "DELETE FROM " + table + " WHERE " + sbwc + ';', loader, writeConsistency);
        }
        return new String[] {sbfc.toString(), sbq.toString(), sbup.toString(), sbwc.toString()};
    }
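    // Illustrative only: for a hypothetical Data class with declared fields (name, type, value)
    // and a Loader whose keylimit() is 2, the setCRUD method above generates:
    //   INSERT INTO <table> (name,type,value) VALUES (?,?,?);
    //   SELECT name,type,value FROM <table> WHERE name=? AND type=?;
    //   UPDATE <table> SET value=? WHERE name=? AND type=?;
    //   DELETE FROM <table> WHERE name=? AND type=?;
    // A subclass may swap in a hand-written PSInfo with replace(...) below, or turn an
    // operation off with disable(...), in which case it reports ERR_NotImplemented.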
    public void replace(CRUD crud, PSInfo psInfo) {
        switch(crud) {
            case create: createPS = psInfo; break;
            case read:   readPS = psInfo; break;
            case update: updatePS = psInfo; break;
            case delete: deletePS = psInfo; break;
        }
    }

    public void disable(CRUD crud) {
        switch(crud) {
            case create: createPS = null; break;
            case read:   readPS = null; break;
            case update: updatePS = null; break;
            case delete: deletePS = null; break;
        }
    }

    /**
     * Given a DATA object, extract the individual elements from the Data into an Object array
     * for execution of the generated INSERT.
     */
    public Result<DATA> create(TRANS trans, DATA data) {
        if(createPS==null) {
            return Result.err(Result.ERR_NotImplemented,"Create is disabled for %s",getClass().getSimpleName());
        }
        if(async) /*ResultSetFuture */ {
            Result<ResultSetFuture> rs = createPS.execAsync(trans, C_TEXT, data);
            if(rs.notOK()) {
                return Result.err(rs);
            }
        } else {
            Result<ResultSet> rs = createPS.exec(trans, C_TEXT, data);
            if(rs.notOK()) {
                return Result.err(rs);
            }
        }
        wasModified(trans, CRUD.create, data);
        return Result.ok(data);
    }

    /**
     * Read the Unique Row associated with Full Keys
     */
    public Result<List<DATA>> read(TRANS trans, DATA data) {
        if(readPS==null) {
            return Result.err(Result.ERR_NotImplemented,"Read is disabled for %s",getClass().getSimpleName());
        }
        return readPS.read(trans, R_TEXT, data);
    }

    public Result<List<DATA>> read(TRANS trans, Object ... key) {
        if(readPS==null) {
            return Result.err(Result.ERR_NotImplemented,"Read is disabled for %s",getClass().getSimpleName());
        }
        return readPS.read(trans, R_TEXT, key);
    }

    public Result<Void> update(TRANS trans, DATA data) {
        if(updatePS==null) {
            return Result.err(Result.ERR_NotImplemented,"Update is disabled for %s",getClass().getSimpleName());
        }
        if(async)/* ResultSet rs =*/ {
            Result<ResultSetFuture> rs = updatePS.execAsync(trans, U_TEXT, data);
            if(rs.notOK()) {
                return Result.err(rs);
            }
        } else {
            Result<ResultSet> rs = updatePS.exec(trans, U_TEXT, data);
            if(rs.notOK()) {
                return Result.err(rs);
            }
        }

        wasModified(trans, CRUD.update, data);
        return Result.ok();
    }

    // This method signature exists for CachedDAO...
    public Result<Void> delete(TRANS trans, DATA data, boolean reread) {
        if(deletePS==null) {
            return Result.err(Result.ERR_NotImplemented,"Delete is disabled for %s",getClass().getSimpleName());
        }
        // Since the deleted data will be stored off for possible re-constitution, we need the whole row
        if(reread) {
            Result<List<DATA>> rd = read(trans,data);
            if(rd.notOK()) {
                return Result.err(rd);
            }
            if(rd.isEmpty()) {
                return Result.err(Status.ERR_NotFound,"Not Found");
            }
            for(DATA d : rd.value) {
                if(async) {
                    Result<ResultSetFuture> rs = deletePS.execAsync(trans, D_TEXT, d);
                    if(rs.notOK()) {
                        return Result.err(rs);
                    }
                } else {
                    Result<ResultSet> rs = deletePS.exec(trans, D_TEXT, d);
                    if(rs.notOK()) {
                        return Result.err(rs);
                    }
                }
                wasModified(trans, CRUD.delete, d);
            }
        } else {
            if(async)/* ResultSet rs =*/ {
                Result<ResultSetFuture> rs = deletePS.execAsync(trans, D_TEXT, data);
                if(rs.notOK()) {
                    return Result.err(rs);
                }
            } else {
                Result<ResultSet> rs = deletePS.exec(trans, D_TEXT, data);
                if(rs.notOK()) {
                    return Result.err(rs);
                }
            }
            wasModified(trans, CRUD.delete, data);
        }
        return Result.ok();
    }

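    // Extracts just the key elements from a DATA object, delegating to the PSInfo built
    // for the INSERT statement (and its Loader).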
    public final Object[] keyFrom(DATA data) {
        return createPS.keyFrom(data);
    }

    @Override
    public String table() {
        return table;
    }

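    // Consistency Levels are resolved from properties: a table-specific key such as
    // "cassandra.readConsistency.<table>" wins over the global "cassandra.readConsistency";
    // if neither is set, Cassandra's default of ONE is used.  Illustrative property values
    // (hypothetical table name):
    //   cassandra.readConsistency=ONE
    //   cassandra.writeConsistency.artifact=LOCAL_QUORUM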
    public static final String CASS_READ_CONSISTENCY="cassandra.readConsistency";
    public static final String CASS_WRITE_CONSISTENCY="cassandra.writeConsistency";

    protected static ConsistencyLevel readConsistency(AuthzTrans trans, String table) {
        String prop = trans.getProperty(CASS_READ_CONSISTENCY+'.'+table);
        if(prop==null) {
            prop = trans.getProperty(CASS_READ_CONSISTENCY);
            if(prop==null) {
                return ConsistencyLevel.ONE; // this is Cassandra Default
            }
        }
        return ConsistencyLevel.valueOf(prop);
    }

    protected static ConsistencyLevel writeConsistency(AuthzTrans trans, String table) {
        String prop = trans.getProperty(CASS_WRITE_CONSISTENCY+'.'+table);
        if(prop==null) {
            prop = trans.getProperty(CASS_WRITE_CONSISTENCY);
            if(prop==null) {
                return ConsistencyLevel.ONE; // this is Cassandra Default
            }
        }
        return ConsistencyLevel.valueOf(prop);
    }

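    // Wraps the backing array of a ByteBuffer as a DataInputStream.  Note this assumes an
    // array-backed (non-direct) buffer; ByteBuffer.array() throws otherwise.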
    public static DataInputStream toDIS(ByteBuffer bb) {
        byte[] b = bb.array();
        return new DataInputStream(
            // length is the number of bytes remaining between position and limit
            new ByteArrayInputStream(b, bb.position(), bb.remaining())
        );
    }

}