2 * ============LICENSE_START==========================================
4 * ===================================================================
5 * Copyright (c) 2017 AT&T Intellectual Property
6 * ===================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
19 * ============LICENSE_END=============================================
20 * ====================================================================
23 import java.util.Arrays;
24 import java.util.Collection;
25 import java.util.Collections;
26 import java.util.HashMap;
27 import java.util.Iterator;
30 import javax.ws.rs.core.HttpHeaders;
31 import javax.ws.rs.core.MediaType;
33 import org.apache.cassandra.config.ColumnDefinition;
34 import org.apache.cassandra.db.Clustering;
35 import org.apache.cassandra.db.Mutation;
36 import org.apache.cassandra.db.partitions.Partition;
37 import org.apache.cassandra.db.rows.Cell;
38 import org.apache.cassandra.db.rows.Row;
39 import org.apache.cassandra.db.rows.Unfiltered;
40 import org.apache.cassandra.db.rows.UnfilteredRowIterator;
41 import org.apache.cassandra.triggers.ITrigger;
42 import org.slf4j.Logger;
43 import org.slf4j.LoggerFactory;
45 import com.sun.jersey.api.client.Client;
46 import com.sun.jersey.api.client.ClientResponse;
47 import com.sun.jersey.api.client.WebResource;
49 public class MusicTrigger implements ITrigger {
51 private static final Logger logger = LoggerFactory.getLogger(MusicTrigger.class);
54 public Collection<Mutation> augment(Partition partition)
56 boolean isDelete = false;
57 logger.info("Step 1: "+partition.partitionLevelDeletion().isLive());
58 if(partition.partitionLevelDeletion().isLive()) {
61 // Partition Level Deletion
64 logger.info("MusicTrigger isDelete: " + isDelete);
65 String ksName = partition.metadata().ksName;
66 String tableName = partition.metadata().cfName;
67 logger.info("MusicTrigger Table: " + tableName);
68 boolean isInsert = checkQueryType(partition);
69 org.json.simple.JSONObject obj = new org.json.simple.JSONObject();
72 String operation = null;
79 Map<String, String> changeMap = new HashMap<>();
81 obj.put("operation", operation);
82 obj.put("keyspace", ksName);
83 obj.put("table_name", tableName);
84 obj.put("full_table", ksName+"."+tableName);
85 obj.put("primary_key", partition.metadata().getKeyValidator().getString(partition.partitionKey().getKey()));
87 //obj.put("message_id", partition.metadata().getKeyValidator().getString(partition.partitionKey().getKey()));
88 if("update".equals(operation)) {
90 UnfilteredRowIterator it = partition.unfilteredIterator();
91 while (it.hasNext()) {
92 Unfiltered un = it.next();
93 Clustering clt = (Clustering) un.clustering();
94 Iterator<Cell> cells = partition.getRow(clt).cells().iterator();
95 Iterator<ColumnDefinition> columns = partition.getRow(clt).columns().iterator();
97 while(columns.hasNext()){
98 ColumnDefinition columnDef = columns.next();
99 Cell cell = cells.next();
100 String data = new String(cell.value().array()); // If cell type is text
101 logger.info("Inside triggers loop: "+columnDef.name+" : "+data);
102 changeMap.put(ksName+"."+tableName+"."+columnDef.name,data);
103 changeMap.put("field_value",ksName+"."+tableName+":"+columnDef.name+":"+data);
106 } catch (Exception e) {
110 changeMap.put("field_value", ksName+"."+tableName);
113 obj.put("changeValue", changeMap);
114 logger.info("Sending response: "+obj.toString());
116 notifyMusic(obj.toString());
117 } catch(Exception e) {
119 logger.error("Notification failed..."+e.getMessage());
121 return Collections.emptyList();
124 private boolean checkQueryType(Partition partition) {
125 UnfilteredRowIterator it = partition.unfilteredIterator();
126 while (it.hasNext()) {
127 Unfiltered unfiltered = it.next();
128 Row row = (Row) unfiltered;
136 private boolean isInsert(Row row) {
137 return row.primaryKeyLivenessInfo().timestamp() != Long.MIN_VALUE;
140 private void notifyMusic(String request) {
141 System.out.println("notifyMusic...");
142 Client client = Client.create();
143 WebResource webResource = client.resource("http://localhost:8080/MUSIC/rest/v2/admin/callbackOps");
145 JSONObject data = new JSONObject();
146 data.setData(request);
148 ClientResponse response = webResource.accept("application/json").type("application/json")
149 .post(ClientResponse.class, data);
151 if(response.getStatus() != 200){
152 System.out.println("Exception...");
154 response.getHeaders().put(HttpHeaders.CONTENT_TYPE, Arrays.asList(MediaType.APPLICATION_JSON));
155 response.bufferEntity();
156 String x = response.getEntity(String.class);
157 System.out.println("Response: "+x);
161 /*public Collection<Mutation> augment(Partition partition) {
163 String tableName = partition.metadata().cfName;
164 System.out.println("Table: " + tableName);
166 JSONObject obj = new JSONObject();
167 obj.put("message_id", partition.metadata().getKeyValidator().getString(partition.partitionKey().getKey()));
171 UnfilteredRowIterator it = partition.unfilteredIterator();
172 while (it.hasNext()) {
173 Unfiltered un = it.next();
174 Clustering clt = (Clustering) un.clustering();
175 Iterator<Cell> cls = partition.getRow(clt).cells().iterator();
176 Iterator<ColumnDefinition> columns = partition.getRow(clt).columns().iterator();
178 while(cls.hasNext()){
179 Cell cell = cls.next();
180 String data = new String(cell.value().array()); // If cell type is text
181 System.out.println(cell + " : " +data);
184 while(columns.hasNext()){
185 ColumnDefinition columnDef = columns.next();
186 Cell cell = cls.next();
187 String data = new String(cell.value().array()); // If cell type is text
188 obj.put(columnDef.toString(), data);
191 } catch (Exception e) {
194 System.out.println(obj.toString());
196 return Collections.emptyList();