AT&T 2.0.19 Code drop, stage 4
[aaf/authz.git] / authz-cass / src / main / java / org / onap / aaf / dao / CassDAOImpl.java
1 /*******************************************************************************\r
2  * ============LICENSE_START====================================================\r
3  * * org.onap.aaf\r
4  * * ===========================================================================\r
5  * * Copyright © 2017 AT&T Intellectual Property. All rights reserved.\r
6  * * ===========================================================================\r
7  * * Licensed under the Apache License, Version 2.0 (the "License");\r
8  * * you may not use this file except in compliance with the License.\r
9  * * You may obtain a copy of the License at\r
10  * * \r
11  *  *      http://www.apache.org/licenses/LICENSE-2.0\r
12  * * \r
13  *  * Unless required by applicable law or agreed to in writing, software\r
14  * * distributed under the License is distributed on an "AS IS" BASIS,\r
15  * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\r
16  * * See the License for the specific language governing permissions and\r
17  * * limitations under the License.\r
18  * * ============LICENSE_END====================================================\r
19  * *\r
20  * * ECOMP is a trademark and service mark of AT&T Intellectual Property.\r
21  * *\r
22  ******************************************************************************/\r
23 package org.onap.aaf.dao;\r
24 \r
25 import java.io.ByteArrayInputStream;\r
26 import java.io.DataInputStream;\r
27 import java.lang.reflect.Field;\r
28 import java.nio.ByteBuffer;\r
29 import java.util.List;\r
30 \r
31 import org.onap.aaf.authz.env.AuthzTrans;\r
32 import org.onap.aaf.authz.layer.Result;\r
33 import org.onap.aaf.dao.aaf.cass.Status;\r
34 \r
35 import org.onap.aaf.inno.env.TransStore;\r
36 import com.datastax.driver.core.Cluster;\r
37 import com.datastax.driver.core.ConsistencyLevel;\r
38 import com.datastax.driver.core.ResultSet;\r
39 import com.datastax.driver.core.ResultSetFuture;\r
40 \r
/**
 * CassDAOImpl
 *
 * Deals with the essentials of interaction with the Cassandra DataStore for all Cassandra DAOs.
 *
 * @param <TRANS> the transaction type, a TransStore
 * @param <DATA> the data type handled by this DAO
 */
49 public class CassDAOImpl<TRANS extends TransStore,DATA> extends AbsCassDAO<TRANS, DATA> implements DAO<TRANS,DATA> {\r
50         public static final String USER_NAME = "__USER_NAME__";\r
51         protected static final String CREATE_SP = "CREATE ";\r
52         protected static final String UPDATE_SP = "UPDATE ";\r
53         protected static final String DELETE_SP = "DELETE ";\r
54         protected static final String SELECT_SP = "SELECT ";\r
55 \r
56         protected final String C_TEXT = getClass().getSimpleName() + " CREATE";\r
57         protected final String R_TEXT = getClass().getSimpleName() + " READ";\r
58         protected final String U_TEXT = getClass().getSimpleName() + " UPDATE";\r
59         protected final String D_TEXT = getClass().getSimpleName() + " DELETE";\r
60         private String table;\r
61         \r
62         protected final ConsistencyLevel readConsistency,writeConsistency;\r
63         \r
64         // Setteable only by CachedDAO\r
65         protected Cached<?, ?> cache;\r
66 \r
67         /**\r
68          * A Constructor from the originating Cluster.  This DAO will open the Session at need,\r
69          * and shutdown the session when "close()" is called.\r
70          *\r
71          * @param cluster\r
72          * @param keyspace\r
73          * @param dataClass\r
74          */\r
75         public CassDAOImpl(TRANS trans, String name, Cluster cluster, String keyspace, Class<DATA> dataClass, String table, ConsistencyLevel read, ConsistencyLevel write) {\r
76                 super(trans, name, cluster,keyspace,dataClass);\r
77                 this.table = table;\r
78                 readConsistency = read;\r
79                 writeConsistency = write;\r
80         }\r
81         \r
82         /**\r
83          * A Constructor to share Session with other DAOs.\r
84          *\r
85          * This method get the Session and Cluster information from the calling DAO, and won't\r
86          * touch the Session on closure.\r
87          *\r
88          * @param aDao\r
89          * @param dataClass\r
90          */\r
91         public CassDAOImpl(TRANS trans, String name, AbsCassDAO<TRANS,?> aDao, Class<DATA> dataClass, String table, ConsistencyLevel read, ConsistencyLevel write) {\r
92                 super(trans, name, aDao,dataClass);\r
93                 this.table = table;\r
94                 readConsistency = read;\r
95                 writeConsistency = write;\r
96         }\r
97 \r
98         protected PSInfo createPS;\r
99         protected PSInfo readPS;\r
100         protected PSInfo updatePS;\r
101         protected PSInfo deletePS;\r
102         private boolean async=false;\r
103 \r
104         public void async(boolean bool) {\r
105                 async = bool;\r
106         }\r
107 \r
108         public final String[] setCRUD(TRANS trans, String table, Class<?> dc,Loader<DATA> loader) {\r
109                 return setCRUD(trans, table, dc, loader, -1);\r
110         }\r
111         \r
112         public final String[] setCRUD(TRANS trans, String table, Class<?> dc,Loader<DATA> loader, int max) {\r
113                                 Field[] fields = dc.getDeclaredFields();\r
114                                 int end = max>=0 & max<fields.length?max:fields.length;\r
115                                 // get keylimit from a non-null Loader\r
116                                 int keylimit = loader.keylimit();\r
117                         \r
118                                 StringBuilder sbfc = new StringBuilder();\r
119                                 StringBuilder sbq = new StringBuilder();\r
120                                 StringBuilder sbwc = new StringBuilder();\r
121                                 StringBuilder sbup = new StringBuilder();\r
122                         \r
123                                 if(keylimit>0) {\r
124                                         for(int i=0;i<end;++i) {\r
125                                                 if(i>0) {\r
126                                                         sbfc.append(',');\r
127                                                         sbq.append(',');\r
128                                                         if(i<keylimit) {\r
129                                                                 sbwc.append(" AND ");\r
130                                                         }\r
131                                                 }\r
132                                                 sbfc.append(fields[i].getName());\r
133                                                 sbq.append('?');\r
134                                                 if(i>=keylimit) {\r
135                                                         if(i>keylimit) {\r
136                                                                 sbup.append(',');\r
137                                                         }\r
138                                                         sbup.append(fields[i].getName());\r
139                                                         sbup.append("=?");\r
140                                                 }\r
141                                                 if(i<keylimit) {\r
142                                                         sbwc.append(fields[i].getName());\r
143                                                         sbwc.append("=?");\r
144                                                 }\r
145                                         }\r
146                         \r
147                                         createPS = new PSInfo(trans, "INSERT INTO " + table + " ("+ sbfc +") VALUES ("+ sbq +");",loader,writeConsistency);\r
148                         \r
149                                         readPS = new PSInfo(trans, "SELECT " + sbfc + " FROM " + table + " WHERE " + sbwc + ';',loader,readConsistency);\r
150                         \r
151                                         // Note: UPDATES can't compile if there are no fields besides keys... Use "Insert"\r
152                                         if(sbup.length()==0) {\r
153                                                 updatePS = createPS; // the same as an insert\r
154                                         } else {\r
155                                                 updatePS = new PSInfo(trans, "UPDATE " + table + " SET " + sbup + " WHERE " + sbwc + ';',loader,writeConsistency);\r
156                                         }\r
157                         \r
158                                         deletePS = new PSInfo(trans, "DELETE FROM " + table + " WHERE " + sbwc + ';',loader,writeConsistency);\r
159                                 }\r
160                                 return new String[] {sbfc.toString(), sbq.toString(), sbup.toString(), sbwc.toString()};\r
161                         }\r
162 \r
163         public void replace(CRUD crud, PSInfo psInfo) {\r
164                 switch(crud) {\r
165                         case create: createPS = psInfo; break;\r
166                         case read:   readPS = psInfo; break;\r
167                         case update: updatePS = psInfo; break;\r
168                         case delete: deletePS = psInfo; break;\r
169                 }\r
170         }\r
171 \r
172         public void disable(CRUD crud) {\r
173                 switch(crud) {\r
174                         case create: createPS = null; break;\r
175                         case read:   readPS = null; break;\r
176                         case update: updatePS = null; break;\r
177                         case delete: deletePS = null; break;\r
178                 }\r
179         }\r
180 \r
181         \r
182         /**\r
183          * Given a DATA object, extract the individual elements from the Data into an Object Array for the\r
184          * execute element.\r
185          */\r
186         public Result<DATA> create(TRANS trans, DATA data)  {\r
187                 if(createPS==null) {\r
188                         Result.err(Result.ERR_NotImplemented,"Create is disabled for %s",getClass().getSimpleName());\r
189                 }\r
190                 if(async) /*ResultSetFuture */ {\r
191                         Result<ResultSetFuture> rs = createPS.execAsync(trans, C_TEXT, data);\r
192                         if(rs.notOK()) {\r
193                                 return Result.err(rs);\r
194                         }\r
195                 } else {\r
196                         Result<ResultSet> rs = createPS.exec(trans, C_TEXT, data);\r
197                         if(rs.notOK()) {\r
198                                 return Result.err(rs);\r
199                         }\r
200                 }\r
201                 wasModified(trans, CRUD.create, data);\r
202                 return Result.ok(data);\r
203         }\r
204 \r
205         /**\r
206          * Read the Unique Row associated with Full Keys\r
207          */\r
208         public Result<List<DATA>> read(TRANS trans, DATA data) {\r
209                 if(readPS==null) {\r
210                         Result.err(Result.ERR_NotImplemented,"Read is disabled for %s",getClass().getSimpleName());\r
211                 }\r
212                 return readPS.read(trans, R_TEXT, data);\r
213         }\r
214 \r
215         public Result<List<DATA>> read(TRANS trans, Object ... key) {\r
216                 if(readPS==null) {\r
217                         Result.err(Result.ERR_NotImplemented,"Read is disabled for %s",getClass().getSimpleName());\r
218                 }\r
219                 return readPS.read(trans, R_TEXT, key);\r
220         }\r
221 \r
222         public Result<Void> update(TRANS trans, DATA data) {\r
223                 if(updatePS==null) {\r
224                         Result.err(Result.ERR_NotImplemented,"Update is disabled for %s",getClass().getSimpleName());\r
225                 }\r
226                 if(async)/* ResultSet rs =*/ {\r
227                         Result<ResultSetFuture> rs = updatePS.execAsync(trans, U_TEXT, data);\r
228                         if(rs.notOK()) {\r
229                                 return Result.err(rs);\r
230                         }\r
231                 } else {\r
232                         Result<ResultSet> rs = updatePS.exec(trans, U_TEXT, data);\r
233                         if(rs.notOK()) {\r
234                                 return Result.err(rs);\r
235                         }\r
236                 }\r
237                 \r
238                 wasModified(trans, CRUD.update, data);\r
239                 return Result.ok();\r
240         }\r
241 \r
242         // This method Sig for Cached...\r
243         public Result<Void> delete(TRANS trans, DATA data, boolean reread) {\r
244                 if(deletePS==null) {\r
245                         Result.err(Result.ERR_NotImplemented,"Delete is disabled for %s",getClass().getSimpleName());\r
246                 }\r
247                 // Since Deleting will be stored off, for possible re-constitution, need the whole thing\r
248                 if(reread) {\r
249                         Result<List<DATA>> rd = read(trans,data);\r
250                         if(rd.notOK()) {\r
251                                 return Result.err(rd);\r
252                         }\r
253                         if(rd.isEmpty()) {\r
254                                 return Result.err(Status.ERR_NotFound,"Not Found");\r
255                         }\r
256                         for(DATA d : rd.value) { \r
257                                 if(async) {\r
258                                         Result<ResultSetFuture> rs = deletePS.execAsync(trans, D_TEXT, d);\r
259                                         if(rs.notOK()) {\r
260                                                 return Result.err(rs);\r
261                                         }\r
262                                 } else {\r
263                                         Result<ResultSet> rs = deletePS.exec(trans, D_TEXT, d);\r
264                                         if(rs.notOK()) {\r
265                                                 return Result.err(rs);\r
266                                         }\r
267                                 }\r
268                                 wasModified(trans, CRUD.delete, d);\r
269                         }\r
270                 } else {\r
271                         if(async)/* ResultSet rs =*/ {\r
272                                 Result<ResultSetFuture> rs = deletePS.execAsync(trans, D_TEXT, data);\r
273                                 if(rs.notOK()) {\r
274                                         return Result.err(rs);\r
275                                 }\r
276                         } else {\r
277                                 Result<ResultSet> rs = deletePS.exec(trans, D_TEXT, data);\r
278                                 if(rs.notOK()) {\r
279                                         return Result.err(rs);\r
280                                 }\r
281                         }\r
282                         wasModified(trans, CRUD.delete, data);\r
283                 }\r
284                 return Result.ok();\r
285         }\r
286         \r
287         public final Object[] keyFrom(DATA data) {\r
288                 return createPS.keyFrom(data);\r
289         }\r
290 \r
291         @Override\r
292         public String table() {\r
293                 return table;\r
294         }\r
295         \r
296         public static final String CASS_READ_CONSISTENCY="cassandra.readConsistency";\r
297         public static final String CASS_WRITE_CONSISTENCY="cassandra.writeConsistency";\r
298         protected static ConsistencyLevel readConsistency(AuthzTrans trans, String table) {\r
299                 String prop = trans.getProperty(CASS_READ_CONSISTENCY+'.'+table);\r
300                 if(prop==null) {\r
301                         prop = trans.getProperty(CASS_READ_CONSISTENCY);\r
302                         if(prop==null) {\r
303                                 return ConsistencyLevel.ONE; // this is Cassandra Default\r
304                         }\r
305                 }\r
306                 return ConsistencyLevel.valueOf(prop);\r
307         }\r
308 \r
309         protected static ConsistencyLevel writeConsistency(AuthzTrans trans, String table) {\r
310                 String prop = trans.getProperty(CASS_WRITE_CONSISTENCY+'.'+table);\r
311                 if(prop==null) {\r
312                         prop = trans.getProperty(CASS_WRITE_CONSISTENCY);\r
313                         if(prop==null) {\r
314                                 return ConsistencyLevel.ONE; // this is Cassandra Default\\r
315                         }\r
316                 }\r
317                 return ConsistencyLevel.valueOf(prop);\r
318         }\r
319 \r
320         public static DataInputStream toDIS(ByteBuffer bb) {\r
321                 byte[] b = bb.array();\r
322                 return new DataInputStream(\r
323                         new ByteArrayInputStream(b,bb.position(),bb.limit())\r
324                 );\r
325         }\r
326 \r
327 \r
328 }\r