68ec2e8b70356c0c0171d34e612baeb66007ff0c
[aaf/authz.git] / auth / auth-cass / src / main / java / org / onap / aaf / auth / dao / CassDAOImpl.java
1 /**
2  * ============LICENSE_START====================================================
3  * org.onap.aaf
4  * ===========================================================================
5  * Copyright (c) 2018 AT&T Intellectual Property. All rights reserved.
6  * ===========================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  * 
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  * 
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END====================================================
19  *
20  */
21
22 package org.onap.aaf.auth.dao;
23
24 import java.io.ByteArrayInputStream;
25 import java.io.DataInputStream;
26 import java.lang.reflect.Field;
27 import java.nio.ByteBuffer;
28 import java.util.List;
29
30 import org.onap.aaf.auth.dao.cass.Status;
31 import org.onap.aaf.auth.env.AuthzTrans;
32 import org.onap.aaf.auth.layer.Result;
33 import org.onap.aaf.misc.env.TransStore;
34
35 import com.datastax.driver.core.Cluster;
36 import com.datastax.driver.core.ConsistencyLevel;
37 import com.datastax.driver.core.ResultSet;
38 import com.datastax.driver.core.ResultSetFuture;
39
40 /**
41  * AbsCassDAO
42  *
43  * Deal with the essentials of Interaction with Cassandra DataStore for all Cassandra DAOs
44  *
45  * @author Jonathan
46  *
47  * @param <DATA>
48  */
49 public class CassDAOImpl<TRANS extends TransStore,DATA> extends AbsCassDAO<TRANS, DATA> implements DAO<TRANS,DATA> {
50     public static final String USER_NAME = "__USER_NAME__";
51     public static final String CASS_READ_CONSISTENCY="cassandra.readConsistency";
52     public static final String CASS_WRITE_CONSISTENCY="cassandra.writeConsistency";
53     protected static final String CREATE_SP = "CREATE ";
54     protected static final String UPDATE_SP = "UPDATE ";
55     protected static final String DELETE_SP = "DELETE ";
56     protected static final String SELECT_SP = "SELECT ";
57     private static final String WHERE = " WHERE ";
58     private static final String READ_IS_DISABLED = "Read is disabled for %s";
59
60     protected final String C_TEXT = getClass().getSimpleName() + " CREATE";
61     protected final String R_TEXT = getClass().getSimpleName() + " READ";
62     protected final String U_TEXT = getClass().getSimpleName() + " UPDATE";
63     protected final String D_TEXT = getClass().getSimpleName() + " DELETE";
64     private String table;
65     
66     protected final ConsistencyLevel readConsistency;
67     protected final ConsistencyLevel writeConsistency;
68
69     protected PSInfo createPS;
70     protected PSInfo readPS;
71     protected PSInfo updatePS;
72     protected PSInfo deletePS;
73     protected boolean async=false;
74     
75     // Setteable only by CachedDAO
76     protected Cached<?, ?> cache;
77
78     /**
79      * A Constructor from the originating Cluster.  This DAO will open the Session at need,
80      * and shutdown the session when "close()" is called.
81      *
82      * @param cluster
83      * @param keyspace
84      * @param dataClass
85      */
86     public CassDAOImpl(TRANS trans, String name, Cluster cluster, String keyspace, Class<DATA> dataClass, String table, ConsistencyLevel read, ConsistencyLevel write) {
87         super(trans, name, cluster,keyspace,dataClass);
88         this.table = table;
89         readConsistency = read;
90         writeConsistency = write;
91     }
92     
93     /**
94      * A Constructor to share Session with other DAOs.
95      *
96      * This method get the Session and Cluster information from the calling DAO, and won't
97      * touch the Session on closure.
98      *
99      * @param aDao
100      * @param dataClass
101      */
102     public CassDAOImpl(TRANS trans, String name, AbsCassDAO<TRANS,?> aDao, Class<DATA> dataClass, String table, ConsistencyLevel read, ConsistencyLevel write) {
103         super(trans, name, aDao,dataClass);
104         this.table = table;
105         readConsistency = read;
106         writeConsistency = write;
107     }
108
109     public void async(boolean bool) {
110         async = bool;
111     }
112
113     public final String[] setCRUD(TRANS trans, String table, Class<?> dc,Loader<DATA> loader) {
114         return setCRUD(trans, table, dc, loader, -1);
115     }
116     
117     public final String[] setCRUD(TRANS trans, String table, Class<?> dc,Loader<DATA> loader, int max) {
118                 Field[] fields = dc.getDeclaredFields();
119                 int end = max>=0 && max<fields.length?max:fields.length;
120                 // get keylimit from a non-null Loader
121                 int keylimit = loader.keylimit();
122             
123                 StringBuilder sbfc = new StringBuilder();
124                 StringBuilder sbq = new StringBuilder();
125                 StringBuilder sbwc = new StringBuilder();
126                 StringBuilder sbup = new StringBuilder();
127             
128                 if (keylimit>0) {
129                     for (int i=0;i<end;++i) {
130                         if (i>0) {
131                             sbfc.append(',');
132                             sbq.append(',');
133                             if (i<keylimit) {
134                                 sbwc.append(" AND ");
135                             }
136                         }
137                         sbfc.append(fields[i].getName());
138                         sbq.append('?');
139                         if (i>=keylimit) {
140                             if (i>keylimit) {
141                                 sbup.append(',');
142                             }
143                             sbup.append(fields[i].getName());
144                             sbup.append("=?");
145                         }
146                         if (i<keylimit) {
147                             sbwc.append(fields[i].getName());
148                             sbwc.append("=?");
149                         }
150                     }
151             
152                     createPS = new PSInfo(trans, "INSERT INTO " + table + " ("+ sbfc +") VALUES ("+ sbq +");",loader,writeConsistency);
153             
154                     readPS = new PSInfo(trans, SELECT_SP + sbfc + " FROM " + table + WHERE + sbwc + ';',loader,readConsistency);
155             
156                     // Note: UPDATES can't compile if there are no fields besides keys... Use "Insert"
157                     if (sbup.length()==0) {
158                         updatePS = createPS; // the same as an insert
159                     } else {
160                         updatePS = new PSInfo(trans, UPDATE_SP + table + " SET " + sbup + WHERE + sbwc + ';',loader,writeConsistency);
161                     }
162             
163                     deletePS = new PSInfo(trans, "DELETE FROM " + table + WHERE + sbwc + ';',loader,writeConsistency);
164                 }
165                 return new String[] {sbfc.toString(), sbq.toString(), sbup.toString(), sbwc.toString()};
166             }
167
168     public void replace(CRUD crud, PSInfo psInfo) {
169         switch(crud) {
170             case create: createPS = psInfo;
171             break;
172             case read:   readPS = psInfo;
173             break;
174             case update: updatePS = psInfo;
175             break;
176             case delete: deletePS = psInfo;
177             break;
178         }
179     }
180
181     public void disable(CRUD crud) {
182         switch(crud) {
183             case create: createPS = null;
184             break;
185             case read:   readPS = null;
186             break;
187             case update: updatePS = null;
188             break;
189             case delete: deletePS = null;
190             break;
191         }
192     }
193
194     
195     /**
196      * Given a DATA object, extract the individual elements from the Data into an Object Array for the
197      * execute element.
198      */
199     public Result<DATA> create(TRANS trans, DATA data)  {
200         if (createPS==null) {
201             return Result.err(Result.ERR_NotImplemented,"Create is disabled for %s",getClass().getSimpleName());
202         }
203         if (async) /*ResultSetFuture */ {
204             Result<ResultSetFuture> rs = createPS.execAsync(trans, C_TEXT, data);
205             if (rs.notOK()) {
206                 return Result.err(rs);
207             }
208         } else {
209             Result<ResultSet> rs = createPS.exec(trans, C_TEXT, data);
210             if (rs.notOK()) {
211                 return Result.err(rs);
212             }
213         }
214         wasModified(trans, CRUD.create, data);
215         return Result.ok(data);
216     }
217
218     /**
219      * Read the Unique Row associated with Full Keys
220      */
221     public Result<List<DATA>> read(TRANS trans, DATA data) {
222         if (readPS==null) {
223             return Result.err(Result.ERR_NotImplemented,READ_IS_DISABLED,getClass().getSimpleName());
224         }
225         return readPS.read(trans, R_TEXT, data);
226     }
227
228     public Result<List<DATA>> read(TRANS trans, Object ... key) {
229         if (readPS==null) {
230             return Result.err(Result.ERR_NotImplemented,READ_IS_DISABLED,getClass().getSimpleName());
231         }
232         return readPS.read(trans, R_TEXT, key);
233     }
234     
235     public Result<DATA> readPrimKey(TRANS trans, Object ... key) {
236         if (readPS==null) {
237             return Result.err(Result.ERR_NotImplemented,READ_IS_DISABLED,getClass().getSimpleName());
238         }
239         Result<List<DATA>> rld = readPS.read(trans, R_TEXT, key);
240         if (rld.isOK()) {
241             if (rld.isEmpty()) {
242                 return Result.err(Result.ERR_NotFound,rld.details);
243             } else {
244                 return Result.ok(rld.value.get(0));
245             }
246         } else {
247             return Result.err(rld);
248         }
249     }
250
251     public Result<Void> update(TRANS trans, DATA data) {
252         return update(trans, data, async);
253     }
254
255     public Result<Void> update(TRANS trans, DATA data, boolean async) {
256         if (updatePS==null) {
257             return Result.err(Result.ERR_NotImplemented,"Update is disabled for %s",getClass().getSimpleName());
258         }
259         if (async)/* ResultSet rs =*/ {
260             Result<ResultSetFuture> rs = updatePS.execAsync(trans, U_TEXT, data);
261             if (rs.notOK()) {
262                 return Result.err(rs);
263             }
264         } else {
265             Result<ResultSet> rs = updatePS.exec(trans, U_TEXT, data);
266             if (rs.notOK()) {
267                 return Result.err(rs);
268             }
269         }
270         
271         wasModified(trans, CRUD.update, data);
272         return Result.ok();
273     }
274
275     // This method Sig for Cached...
276     public Result<Void> delete(TRANS trans, DATA data, boolean reread) {
277         if (deletePS==null) {
278             return Result.err(Result.ERR_NotImplemented,"Delete is disabled for %s",getClass().getSimpleName());
279         }
280         // Since Deleting will be stored off, for possible re-constitution, need the whole thing
281         if (reread) {
282             Result<List<DATA>> rd = read(trans,data);
283             if (rd.notOK()) {
284                 return Result.err(rd);
285             }
286             if (rd.isEmpty()) {
287                 return Result.err(Status.ERR_NotFound,"Not Found");
288             }
289             for (DATA d : rd.value) { 
290                 if (async) {
291                     Result<ResultSetFuture> rs = deletePS.execAsync(trans, D_TEXT, d);
292                     if (rs.notOK()) {
293                         return Result.err(rs);
294                     }
295                 } else {
296                     Result<ResultSet> rs = deletePS.exec(trans, D_TEXT, d);
297                     if (rs.notOK()) {
298                         return Result.err(rs);
299                     }
300                 }
301                 wasModified(trans, CRUD.delete, d);
302             }
303         } else {
304             if (async)/* ResultSet rs =*/ {
305                 Result<ResultSetFuture> rs = deletePS.execAsync(trans, D_TEXT, data);
306                 if (rs.notOK()) {
307                     return Result.err(rs);
308                 }
309             } else {
310                 Result<ResultSet> rs = deletePS.exec(trans, D_TEXT, data);
311                 if (rs.notOK()) {
312                     return Result.err(rs);
313                 }
314             }
315             wasModified(trans, CRUD.delete, data);
316         }
317         return Result.ok();
318     }
319     
320     public final Object[] keyFrom(DATA data) {
321         return createPS.keyFrom(data);
322     }
323
324     @Override
325     public String table() {
326         return table;
327     }
328
329     protected static ConsistencyLevel readConsistency(AuthzTrans trans, String table) {
330         String prop = trans.getProperty(CASS_READ_CONSISTENCY+'.'+table);
331         if (prop==null) {
332             prop = trans.getProperty(CASS_READ_CONSISTENCY);
333             if (prop==null) {
334                 return ConsistencyLevel.ONE; // this is Cassandra Default
335             }
336         }
337         return ConsistencyLevel.valueOf(prop);
338     }
339
340     protected static ConsistencyLevel writeConsistency(AuthzTrans trans, String table) {
341         String prop = trans.getProperty(CASS_WRITE_CONSISTENCY+'.'+table);
342         if (prop==null) {
343             prop = trans.getProperty(CASS_WRITE_CONSISTENCY);
344             if (prop==null) {
345                 return ConsistencyLevel.ONE; // this is Cassandra Default\
346             }
347         }
348         return ConsistencyLevel.valueOf(prop);
349     }
350
351     public static DataInputStream toDIS(ByteBuffer bb) {
352         byte[] b = bb.array();
353         return new DataInputStream(
354             new ByteArrayInputStream(b,bb.position(),bb.limit())
355         );
356     }
357
358
359 }