Merge "added test cases to JsonResponseTest.java"
[music.git] / musictrigger / src / MusicTrigger.java
1 /*
2  * ============LICENSE_START==========================================
3  * org.onap.music
4  * ===================================================================
5  *  Copyright (c) 2017 AT&T Intellectual Property
6  * ===================================================================
7  *  Licensed under the Apache License, Version 2.0 (the "License");
8  *  you may not use this file except in compliance with the License.
9  *  You may obtain a copy of the License at
10  * 
11  *     http://www.apache.org/licenses/LICENSE-2.0
12  * 
13  *  Unless required by applicable law or agreed to in writing, software
14  *  distributed under the License is distributed on an "AS IS" BASIS,
15  *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  *  See the License for the specific language governing permissions and
17  *  limitations under the License.
18  * 
19  * ============LICENSE_END=============================================
20  * ====================================================================
21  */
22
23 import java.util.Arrays;
24 import java.util.Collection;
25 import java.util.Collections;
26 import java.util.HashMap;
27 import java.util.Iterator;
28 import java.util.List;
29 import java.nio.ByteBuffer;
30 import java.nio.charset.Charset;
31 import java.nio.charset.StandardCharsets;
32 import java.util.ArrayList;
33 import java.util.Map;
34
35 import javax.ws.rs.core.HttpHeaders;
36 import javax.ws.rs.core.MediaType;
37
38 import org.apache.cassandra.config.ColumnDefinition;
39 import org.apache.cassandra.db.Clustering;
40 import org.apache.cassandra.db.Mutation;
41 import org.apache.cassandra.db.partitions.Partition;
42 import org.apache.cassandra.db.rows.Cell;
43 import org.apache.cassandra.db.rows.Row;
44 import org.apache.cassandra.db.rows.Unfiltered;
45 import org.apache.cassandra.db.rows.UnfilteredRowIterator;
46 import org.apache.cassandra.triggers.ITrigger;
47 import org.slf4j.Logger;
48 import org.slf4j.LoggerFactory;
49
50 import com.sun.jersey.api.client.Client;
51 import com.sun.jersey.api.client.ClientResponse;
52 import com.sun.jersey.api.client.WebResource;
53
54
55 public class MusicTrigger implements ITrigger {
56
57         private static final Logger logger = LoggerFactory.getLogger(MusicTrigger.class);
58         
59     public Collection<Mutation> augment(Partition partition)
60     {
61         new Thread(new Runnable() {
62             public void run() {
63                 makeAsyncCall(partition);
64             }
65         }).start();
66         return Collections.emptyList();
67     }
68     
69     private void makeAsyncCall(Partition partition) {
70         boolean isDelete = false;
71         if(partition.partitionLevelDeletion().isLive()) {
72                 
73         } else {
74             // Partition Level Deletion
75                 isDelete = true;
76         }
77         logger.info("MusicTrigger isDelete: " + isDelete);
78         String ksName = partition.metadata().ksName;
79         String tableName = partition.metadata().cfName;
80         String pkValue = partition.metadata().getKeyValidator().getString(partition.partitionKey().getKey());
81         logger.info("MusicTrigger Table: " + tableName);
82         boolean isInsert = checkQueryType(partition);
83         org.json.simple.JSONObject obj = new org.json.simple.JSONObject();
84         
85         String operation = null;
86         if(isDelete)
87                 operation = "delete";
88         else if(isInsert)
89                 operation = "insert";
90         else
91                 operation = "update";
92         Map<String, Object> changeMap = new HashMap<>();
93         
94         obj.put("operation", operation);
95         obj.put("keyspace", ksName);
96         obj.put("table_name", tableName);
97         obj.put("full_table", ksName+"."+tableName);
98         obj.put("primary_key", partition.metadata().getKeyValidator().getString(partition.partitionKey().getKey()));
99         List<String> updateList = new ArrayList<>();
100         //obj.put("message_id", partition.metadata().getKeyValidator().getString(partition.partitionKey().getKey()));
101         if("update".equals(operation)) {
102                 try {
103                     UnfilteredRowIterator it = partition.unfilteredIterator();
104                     while (it.hasNext()) {
105                         Unfiltered un = it.next();
106                         Clustering clt = (Clustering) un.clustering();  
107                         Iterator<Cell> cells = partition.getRow(clt).cells().iterator();
108                         Iterator<ColumnDefinition> columns = partition.getRow(clt).columns().iterator();
109         
110                         while(columns.hasNext()){
111                             ColumnDefinition columnDef = columns.next();
112                             Cell cell = cells.next();
113                             
114                             String data = null;
115                             if(cell.column().type.toString().equals("org.apache.cassandra.db.marshal.UTF8Type")) {
116                                 logger.info(">> type is String");
117                                 data = new String(cell.value().array()); // If cell type is text
118                             } else if(cell.column().type.toString().equals("org.apache.cassandra.db.marshal.Int32Type")) {
119                                 //ByteBuffer wrapped = ByteBuffer.wrap(cell.value()); // big-endian by default
120                                 int num = fromByteArray(cell.value().array());
121                                 logger.info(">> type is Integer1 :: "+num);
122                                 data = String.valueOf(num);
123                             }
124                             
125                             logger.info("Inside triggers loop: "+columnDef.name+" : "+data);
126                             //changeMap.put(ksName+"."+tableName+"."+columnDef.name,data);
127                             updateList.add(ksName+"."+tableName+":"+columnDef.name+":"+data);
128                             changeMap.put("field_value",ksName+"."+tableName+":"+columnDef.name+":"+data);
129                             
130                         }
131                     }
132                 } catch (Exception e) {
133                         logger.info("Exception while constructing.. "+e.getMessage());
134                 }
135                 obj.put("updateList", updateList);
136         } else {
137                 changeMap.put("field_value", ksName+"."+tableName);
138         }
139         
140         obj.put("changeValue", changeMap);
141         logger.info("Sending response: "+obj.toString());
142         try {
143             notifyMusic(obj.toString());
144             logger.info("MUSIC was notified.. "+obj.toString());
145         } catch(Exception e) {
146             logger.error("Notification failed..."+e.getMessage());
147             logger.info("Notification failed..."+e.getMessage());
148         }
149         
150     }
151     
152     private int fromByteArray(byte[] bytes) {
153         return bytes[0] << 24 | (bytes[1] & 0xFF) << 16 | (bytes[2] & 0xFF) << 8 | (bytes[3] & 0xFF);
154     }
155     
156     private boolean checkQueryType(Partition partition) { 
157         UnfilteredRowIterator it = partition.unfilteredIterator();
158         while (it.hasNext()) {
159             Unfiltered unfiltered = it.next();
160             Row row = (Row) unfiltered;
161             if (isInsert(row)) {
162                 return true;
163             }
164         }
165         return false;
166     }
167
168     private boolean isInsert(Row row) {
169         return row.primaryKeyLivenessInfo().timestamp() != Long.MIN_VALUE;
170     }
171        
172         private void notifyMusic(String request) {
173                 logger.info("notifyMusic...");
174                 Client client = Client.create();
175                 WebResource webResource = client.resource("http://localhost:8080/MUSIC/rest/v2/admin/callbackOps");
176                         
177                 JSONObject data = new JSONObject();
178                 data.setData(request);
179                 ClientResponse response = null;
180                 try { 
181                         response = webResource.accept("application/json").type("application/json")
182                 .post(ClientResponse.class, data);
183                 } catch (Exception e) {
184                         logger.info("Exception while notifying MUSIC. Retrying..");
185                         try { 
186                                 response = webResource.accept("application/json").type("application/json")
187                         .post(ClientResponse.class, data);
188                         } catch (Exception e1) {
189                                 logger.info("Exception while notifying MUSIC. Stopping retry attempts..");
190                         }
191                 }
192                 
193                 if(response.getStatus() != 200){
194                         logger.info("Exception while notifying MUSIC...");
195                         try { 
196                                 response = webResource.accept("application/json").type("application/json")
197                         .post(ClientResponse.class, data);
198                         } catch (Exception e) {
199                                 logger.info("Exception while notifying MUSIC. Retrying..");
200                                 try { 
201                                         response = webResource.accept("application/json").type("application/json")
202                                 .post(ClientResponse.class, data);
203                                 } catch (Exception e1) {
204                                         logger.info("Exception while notifying MUSIC. Stopping retry attempts..");
205                                 }
206                         }
207         }
208         }
209
210 }