2 * ============LICENSE_START==========================================
4 * ===================================================================
5 * Copyright (c) 2017 AT&T Intellectual Property
6 * ===================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
19 * ============LICENSE_END=============================================
20 * ====================================================================
23 import java.util.Arrays;
24 import java.util.Collection;
25 import java.util.Collections;
26 import java.util.HashMap;
27 import java.util.Iterator;
28 import java.util.List;
29 import java.nio.ByteBuffer;
30 import java.nio.charset.Charset;
31 import java.nio.charset.StandardCharsets;
32 import java.util.ArrayList;
35 import javax.ws.rs.core.HttpHeaders;
36 import javax.ws.rs.core.MediaType;
38 import org.apache.cassandra.config.ColumnDefinition;
39 import org.apache.cassandra.db.Clustering;
40 import org.apache.cassandra.db.Mutation;
41 import org.apache.cassandra.db.partitions.Partition;
42 import org.apache.cassandra.db.rows.Cell;
43 import org.apache.cassandra.db.rows.Row;
44 import org.apache.cassandra.db.rows.Unfiltered;
45 import org.apache.cassandra.db.rows.UnfilteredRowIterator;
46 import org.apache.cassandra.triggers.ITrigger;
47 import org.slf4j.Logger;
48 import org.slf4j.LoggerFactory;
50 import com.sun.jersey.api.client.Client;
51 import com.sun.jersey.api.client.ClientResponse;
52 import com.sun.jersey.api.client.WebResource;
55 public class MusicTrigger implements ITrigger {
57 private static final Logger logger = LoggerFactory.getLogger(MusicTrigger.class);
59 public Collection<Mutation> augment(Partition partition)
61 new Thread(new Runnable() {
63 makeAsyncCall(partition);
66 return Collections.emptyList();
69 private void makeAsyncCall(Partition partition) {
70 boolean isDelete = false;
71 if(partition.partitionLevelDeletion().isLive()) {
74 // Partition Level Deletion
77 logger.info("MusicTrigger isDelete: " + isDelete);
78 String ksName = partition.metadata().ksName;
79 String tableName = partition.metadata().cfName;
80 String pkValue = partition.metadata().getKeyValidator().getString(partition.partitionKey().getKey());
81 logger.info("MusicTrigger Table: " + tableName);
82 boolean isInsert = checkQueryType(partition);
83 org.json.simple.JSONObject obj = new org.json.simple.JSONObject();
85 String operation = null;
92 Map<String, Object> changeMap = new HashMap<>();
94 obj.put("operation", operation);
95 obj.put("keyspace", ksName);
96 obj.put("table_name", tableName);
97 obj.put("full_table", ksName+"."+tableName);
98 obj.put("primary_key", partition.metadata().getKeyValidator().getString(partition.partitionKey().getKey()));
99 List<String> updateList = new ArrayList<>();
100 //obj.put("message_id", partition.metadata().getKeyValidator().getString(partition.partitionKey().getKey()));
101 if("update".equals(operation)) {
103 UnfilteredRowIterator it = partition.unfilteredIterator();
104 while (it.hasNext()) {
105 Unfiltered un = it.next();
106 Clustering clt = (Clustering) un.clustering();
107 Iterator<Cell> cells = partition.getRow(clt).cells().iterator();
108 Iterator<ColumnDefinition> columns = partition.getRow(clt).columns().iterator();
110 while(columns.hasNext()){
111 ColumnDefinition columnDef = columns.next();
112 Cell cell = cells.next();
115 if(cell.column().type.toString().equals("org.apache.cassandra.db.marshal.UTF8Type")) {
116 logger.info(">> type is String");
117 data = new String(cell.value().array()); // If cell type is text
118 } else if(cell.column().type.toString().equals("org.apache.cassandra.db.marshal.Int32Type")) {
119 //ByteBuffer wrapped = ByteBuffer.wrap(cell.value()); // big-endian by default
120 int num = fromByteArray(cell.value().array());
121 logger.info(">> type is Integer1 :: "+num);
122 data = String.valueOf(num);
125 logger.info("Inside triggers loop: "+columnDef.name+" : "+data);
126 //changeMap.put(ksName+"."+tableName+"."+columnDef.name,data);
127 updateList.add(ksName+"."+tableName+":"+columnDef.name+":"+data);
128 changeMap.put("field_value",ksName+"."+tableName+":"+columnDef.name+":"+data);
132 } catch (Exception e) {
133 logger.info("Exception while constructing.. "+e.getMessage());
135 obj.put("updateList", updateList);
137 changeMap.put("field_value", ksName+"."+tableName);
140 obj.put("changeValue", changeMap);
141 logger.info("Sending response: "+obj.toString());
143 notifyMusic(obj.toString());
144 logger.info("MUSIC was notified.. "+obj.toString());
145 } catch(Exception e) {
146 logger.error("Notification failed..."+e.getMessage());
147 logger.info("Notification failed..."+e.getMessage());
152 private int fromByteArray(byte[] bytes) {
153 return bytes[0] << 24 | (bytes[1] & 0xFF) << 16 | (bytes[2] & 0xFF) << 8 | (bytes[3] & 0xFF);
156 private boolean checkQueryType(Partition partition) {
157 UnfilteredRowIterator it = partition.unfilteredIterator();
158 while (it.hasNext()) {
159 Unfiltered unfiltered = it.next();
160 Row row = (Row) unfiltered;
168 private boolean isInsert(Row row) {
169 return row.primaryKeyLivenessInfo().timestamp() != Long.MIN_VALUE;
172 private void notifyMusic(String request) {
173 logger.info("notifyMusic...");
174 Client client = Client.create();
175 WebResource webResource = client.resource("http://localhost:8080/MUSIC/rest/v2/admin/callbackOps");
177 JSONObject data = new JSONObject();
178 data.setData(request);
179 ClientResponse response = null;
181 response = webResource.accept("application/json").type("application/json")
182 .post(ClientResponse.class, data);
183 } catch (Exception e) {
184 logger.info("Exception while notifying MUSIC. Retrying..");
186 response = webResource.accept("application/json").type("application/json")
187 .post(ClientResponse.class, data);
188 } catch (Exception e1) {
189 logger.info("Exception while notifying MUSIC. Stopping retry attempts..");
193 if(response.getStatus() != 200){
194 logger.info("Exception while notifying MUSIC...");
196 response = webResource.accept("application/json").type("application/json")
197 .post(ClientResponse.class, data);
198 } catch (Exception e) {
199 logger.info("Exception while notifying MUSIC. Retrying..");
201 response = webResource.accept("application/json").type("application/json")
202 .post(ClientResponse.class, data);
203 } catch (Exception e1) {
204 logger.info("Exception while notifying MUSIC. Stopping retry attempts..");