AT&T 2.0.19 Code drop, stage 3
[aaf/authz.git] / auth / auth-cass / src / main / java / org / onap / aaf / auth / dao / CassDAOImpl.java
1 /**
2  * ============LICENSE_START====================================================
3  * org.onap.aaf
4  * ===========================================================================
5  * Copyright (c) 2018 AT&T Intellectual Property. All rights reserved.
6  * ===========================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  * 
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  * 
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END====================================================
19  *
20  */
21
22 package org.onap.aaf.auth.dao;
23
24 import java.io.ByteArrayInputStream;
25 import java.io.DataInputStream;
26 import java.lang.reflect.Field;
27 import java.nio.ByteBuffer;
28 import java.util.List;
29
30 import org.onap.aaf.auth.dao.cass.Status;
31 import org.onap.aaf.auth.env.AuthzTrans;
32 import org.onap.aaf.auth.layer.Result;
33 import org.onap.aaf.misc.env.TransStore;
34
35 import com.datastax.driver.core.Cluster;
36 import com.datastax.driver.core.ConsistencyLevel;
37 import com.datastax.driver.core.ResultSet;
38 import com.datastax.driver.core.ResultSetFuture;
39
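/**
 * CassDAOImpl
 *
 * Deal with the essentials of Interaction with Cassandra DataStore for all Cassandra DAOs
 *
 * @author Jonathan
 *
 * @param <TRANS> the transaction type (a TransStore) handed to every operation
 * @param <DATA> the data type created, read, updated and deleted by this DAO
 */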
49 public class CassDAOImpl<TRANS extends TransStore,DATA> extends AbsCassDAO<TRANS, DATA> implements DAO<TRANS,DATA> {
50         public static final String USER_NAME = "__USER_NAME__";
51         protected static final String CREATE_SP = "CREATE ";
52         protected static final String UPDATE_SP = "UPDATE ";
53         protected static final String DELETE_SP = "DELETE ";
54         protected static final String SELECT_SP = "SELECT ";
55
56         protected final String C_TEXT = getClass().getSimpleName() + " CREATE";
57         protected final String R_TEXT = getClass().getSimpleName() + " READ";
58         protected final String U_TEXT = getClass().getSimpleName() + " UPDATE";
59         protected final String D_TEXT = getClass().getSimpleName() + " DELETE";
60         private String table;
61         
62         protected final ConsistencyLevel readConsistency,writeConsistency;
63         
64         // Setteable only by CachedDAO
65         protected Cached<?, ?> cache;
66
67         /**
68          * A Constructor from the originating Cluster.  This DAO will open the Session at need,
69          * and shutdown the session when "close()" is called.
70          *
71          * @param cluster
72          * @param keyspace
73          * @param dataClass
74          */
75         public CassDAOImpl(TRANS trans, String name, Cluster cluster, String keyspace, Class<DATA> dataClass, String table, ConsistencyLevel read, ConsistencyLevel write) {
76                 super(trans, name, cluster,keyspace,dataClass);
77                 this.table = table;
78                 readConsistency = read;
79                 writeConsistency = write;
80         }
81         
82         /**
83          * A Constructor to share Session with other DAOs.
84          *
85          * This method get the Session and Cluster information from the calling DAO, and won't
86          * touch the Session on closure.
87          *
88          * @param aDao
89          * @param dataClass
90          */
91         public CassDAOImpl(TRANS trans, String name, AbsCassDAO<TRANS,?> aDao, Class<DATA> dataClass, String table, ConsistencyLevel read, ConsistencyLevel write) {
92                 super(trans, name, aDao,dataClass);
93                 this.table = table;
94                 readConsistency = read;
95                 writeConsistency = write;
96         }
97
98         protected PSInfo createPS;
99         protected PSInfo readPS;
100         protected PSInfo updatePS;
101         protected PSInfo deletePS;
102         protected boolean async=false;
103
104         public void async(boolean bool) {
105                 async = bool;
106         }
107
108         public final String[] setCRUD(TRANS trans, String table, Class<?> dc,Loader<DATA> loader) {
109                 return setCRUD(trans, table, dc, loader, -1);
110         }
111         
112         public final String[] setCRUD(TRANS trans, String table, Class<?> dc,Loader<DATA> loader, int max) {
113                                 Field[] fields = dc.getDeclaredFields();
114                                 int end = max>=0 & max<fields.length?max:fields.length;
115                                 // get keylimit from a non-null Loader
116                                 int keylimit = loader.keylimit();
117                         
118                                 StringBuilder sbfc = new StringBuilder();
119                                 StringBuilder sbq = new StringBuilder();
120                                 StringBuilder sbwc = new StringBuilder();
121                                 StringBuilder sbup = new StringBuilder();
122                         
123                                 if(keylimit>0) {
124                                         for(int i=0;i<end;++i) {
125                                                 if(i>0) {
126                                                         sbfc.append(',');
127                                                         sbq.append(',');
128                                                         if(i<keylimit) {
129                                                                 sbwc.append(" AND ");
130                                                         }
131                                                 }
132                                                 sbfc.append(fields[i].getName());
133                                                 sbq.append('?');
134                                                 if(i>=keylimit) {
135                                                         if(i>keylimit) {
136                                                                 sbup.append(',');
137                                                         }
138                                                         sbup.append(fields[i].getName());
139                                                         sbup.append("=?");
140                                                 }
141                                                 if(i<keylimit) {
142                                                         sbwc.append(fields[i].getName());
143                                                         sbwc.append("=?");
144                                                 }
145                                         }
146                         
147                                         createPS = new PSInfo(trans, "INSERT INTO " + table + " ("+ sbfc +") VALUES ("+ sbq +");",loader,writeConsistency);
148                         
149                                         readPS = new PSInfo(trans, "SELECT " + sbfc + " FROM " + table + " WHERE " + sbwc + ';',loader,readConsistency);
150                         
151                                         // Note: UPDATES can't compile if there are no fields besides keys... Use "Insert"
152                                         if(sbup.length()==0) {
153                                                 updatePS = createPS; // the same as an insert
154                                         } else {
155                                                 updatePS = new PSInfo(trans, "UPDATE " + table + " SET " + sbup + " WHERE " + sbwc + ';',loader,writeConsistency);
156                                         }
157                         
158                                         deletePS = new PSInfo(trans, "DELETE FROM " + table + " WHERE " + sbwc + ';',loader,writeConsistency);
159                                 }
160                                 return new String[] {sbfc.toString(), sbq.toString(), sbup.toString(), sbwc.toString()};
161                         }
162
163         public void replace(CRUD crud, PSInfo psInfo) {
164                 switch(crud) {
165                         case create: createPS = psInfo; break;
166                         case read:   readPS = psInfo; break;
167                         case update: updatePS = psInfo; break;
168                         case delete: deletePS = psInfo; break;
169                 }
170         }
171
172         public void disable(CRUD crud) {
173                 switch(crud) {
174                         case create: createPS = null; break;
175                         case read:   readPS = null; break;
176                         case update: updatePS = null; break;
177                         case delete: deletePS = null; break;
178                 }
179         }
180
181         
182         /**
183          * Given a DATA object, extract the individual elements from the Data into an Object Array for the
184          * execute element.
185          */
186         public Result<DATA> create(TRANS trans, DATA data)  {
187                 if(createPS==null) {
188                         Result.err(Result.ERR_NotImplemented,"Create is disabled for %s",getClass().getSimpleName());
189                 }
190                 if(async) /*ResultSetFuture */ {
191                         Result<ResultSetFuture> rs = createPS.execAsync(trans, C_TEXT, data);
192                         if(rs.notOK()) {
193                                 return Result.err(rs);
194                         }
195                 } else {
196                         Result<ResultSet> rs = createPS.exec(trans, C_TEXT, data);
197                         if(rs.notOK()) {
198                                 return Result.err(rs);
199                         }
200                 }
201                 wasModified(trans, CRUD.create, data);
202                 return Result.ok(data);
203         }
204
205         /**
206          * Read the Unique Row associated with Full Keys
207          */
208         public Result<List<DATA>> read(TRANS trans, DATA data) {
209                 if(readPS==null) {
210                         Result.err(Result.ERR_NotImplemented,"Read is disabled for %s",getClass().getSimpleName());
211                 }
212                 return readPS.read(trans, R_TEXT, data);
213         }
214
215         public Result<List<DATA>> read(TRANS trans, Object ... key) {
216                 if(readPS==null) {
217                         Result.err(Result.ERR_NotImplemented,"Read is disabled for %s",getClass().getSimpleName());
218                 }
219                 return readPS.read(trans, R_TEXT, key);
220         }
221         
222         public Result<DATA> readPrimKey(TRANS trans, Object ... key) {
223                 if(readPS==null) {
224                         Result.err(Result.ERR_NotImplemented,"Read is disabled for %s",getClass().getSimpleName());
225                 }
226                 Result<List<DATA>> rld = readPS.read(trans, R_TEXT, key);
227                 if(rld.isOK()) {
228                         if(rld.isEmpty()) {
229                                 return Result.err(Result.ERR_NotFound,rld.details);
230                         } else {
231                                 return Result.ok(rld.value.get(0));
232                         }
233                 } else {
234                         return Result.err(rld);
235                 }
236         }
237
238         public Result<Void> update(TRANS trans, DATA data) {
239                 return update(trans, data, async);
240         }
241
242         public Result<Void> update(TRANS trans, DATA data, boolean async) {
243                 if(updatePS==null) {
244                         Result.err(Result.ERR_NotImplemented,"Update is disabled for %s",getClass().getSimpleName());
245                 }
246                 if(async)/* ResultSet rs =*/ {
247                         Result<ResultSetFuture> rs = updatePS.execAsync(trans, U_TEXT, data);
248                         if(rs.notOK()) {
249                                 return Result.err(rs);
250                         }
251                 } else {
252                         Result<ResultSet> rs = updatePS.exec(trans, U_TEXT, data);
253                         if(rs.notOK()) {
254                                 return Result.err(rs);
255                         }
256                 }
257                 
258                 wasModified(trans, CRUD.update, data);
259                 return Result.ok();
260         }
261
262         // This method Sig for Cached...
263         public Result<Void> delete(TRANS trans, DATA data, boolean reread) {
264                 if(deletePS==null) {
265                         Result.err(Result.ERR_NotImplemented,"Delete is disabled for %s",getClass().getSimpleName());
266                 }
267                 // Since Deleting will be stored off, for possible re-constitution, need the whole thing
268                 if(reread) {
269                         Result<List<DATA>> rd = read(trans,data);
270                         if(rd.notOK()) {
271                                 return Result.err(rd);
272                         }
273                         if(rd.isEmpty()) {
274                                 return Result.err(Status.ERR_NotFound,"Not Found");
275                         }
276                         for(DATA d : rd.value) { 
277                                 if(async) {
278                                         Result<ResultSetFuture> rs = deletePS.execAsync(trans, D_TEXT, d);
279                                         if(rs.notOK()) {
280                                                 return Result.err(rs);
281                                         }
282                                 } else {
283                                         Result<ResultSet> rs = deletePS.exec(trans, D_TEXT, d);
284                                         if(rs.notOK()) {
285                                                 return Result.err(rs);
286                                         }
287                                 }
288                                 wasModified(trans, CRUD.delete, d);
289                         }
290                 } else {
291                         if(async)/* ResultSet rs =*/ {
292                                 Result<ResultSetFuture> rs = deletePS.execAsync(trans, D_TEXT, data);
293                                 if(rs.notOK()) {
294                                         return Result.err(rs);
295                                 }
296                         } else {
297                                 Result<ResultSet> rs = deletePS.exec(trans, D_TEXT, data);
298                                 if(rs.notOK()) {
299                                         return Result.err(rs);
300                                 }
301                         }
302                         wasModified(trans, CRUD.delete, data);
303                 }
304                 return Result.ok();
305         }
306         
307         public final Object[] keyFrom(DATA data) {
308                 return createPS.keyFrom(data);
309         }
310
311         @Override
312         public String table() {
313                 return table;
314         }
315         
316         public static final String CASS_READ_CONSISTENCY="cassandra.readConsistency";
317         public static final String CASS_WRITE_CONSISTENCY="cassandra.writeConsistency";
318         protected static ConsistencyLevel readConsistency(AuthzTrans trans, String table) {
319                 String prop = trans.getProperty(CASS_READ_CONSISTENCY+'.'+table);
320                 if(prop==null) {
321                         prop = trans.getProperty(CASS_READ_CONSISTENCY);
322                         if(prop==null) {
323                                 return ConsistencyLevel.ONE; // this is Cassandra Default
324                         }
325                 }
326                 return ConsistencyLevel.valueOf(prop);
327         }
328
329         protected static ConsistencyLevel writeConsistency(AuthzTrans trans, String table) {
330                 String prop = trans.getProperty(CASS_WRITE_CONSISTENCY+'.'+table);
331                 if(prop==null) {
332                         prop = trans.getProperty(CASS_WRITE_CONSISTENCY);
333                         if(prop==null) {
334                                 return ConsistencyLevel.ONE; // this is Cassandra Default\
335                         }
336                 }
337                 return ConsistencyLevel.valueOf(prop);
338         }
339
340         public static DataInputStream toDIS(ByteBuffer bb) {
341                 byte[] b = bb.array();
342                 return new DataInputStream(
343                         new ByteArrayInputStream(b,bb.position(),bb.limit())
344                 );
345         }
346
347
348 }