- public Response callbackOps(final JSONObject inputJsonObj) {
- // {"keyspace":"conductor","full_table":"conductor.plans","changeValue":{"conductor.plans.status":"Who??","position":"3"},"operation":"update","table_name":"plans","primary_key":"3"}
- Map<String, Object> resultMap = new HashMap<>();
- new Thread(new Runnable() {
- public void run() {
- makeAsyncCall(inputJsonObj);
- }
- }).start();
-
- return Response.status(Status.OK).entity(resultMap).build();
- }
-
- private Response makeAsyncCall(JSONObject inputJsonObj) {
-
- Map<String, Object> resultMap = new HashMap<>();
- try {
- logger.info("Got notification: " + inputJsonObj.getData());
- String dataStr = inputJsonObj.getData();
- ObjectMapper mapper = new ObjectMapper();
- JSONCallbackResponse jsonResponse = mapper.readValue(dataStr, JSONCallbackResponse.class);
- String operation = jsonResponse.getOperation();
- Map<String, String> changeValueMap = jsonResponse.getChangeValue();
- String primaryKey = jsonResponse.getPrimary_key();
- //String full_table = jsonResponse.getFull_table(); //conductor.plans
- String field_value = changeValueMap.get("field_value");
- if(field_value == null)
- field_value = jsonResponse.getFull_table();
- //Get from Cache
- JsonCallback baseRequestObj = CachingUtil.getCallBackCache(field_value);
- //baseRequestObj = null;
-
- if(baseRequestObj == null) {
- logger.info("Is cache empty? reconstructing Object from cache..");
- baseRequestObj = constructJsonCallbackFromCache(field_value);
- }
- if(baseRequestObj == null) {
- resultMap.put("Exception",
- "Oops. Something went wrong. Please make sure Callback properties are onboarded.");
- logger.error(EELFLoggerDelegate.errorLogger, "", AppMessages.INCORRECTDATA,
- ErrorSeverity.CRITICAL, ErrorTypes.DATAERROR);
- return Response.status(Status.BAD_REQUEST).entity(resultMap).build();
- }
-
- String key = "admin" + "." + "notification_master" + "." + baseRequestObj.getUuid();
- String lockId = MusicCore.createLockReference(key);
- long lockCreationTime = System.currentTimeMillis();
- ReturnType lockAcqResult = MusicCore.acquireLock(key, lockId);
- if(! lockAcqResult.getResult().toString().equals("SUCCESS")) {
- logger.info("Some other node is notifying the caller..");
- }
-
- logger.info(operation+ ": Operation :: changeValue: "+changeValueMap);
- if(operation.equals("update")) {
- String notifyWhenChangeIn = baseRequestObj.getNotifyWhenChangeIn(); // conductor.plans.status
- logger.info("**********notifyWhenChangeIn: "+notifyWhenChangeIn);
- if(field_value.equals(notifyWhenChangeIn)) {
- logger.info("********** notifyting the endpoint: "+baseRequestObj.getApplicationNotificationEndpoint());
- notifyCallBackAppl(jsonResponse, baseRequestObj);
- }
-
- } else if(operation.equals("delete")) {
- String notifyWhenDeletesIn = baseRequestObj.getNotifyWhenDeletesIn(); // conductor.plans.status
- logger.info("**********notifyWhenDeletesIn: "+notifyWhenDeletesIn);
- if(field_value.equals(notifyWhenDeletesIn)) {
- logger.info("********** notifyting the endpoint: "+baseRequestObj.getApplicationNotificationEndpoint());
- notifyCallBackAppl(jsonResponse, baseRequestObj);
- }
- } else if(operation.equals("insert")) {
- String notifyWhenInsertsIn = baseRequestObj.getNotifyWhenInsertsIn(); // conductor.plans.status
- logger.info("**********notifyWhenInsertsIn: "+notifyWhenInsertsIn);
- if(field_value.equals(notifyWhenInsertsIn)) {
- logger.info("********** notifyting the endpoint: "+baseRequestObj.getApplicationNotificationEndpoint());
- notifyCallBackAppl(jsonResponse, baseRequestObj);
- }
- }
- MusicCore.releaseLock(lockId, true);
- } catch(Exception e) {
- e.printStackTrace();
- logger.info("Exception...");
- }
- logger.info(">>> callback is completed. Notification was sent from Music...");
- return Response.status(Status.OK).entity(resultMap).build();
- }
-
- private void notifyCallBackAppl(JSONCallbackResponse jsonResponse, JsonCallback baseRequestObj) {
- int notifytimeout = MusicUtil.getNotifyTimeout();
- int notifyinterval = MusicUtil.getNotifyInterval();
- String endpoint = baseRequestObj.getApplicationNotificationEndpoint();
- String username = baseRequestObj.getApplicationUsername();
- String password = baseRequestObj.getApplicationPassword();
- JsonNotification jsonNotification = constructJsonNotification(jsonResponse, baseRequestObj);
- jsonNotification.setOperation_type(jsonResponse.getOperation());
- logger.info("Response sent is: "+jsonNotification);
- WebResource webResource = client.resource(endpoint);
- String authData = username+":"+password;
- byte[] plainCredsBytes = authData.getBytes();
- byte[] base64CredsBytes = Base64.encode(plainCredsBytes);
- String base64Creds = new String(base64CredsBytes);
- Map<String, String> response_body = baseRequestObj.getResponseBody();
- ClientResponse response = null;
- try {
- response = webResource.header("Authorization", "Basic " + base64Creds).accept("application/json").type("application/json")
- .post(ClientResponse.class, jsonNotification);
- } catch (com.sun.jersey.api.client.ClientHandlerException chf) {
- boolean ok = false;
- logger.info("Is Service down?");
- long now= System.currentTimeMillis();
- long end = now+notifytimeout;
- while(! ok) {
- logger.info("retrying since error in notifying callback..");
- try {
- response = webResource.header("Authorization", "Basic " + base64Creds).accept("application/json").type("application/json")
- .post(ClientResponse.class, jsonNotification);
- if(response.getStatus() == 200) ok = true;
- }catch (Exception e) {
- System.err.println("Is response null: "+response==null);
- System.err.println("Retry until "+(end-System.currentTimeMillis()));
- if(response == null && System.currentTimeMillis() < end) ok = false;
- else ok = true;
- try{ Thread.sleep(notifyinterval); } catch(Exception e1) {}
- }
- }