Support for post without auth
[dmaap/messagerouter/dmaapclient.git] / src / main / java / com / att / nsa / mr / client / impl / MRSimplerBatchPublisher.java
1 /*******************************************************************************
2  *  ============LICENSE_START=======================================================
3  *  org.onap.dmaap
4  *  ================================================================================
5  *  Copyright © 2017 AT&T Intellectual Property. All rights reserved.
6  *  ================================================================================
7  *  Licensed under the Apache License, Version 2.0 (the "License");
8  *  you may not use this file except in compliance with the License.
9  *  You may obtain a copy of the License at
10  *        http://www.apache.org/licenses/LICENSE-2.0
11  *  
12  *  Unless required by applicable law or agreed to in writing, software
13  *  distributed under the License is distributed on an "AS IS" BASIS,
14  *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  *  See the License for the specific language governing permissions and
16  *  limitations under the License.
17  *  ============LICENSE_END=========================================================
18  *
19  *  ECOMP is a trademark and service mark of AT&T Intellectual Property.
20  *  
21  *******************************************************************************/
22 package com.att.nsa.mr.client.impl;
23
24 import java.io.ByteArrayOutputStream;
25 import java.io.IOException;
26 import java.io.OutputStream;
27 import java.net.MalformedURLException;
28 import java.net.URI;
29 import java.net.URISyntaxException;
30 import java.util.Collection;
31 import java.util.HashMap;
32 import java.util.LinkedList;
33 import java.util.List;
34 import java.util.Map;
35 import java.util.Properties;
36 import java.util.concurrent.LinkedBlockingQueue;
37 import java.util.concurrent.ScheduledThreadPoolExecutor;
38 import java.util.concurrent.TimeUnit;
39 import java.util.zip.GZIPOutputStream;
40
41 import javax.ws.rs.core.MultivaluedMap;
42
43 import org.slf4j.Logger;
44 import org.slf4j.LoggerFactory;
45
46 import org.apache.http.HttpException;
47 import org.apache.http.HttpStatus;
48 import org.json.JSONArray;
49 import org.json.JSONObject;
50 import org.json.JSONTokener;
51
52 import com.att.aft.dme2.api.DME2Client;
53 import com.att.aft.dme2.api.DME2Exception;
54 import com.att.nsa.mr.client.HostSelector;
55 import com.att.nsa.mr.client.MRBatchingPublisher;
56 import com.att.nsa.mr.client.response.MRPublisherResponse;
57 import com.att.nsa.mr.test.clients.ProtocolTypeConstants;
58
59 public class MRSimplerBatchPublisher extends MRBaseClient implements MRBatchingPublisher {
60         private static final Logger logger = LoggerFactory.getLogger(MRSimplerBatchPublisher.class);
61
62         public static class Builder {
63                 public Builder() {
64                 }
65
66                 public Builder againstUrls(Collection<String> baseUrls) {
67                         fUrls = baseUrls;
68                         return this;
69                 }
70
71                 public Builder onTopic(String topic) {
72                         fTopic = topic;
73                         return this;
74                 }
75
76                 public Builder batchTo(int maxBatchSize, long maxBatchAgeMs) {
77                         fMaxBatchSize = maxBatchSize;
78                         fMaxBatchAgeMs = maxBatchAgeMs;
79                         return this;
80                 }
81
82                 public Builder compress(boolean compress) {
83                         fCompress = compress;
84                         return this;
85                 }
86
87                 public Builder httpThreadTime(int threadOccuranceTime) {
88                         this.threadOccuranceTime = threadOccuranceTime;
89                         return this;
90                 }
91
92                 public Builder allowSelfSignedCertificates(boolean allowSelfSignedCerts) {
93                         fAllowSelfSignedCerts = allowSelfSignedCerts;
94                         return this;
95                 }
96
97                 public Builder withResponse(boolean withResponse) {
98                         fWithResponse = withResponse;
99                         return this;
100                 }
101
102                 public MRSimplerBatchPublisher build() {
103                         if (!fWithResponse) {
104                                 try {
105                                         return new MRSimplerBatchPublisher(fUrls, fTopic, fMaxBatchSize, fMaxBatchAgeMs, fCompress,
106                                                         fAllowSelfSignedCerts, threadOccuranceTime);
107                                 } catch (MalformedURLException e) {
108                                         throw new IllegalArgumentException(e);
109                                 }
110                         } else {
111                                 try {
112                                         return new MRSimplerBatchPublisher(fUrls, fTopic, fMaxBatchSize, fMaxBatchAgeMs, fCompress,
113                                                         fAllowSelfSignedCerts, fMaxBatchSize);
114                                 } catch (MalformedURLException e) {
115                                         throw new IllegalArgumentException(e);
116                                 }
117                         }
118
119                 }
120
121                 private Collection<String> fUrls;
122                 private String fTopic;
123                 private int fMaxBatchSize = 100;
124                 private long fMaxBatchAgeMs = 1000;
125                 private boolean fCompress = false;
126                 private int threadOccuranceTime = 50;
127                 private boolean fAllowSelfSignedCerts = false;
128                 private boolean fWithResponse = false;
129
130         };
131
132         @Override
133         public int send(String partition, String msg) {
134                 return send(new message(partition, msg));
135         }
136
137         @Override
138         public int send(String msg) {
139                 return send(new message(null, msg));
140         }
141
142         @Override
143         public int send(message msg) {
144                 final LinkedList<message> list = new LinkedList<message>();
145                 list.add(msg);
146                 return send(list);
147         }
148
149         @Override
150         public synchronized int send(Collection<message> msgs) {
151                 if (fClosed) {
152                         throw new IllegalStateException("The publisher was closed.");
153                 }
154
155                 for (message userMsg : msgs) {
156                         fPending.add(new TimestampedMessage(userMsg));
157                 }
158                 return getPendingMessageCount();
159         }
160
161         @Override
162         public synchronized int getPendingMessageCount() {
163                 return fPending.size();
164         }
165
166         @Override
167         public void close() {
168                 try {
169                         final List<message> remains = close(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
170                         if (remains.size() > 0) {
171                                 getLog().warn("Closing publisher with " + remains.size() + " messages unsent. "
172                                                 + "Consider using MRBatchingPublisher.close( long timeout, TimeUnit timeoutUnits ) to recapture unsent messages on close.");
173                         }
174                 } catch (InterruptedException e) {
175                         getLog().warn("Possible message loss. " + e.getMessage(), e);
176                 } catch (IOException e) {
177                         getLog().warn("Possible message loss. " + e.getMessage(), e);
178                 }
179         }
180
181         @Override
182         public List<message> close(long time, TimeUnit unit) throws IOException, InterruptedException {
183                 synchronized (this) {
184                         fClosed = true;
185
186                         // stop the background sender
187                         fExec.setContinueExistingPeriodicTasksAfterShutdownPolicy(false);
188                         fExec.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
189                         fExec.shutdown();
190                 }
191
192                 final long now = Clock.now();
193                 final long waitInMs = TimeUnit.MILLISECONDS.convert(time, unit);
194                 final long timeoutAtMs = now + waitInMs;
195
196                 while (Clock.now() < timeoutAtMs && getPendingMessageCount() > 0) {
197                         send(true);
198                         Thread.sleep(250);
199                 }
200
201                 synchronized (this) {
202                         final LinkedList<message> result = new LinkedList<message>();
203                         fPending.drainTo(result);
204                         return result;
205                 }
206         }
207
208         /**
209          * Possibly send a batch to the MR server. This is called by the background
210          * thread and the close() method
211          * 
212          * @param force
213          */
214         private synchronized void send(boolean force) {
215                 if (force || shouldSendNow()) {
216                         if (!sendBatch()) {
217                                 getLog().warn("Send failed, " + fPending.size() + " message to send.");
218
219                                 // note the time for back-off
220                                 fDontSendUntilMs = sfWaitAfterError + Clock.now();
221                         }
222                 }
223         }
224
225         private synchronized boolean shouldSendNow() {
226                 boolean shouldSend = false;
227                 if (fPending.size() > 0) {
228                         final long nowMs = Clock.now();
229
230                         shouldSend = (fPending.size() >= fMaxBatchSize);
231                         if (!shouldSend) {
232                                 final long sendAtMs = fPending.peek().timestamp + fMaxBatchAgeMs;
233                                 shouldSend = sendAtMs <= nowMs;
234                         }
235
236                         // however, wait after an error
237                         shouldSend = shouldSend && nowMs >= fDontSendUntilMs;
238                 }
239                 return shouldSend;
240         }
241
242         /**
243          * Method to parse published JSON Objects and Arrays
244          * 
245          * @return JSONArray
246          */
247         private JSONArray parseJSON() {
248                 JSONArray jsonArray = new JSONArray();
249                 for (TimestampedMessage m : fPending) {
250                         JSONTokener jsonTokener = new JSONTokener(m.fMsg);
251                         JSONObject jsonObject = null;
252                         JSONArray tempjsonArray = null;
253                         final char firstChar = jsonTokener.next();
254                         jsonTokener.back();
255                         if ('[' == firstChar) {
256                                 tempjsonArray = new JSONArray(jsonTokener);
257                                 if (null != tempjsonArray) {
258                                         for (int i = 0; i < tempjsonArray.length(); i++) {
259                                                 jsonArray.put(tempjsonArray.getJSONObject(i));
260                                         }
261                                 }
262                         } else {
263                                 jsonObject = new JSONObject(jsonTokener);
264                                 jsonArray.put(jsonObject);
265                         }
266
267                 }
268                 return jsonArray;
269         }
270
271         private synchronized boolean sendBatch() {
272                 // it's possible for this call to be made with an empty list. in this
273                 // case, just return.
274                 if (fPending.size() < 1) {
275                         return true;
276                 }
277
278                 final long nowMs = Clock.now();
279
280                 host = this.fHostSelector.selectBaseHost();
281
282                 final String httpurl = MRConstants.makeUrl(host, fTopic, props.getProperty("Protocol"),
283                                 props.getProperty("partition"));
284
285                 try {
286                         /*
287                          * final String contentType = fCompress ?
288                          * MRFormat.CAMBRIA_ZIP.toString () : MRFormat.CAMBRIA.toString () ;
289                          */
290
291                         final ByteArrayOutputStream baseStream = new ByteArrayOutputStream();
292                         OutputStream os = baseStream;
293                         final String contentType = props.getProperty("contenttype");
294                         if (contentType.equalsIgnoreCase("application/json")) {
295                                 JSONArray jsonArray = parseJSON();
296                                 os.write(jsonArray.toString().getBytes());
297                                 os.close();
298
299                         } else if (contentType.equalsIgnoreCase("text/plain")) {
300                                 for (TimestampedMessage m : fPending) {
301                                         os.write(m.fMsg.getBytes());
302                                         os.write('\n');
303                                 }
304                                 os.close();
305                         } else if (contentType.equalsIgnoreCase("application/cambria")
306                                         || (contentType.equalsIgnoreCase("application/cambria-zip"))) {
307                                 if (contentType.equalsIgnoreCase("application/cambria-zip")) {
308                                         os = new GZIPOutputStream(baseStream);
309                                 }
310                                 for (TimestampedMessage m : fPending) {
311
312                                         os.write(("" + m.fPartition.length()).getBytes());
313                                         os.write('.');
314                                         os.write(("" + m.fMsg.length()).getBytes());
315                                         os.write('.');
316                                         os.write(m.fPartition.getBytes());
317                                         os.write(m.fMsg.getBytes());
318                                         os.write('\n');
319                                 }
320                                 os.close();
321                         } else {
322                                 for (TimestampedMessage m : fPending) {
323                                         os.write(m.fMsg.getBytes());
324
325                                 }
326                                 os.close();
327                         }
328
329                         final long startMs = Clock.now();
330                         if (ProtocolTypeConstants.DME2.getValue().equalsIgnoreCase(protocolFlag)) {
331
332                                 DME2Configue();
333
334                                 Thread.sleep(5);
335                                 getLog().info("sending " + fPending.size() + " msgs to " + url + subContextPath + ". Oldest: "
336                                                 + (nowMs - fPending.peek().timestamp) + " ms");
337                                 sender.setPayload(os.toString());
338                                 String dmeResponse = sender.sendAndWait(5000L);
339
340                                 final String logLine = "MR reply ok (" + (Clock.now() - startMs) + " ms):" + dmeResponse.toString();
341                                 getLog().info(logLine);
342                                 fPending.clear();
343                                 return true;
344                         }
345
346                         if (ProtocolTypeConstants.AUTH_KEY.getValue().equalsIgnoreCase(protocolFlag)) {
347                                 getLog().info("sending " + fPending.size() + " msgs to " + httpurl + ". Oldest: "
348                                                 + (nowMs - fPending.peek().timestamp) + " ms");
349                                 final JSONObject result = postAuth(httpurl, baseStream.toByteArray(), contentType, authKey, authDate,
350                                                 username, password, protocolFlag);
351                                 // Here we are checking for error response. If HTTP status
352                                 // code is not within the http success response code
353                                 // then we consider this as error and return false
354                                 if (result.getInt("status") < 200 || result.getInt("status") > 299) {
355                                         return false;
356                                 }
357                                 final String logLine = "MR reply ok (" + (Clock.now() - startMs) + " ms):" + result.toString();
358                                 getLog().info(logLine);
359                                 fPending.clear();
360                                 return true;
361                         }
362
363                         if (ProtocolTypeConstants.AAF_AUTH.getValue().equalsIgnoreCase(protocolFlag)) {
364                                 getLog().info("sending " + fPending.size() + " msgs to " + httpurl + ". Oldest: "
365                                                 + (nowMs - fPending.peek().timestamp) + " ms");
366                                 final JSONObject result = post(httpurl, baseStream.toByteArray(), contentType, username, password,
367                                                 protocolFlag);
368
369                                 // Here we are checking for error response. If HTTP status
370                                 // code is not within the http success response code
371                                 // then we consider this as error and return false
372                                 if (result.getInt("status") < 200 || result.getInt("status") > 299) {
373                                         return false;
374                                 }
375                                 final String logLine = "MR reply ok (" + (Clock.now() - startMs) + " ms):" + result.toString();
376                                 getLog().info(logLine);
377                                 fPending.clear();
378                                 return true;
379                         }
380                         
381                         if (ProtocolTypeConstants.HTTPNOAUTH.getValue().equalsIgnoreCase(protocolFlag)) {
382                                 getLog().info("sending " + fPending.size() + " msgs to " + httpurl + ". Oldest: "
383                                                 + (nowMs - fPending.peek().timestamp) + " ms");
384                                 final JSONObject result = postNoAuth(httpurl, baseStream.toByteArray(), contentType);
385
386                                 // Here we are checking for error response. If HTTP status
387                                 // code is not within the http success response code
388                                 // then we consider this as error and return false
389                                 if (result.getInt("status") < 200 || result.getInt("status") > 299) {
390                                         return false;
391                                 }
392                                 final String logLine = "MR reply ok (" + (Clock.now() - startMs) + " ms):" + result.toString();
393                                 getLog().info(logLine);
394                                 fPending.clear();
395                                 return true;
396                         }
397                 } catch (IllegalArgumentException x) {
398                         getLog().warn(x.getMessage(), x);
399                 } catch (IOException x) {
400                         getLog().warn(x.getMessage(), x);
401                 } catch (HttpException x) {
402                         getLog().warn(x.getMessage(), x);
403                 } catch (Exception x) {
404                         getLog().warn(x.getMessage(), x);
405                 }
406                 return false;
407         }
408
409         public synchronized MRPublisherResponse sendBatchWithResponse() {
410                 // it's possible for this call to be made with an empty list. in this
411                 // case, just return.
412                 if (fPending.size() < 1) {
413                         pubResponse.setResponseCode(String.valueOf(HttpStatus.SC_BAD_REQUEST));
414                         pubResponse.setResponseMessage("No Messages to send");
415                         return pubResponse;
416                 }
417
418                 final long nowMs = Clock.now();
419
420                 host = this.fHostSelector.selectBaseHost();
421
422                 final String httpurl = MRConstants.makeUrl(host, fTopic, props.getProperty("Protocol"),
423                                 props.getProperty("partition"));
424                 OutputStream os = null;
425                 try {
426
427                         final ByteArrayOutputStream baseStream = new ByteArrayOutputStream();
428                         os = baseStream;
429                         final String contentType = props.getProperty("contenttype");
430                         if (contentType.equalsIgnoreCase("application/json")) {
431                                 JSONArray jsonArray = parseJSON();
432                                 os.write(jsonArray.toString().getBytes());
433                         } else if (contentType.equalsIgnoreCase("text/plain")) {
434                                 for (TimestampedMessage m : fPending) {
435                                         os.write(m.fMsg.getBytes());
436                                         os.write('\n');
437                                 }
438                         } else if (contentType.equalsIgnoreCase("application/cambria")
439                                         || (contentType.equalsIgnoreCase("application/cambria-zip"))) {
440                                 if (contentType.equalsIgnoreCase("application/cambria-zip")) {
441                                         os = new GZIPOutputStream(baseStream);
442                                 }
443                                 for (TimestampedMessage m : fPending) {
444
445                                         os.write(("" + m.fPartition.length()).getBytes());
446                                         os.write('.');
447                                         os.write(("" + m.fMsg.length()).getBytes());
448                                         os.write('.');
449                                         os.write(m.fPartition.getBytes());
450                                         os.write(m.fMsg.getBytes());
451                                         os.write('\n');
452                                 }
453                                 os.close();
454                         } else {
455                                 for (TimestampedMessage m : fPending) {
456                                         os.write(m.fMsg.getBytes());
457
458                                 }
459                         }
460
461                         final long startMs = Clock.now();
462                         if (ProtocolTypeConstants.DME2.getValue().equalsIgnoreCase(protocolFlag)) {
463
464                                 try {
465                                         DME2Configue();
466
467                                         Thread.sleep(5);
468                                         getLog().info("sending " + fPending.size() + " msgs to " + url + subContextPath + ". Oldest: "
469                                                         + (nowMs - fPending.peek().timestamp) + " ms");
470                                         sender.setPayload(os.toString());
471
472                                         String dmeResponse = sender.sendAndWait(5000L);
473
474                                         pubResponse = createMRPublisherResponse(dmeResponse, pubResponse);
475
476                                         if (Integer.valueOf(pubResponse.getResponseCode()) < 200
477                                                         || Integer.valueOf(pubResponse.getResponseCode()) > 299) {
478
479                                                 return pubResponse;
480                                         }
481                                         final String logLine = String.valueOf((Clock.now() - startMs)) + dmeResponse.toString();
482                                         getLog().info(logLine);
483                                         fPending.clear();
484
485                                 } catch (DME2Exception x) {
486                                         getLog().warn(x.getMessage(), x);
487                                         pubResponse.setResponseCode(x.getErrorCode());
488                                         pubResponse.setResponseMessage(x.getErrorMessage());
489                                 } catch (URISyntaxException x) {
490
491                                         getLog().warn(x.getMessage(), x);
492                                         pubResponse.setResponseCode(String.valueOf(HttpStatus.SC_BAD_REQUEST));
493                                         pubResponse.setResponseMessage(x.getMessage());
494                                 } catch (Exception x) {
495
496                                         pubResponse.setResponseCode(String.valueOf(HttpStatus.SC_INTERNAL_SERVER_ERROR));
497                                         pubResponse.setResponseMessage(x.getMessage());
498                                         logger.error("exception: ", x);
499
500                                 }
501
502                                 return pubResponse;
503                         }
504
505                         if (ProtocolTypeConstants.AUTH_KEY.getValue().equalsIgnoreCase(protocolFlag)) {
506                                 getLog().info("sending " + fPending.size() + " msgs to " + httpurl + ". Oldest: "
507                                                 + (nowMs - fPending.peek().timestamp) + " ms");
508                                 final String result = postAuthwithResponse(httpurl, baseStream.toByteArray(), contentType, authKey,
509                                                 authDate, username, password, protocolFlag);
510                                 // Here we are checking for error response. If HTTP status
511                                 // code is not within the http success response code
512                                 // then we consider this as error and return false
513
514                                 pubResponse = createMRPublisherResponse(result, pubResponse);
515
516                                 if (Integer.valueOf(pubResponse.getResponseCode()) < 200
517                                                 || Integer.valueOf(pubResponse.getResponseCode()) > 299) {
518
519                                         return pubResponse;
520                                 }
521
522                                 final String logLine = "MR reply ok (" + (Clock.now() - startMs) + " ms):" + result.toString();
523                                 getLog().info(logLine);
524                                 fPending.clear();
525                                 return pubResponse;
526                         }
527
528                         if (ProtocolTypeConstants.AAF_AUTH.getValue().equalsIgnoreCase(protocolFlag)) {
529                                 getLog().info("sending " + fPending.size() + " msgs to " + httpurl + ". Oldest: "
530                                                 + (nowMs - fPending.peek().timestamp) + " ms");
531                                 final String result = postWithResponse(httpurl, baseStream.toByteArray(), contentType, username,
532                                                 password, protocolFlag);
533
534                                 // Here we are checking for error response. If HTTP status
535                                 // code is not within the http success response code
536                                 // then we consider this as error and return false
537                                 pubResponse = createMRPublisherResponse(result, pubResponse);
538
539                                 if (Integer.valueOf(pubResponse.getResponseCode()) < 200
540                                                 || Integer.valueOf(pubResponse.getResponseCode()) > 299) {
541
542                                         return pubResponse;
543                                 }
544
545                                 final String logLine = String.valueOf((Clock.now() - startMs));
546                                 getLog().info(logLine);
547                                 fPending.clear();
548                                 return pubResponse;
549                         }
550                         
551                         if (ProtocolTypeConstants.HTTPNOAUTH.getValue().equalsIgnoreCase(protocolFlag)) {
552                                 getLog().info("sending " + fPending.size() + " msgs to " + httpurl + ". Oldest: "
553                                                 + (nowMs - fPending.peek().timestamp) + " ms");
554                                 final String result = postNoAuthWithResponse(httpurl, baseStream.toByteArray(), contentType);
555
556                                 // Here we are checking for error response. If HTTP status
557                                 // code is not within the http success response code
558                                 // then we consider this as error and return false
559                                 pubResponse = createMRPublisherResponse(result, pubResponse);
560
561                                 if (Integer.valueOf(pubResponse.getResponseCode()) < 200
562                                                 || Integer.valueOf(pubResponse.getResponseCode()) > 299) {
563
564                                         return pubResponse;
565                                 }
566
567                                 final String logLine = String.valueOf((Clock.now() - startMs));
568                                 getLog().info(logLine);
569                                 fPending.clear();
570                                 return pubResponse;
571                         }
572                 } catch (IllegalArgumentException x) {
573                         getLog().warn(x.getMessage(), x);
574                         pubResponse.setResponseCode(String.valueOf(HttpStatus.SC_BAD_REQUEST));
575                         pubResponse.setResponseMessage(x.getMessage());
576
577                 } catch (IOException x) {
578                         getLog().warn(x.getMessage(), x);
579                         pubResponse.setResponseCode(String.valueOf(HttpStatus.SC_INTERNAL_SERVER_ERROR));
580                         pubResponse.setResponseMessage(x.getMessage());
581
582                 } catch (HttpException x) {
583                         getLog().warn(x.getMessage(), x);
584                         pubResponse.setResponseCode(String.valueOf(HttpStatus.SC_BAD_REQUEST));
585                         pubResponse.setResponseMessage(x.getMessage());
586
587                 } catch (Exception x) {
588                         getLog().warn(x.getMessage(), x);
589
590                         pubResponse.setResponseCode(String.valueOf(HttpStatus.SC_INTERNAL_SERVER_ERROR));
591                         pubResponse.setResponseMessage(x.getMessage());
592
593                 }
594
595                 finally {
596                         if (fPending.size() > 0) {
597                                 getLog().warn("Send failed, " + fPending.size() + " message to send.");
598                                 pubResponse.setPendingMsgs(fPending.size());
599                         }
600                         if (os != null) {
601                                 try {
602                                         os.close();
603                                 } catch (Exception x) {
604                                         getLog().warn(x.getMessage(), x);
605                                         pubResponse.setResponseCode(String.valueOf(HttpStatus.SC_INTERNAL_SERVER_ERROR));
606                                         pubResponse.setResponseMessage("Error in closing Output Stream");
607                                 }
608                         }
609                 }
610
611                 return pubResponse;
612         }
613
614         public MRPublisherResponse createMRPublisherResponse(String reply, MRPublisherResponse mrPubResponse) {
615
616                 if (reply.isEmpty()) {
617
618                         mrPubResponse.setResponseCode(String.valueOf(HttpStatus.SC_BAD_REQUEST));
619                         mrPubResponse.setResponseMessage("Please verify the Producer properties");
620                 } else if (reply.startsWith("{")) {
621                         JSONObject jObject = new JSONObject(reply);
622                         if (jObject.has("message") && jObject.has("status")) {
623                                 String message = jObject.getString("message");
624                                 if (null != message) {
625                                         mrPubResponse.setResponseMessage(message);
626                                 }
627                                 mrPubResponse.setResponseCode(Integer.toString(jObject.getInt("status")));
628                         } else {
629                                 mrPubResponse.setResponseCode(String.valueOf(HttpStatus.SC_OK));
630                                 mrPubResponse.setResponseMessage(reply);
631                         }
632                 } else if (reply.startsWith("<")) {
633                         String responseCode = getHTTPErrorResponseCode(reply);
634                         if (responseCode.contains("403")) {
635                                 responseCode = "403";
636                         }
637                         mrPubResponse.setResponseCode(responseCode);
638                         mrPubResponse.setResponseMessage(getHTTPErrorResponseMessage(reply));
639                 }
640
641                 return mrPubResponse;
642         }
643
644         private final String fTopic;
645         private final int fMaxBatchSize;
646         private final long fMaxBatchAgeMs;
647         private final boolean fCompress;
648         private int threadOccuranceTime;
649         private boolean fClosed;
650         private String username;
651         private String password;
652         private String host;
653
654         // host selector
655         private HostSelector fHostSelector = null;
656
657         private final LinkedBlockingQueue<TimestampedMessage> fPending;
658         private long fDontSendUntilMs;
659         private final ScheduledThreadPoolExecutor fExec;
660
661         private String latitude;
662         private String longitude;
663         private String version;
664         private String serviceName;
665         private String env;
666         private String partner;
667         private String routeOffer;
668         private String subContextPath;
669         private String protocol;
670         private String methodType;
671         private String url;
672         private String dmeuser;
673         private String dmepassword;
674         private String contentType;
675         private static final long sfWaitAfterError = 10000;
676         private HashMap<String, String> DMETimeOuts;
677         private DME2Client sender;
678         public String protocolFlag = ProtocolTypeConstants.DME2.getValue();
679         private String authKey;
680         private String authDate;
681         private String handlers;
682         private Properties props;
683         public static String routerFilePath;
684         protected static final Map<String, String> headers = new HashMap<String, String>();
685         public static MultivaluedMap<String, Object> headersMap;
686
687         private MRPublisherResponse pubResponse;
688
689         public MRPublisherResponse getPubResponse() {
690                 return pubResponse;
691         }
692
693         public void setPubResponse(MRPublisherResponse pubResponse) {
694                 this.pubResponse = pubResponse;
695         }
696
697         public static String getRouterFilePath() {
698                 return routerFilePath;
699         }
700
701         public static void setRouterFilePath(String routerFilePath) {
702                 MRSimplerBatchPublisher.routerFilePath = routerFilePath;
703         }
704
705         public Properties getProps() {
706                 return props;
707         }
708
709         public void setProps(Properties props) {
710                 this.props = props;
711         }
712
713         public String getProtocolFlag() {
714                 return protocolFlag;
715         }
716
717         public void setProtocolFlag(String protocolFlag) {
718                 this.protocolFlag = protocolFlag;
719         }
720
721         private void DME2Configue() throws Exception {
722                 try {
723
724                         /*
725                          * FileReader reader = new FileReader(new File (producerFilePath));
726                          * Properties props = new Properties(); props.load(reader);
727                          */
728                         latitude = props.getProperty("Latitude");
729                         longitude = props.getProperty("Longitude");
730                         version = props.getProperty("Version");
731                         serviceName = props.getProperty("ServiceName");
732                         env = props.getProperty("Environment");
733                         partner = props.getProperty("Partner");
734                         routeOffer = props.getProperty("routeOffer");
735                         subContextPath = props.getProperty("SubContextPath") + fTopic;
736                         /*
737                          * if(props.getProperty("partition")!=null &&
738                          * !props.getProperty("partition").equalsIgnoreCase("")){
739                          * subContextPath=subContextPath+"?partitionKey="+props.getProperty(
740                          * "partition"); }
741                          */
742                         protocol = props.getProperty("Protocol");
743                         methodType = props.getProperty("MethodType");
744                         dmeuser = props.getProperty("username");
745                         dmepassword = props.getProperty("password");
746                         contentType = props.getProperty("contenttype");
747                         handlers = props.getProperty("sessionstickinessrequired");
748                         routerFilePath = props.getProperty("DME2preferredRouterFilePath");
749
750                         /**
751                          * Changes to DME2Client url to use Partner for auto failover
752                          * between data centers When Partner value is not provided use the
753                          * routeOffer value for auto failover within a cluster
754                          */
755
756                         String partitionKey = props.getProperty("partition");
757
758                         if (partner != null && !partner.isEmpty()) {
759                                 url = protocol + "://" + serviceName + "?version=" + version + "&envContext=" + env + "&partner="
760                                                 + partner;
761                                 if (partitionKey != null && !partitionKey.equalsIgnoreCase("")) {
762                                         url = url + "&partitionKey=" + partitionKey;
763                                 }
764                         } else if (routeOffer != null && !routeOffer.isEmpty()) {
765                                 url = protocol + "://" + serviceName + "?version=" + version + "&envContext=" + env + "&routeoffer="
766                                                 + routeOffer;
767                                 if (partitionKey != null && !partitionKey.equalsIgnoreCase("")) {
768                                         url = url + "&partitionKey=" + partitionKey;
769                                 }
770                         }
771
772                         DMETimeOuts = new HashMap<String, String>();
773                         DMETimeOuts.put("AFT_DME2_EP_READ_TIMEOUT_MS", props.getProperty("AFT_DME2_EP_READ_TIMEOUT_MS"));
774                         DMETimeOuts.put("AFT_DME2_ROUNDTRIP_TIMEOUT_MS", props.getProperty("AFT_DME2_ROUNDTRIP_TIMEOUT_MS"));
775                         DMETimeOuts.put("AFT_DME2_EP_CONN_TIMEOUT", props.getProperty("AFT_DME2_EP_CONN_TIMEOUT"));
776                         DMETimeOuts.put("Content-Type", contentType);
777                         System.setProperty("AFT_LATITUDE", latitude);
778                         System.setProperty("AFT_LONGITUDE", longitude);
779                         System.setProperty("AFT_ENVIRONMENT", props.getProperty("AFT_ENVIRONMENT"));
780                         // System.setProperty("DME2.DEBUG", "true");
781
782                         // SSL changes
783                         // System.setProperty("AFT_DME2_CLIENT_SSL_INCLUDE_PROTOCOLS",
784                         // "SSLv3,TLSv1,TLSv1.1");
785                         System.setProperty("AFT_DME2_CLIENT_SSL_INCLUDE_PROTOCOLS", "TLSv1.1,TLSv1.2");
786                         System.setProperty("AFT_DME2_CLIENT_IGNORE_SSL_CONFIG", "false");
787                         System.setProperty("AFT_DME2_CLIENT_KEYSTORE_PASSWORD", "changeit");
788
789                         // SSL changes
790
791                         sender = new DME2Client(new URI(url), 5000L);
792
793                         sender.setAllowAllHttpReturnCodes(true);
794                         sender.setMethod(methodType);
795                         sender.setSubContext(subContextPath);
796                         sender.setCredentials(dmeuser, dmepassword);
797                         sender.setHeaders(DMETimeOuts);
798                         if (handlers.equalsIgnoreCase("yes")) {
799                                 sender.addHeader("AFT_DME2_EXCHANGE_REQUEST_HANDLERS",
800                                                 props.getProperty("AFT_DME2_EXCHANGE_REQUEST_HANDLERS"));
801                                 sender.addHeader("AFT_DME2_EXCHANGE_REPLY_HANDLERS",
802                                                 props.getProperty("AFT_DME2_EXCHANGE_REPLY_HANDLERS"));
803                                 sender.addHeader("AFT_DME2_REQ_TRACE_ON", props.getProperty("AFT_DME2_REQ_TRACE_ON"));
804                         } else {
805                                 sender.addHeader("AFT_DME2_EXCHANGE_REPLY_HANDLERS", "com.att.nsa.mr.dme.client.HeaderReplyHandler");
806                         }
807                 } catch (DME2Exception x) {
808                         getLog().warn(x.getMessage(), x);
809                         throw new DME2Exception(x.getErrorCode(), x.getErrorMessage());
810                 } catch (URISyntaxException x) {
811
812                         getLog().warn(x.getMessage(), x);
813                         throw new URISyntaxException(url, x.getMessage());
814                 } catch (Exception x) {
815
816                         getLog().warn(x.getMessage(), x);
817                         throw new IllegalArgumentException(x.getMessage());
818                 }
819         }
820
821         private MRSimplerBatchPublisher(Collection<String> hosts, String topic, int maxBatchSize, long maxBatchAgeMs,
822                         boolean compress) throws MalformedURLException {
823                 super(hosts);
824
825                 if (topic == null || topic.length() < 1) {
826                         throw new IllegalArgumentException("A topic must be provided.");
827                 }
828
829                 fHostSelector = new HostSelector(hosts, null);
830                 fClosed = false;
831                 fTopic = topic;
832                 fMaxBatchSize = maxBatchSize;
833                 fMaxBatchAgeMs = maxBatchAgeMs;
834                 fCompress = compress;
835
836                 fPending = new LinkedBlockingQueue<TimestampedMessage>();
837                 fDontSendUntilMs = 0;
838                 fExec = new ScheduledThreadPoolExecutor(1);
839                 pubResponse = new MRPublisherResponse();
840
841         }
842
843         private MRSimplerBatchPublisher(Collection<String> hosts, String topic, int maxBatchSize, long maxBatchAgeMs,
844                         boolean compress, boolean allowSelfSignedCerts, int httpThreadOccurnace) throws MalformedURLException {
845                 super(hosts);
846
847                 if (topic == null || topic.length() < 1) {
848                         throw new IllegalArgumentException("A topic must be provided.");
849                 }
850
851                 fHostSelector = new HostSelector(hosts, null);
852                 fClosed = false;
853                 fTopic = topic;
854                 fMaxBatchSize = maxBatchSize;
855                 fMaxBatchAgeMs = maxBatchAgeMs;
856                 fCompress = compress;
857                 threadOccuranceTime = httpThreadOccurnace;
858                 fPending = new LinkedBlockingQueue<TimestampedMessage>();
859                 fDontSendUntilMs = 0;
860                 fExec = new ScheduledThreadPoolExecutor(1);
861                 fExec.scheduleAtFixedRate(new Runnable() {
862                         @Override
863                         public void run() {
864                                 send(false);
865                         }
866                 }, 100, threadOccuranceTime, TimeUnit.MILLISECONDS);
867         }
868
869         private static class TimestampedMessage extends message {
870                 public TimestampedMessage(message m) {
871                         super(m);
872                         timestamp = Clock.now();
873                 }
874
875                 public final long timestamp;
876         }
877
878         public String getUsername() {
879                 return username;
880         }
881
882         public void setUsername(String username) {
883                 this.username = username;
884         }
885
886         public String getPassword() {
887                 return password;
888         }
889
890         public void setPassword(String password) {
891                 this.password = password;
892         }
893
894         public String getHost() {
895                 return host;
896         }
897
898         public void setHost(String host) {
899                 this.host = host;
900         }
901
902         public String getContentType() {
903                 return contentType;
904         }
905
906         public void setContentType(String contentType) {
907                 this.contentType = contentType;
908         }
909
910         public String getAuthKey() {
911                 return authKey;
912         }
913
914         public void setAuthKey(String authKey) {
915                 this.authKey = authKey;
916         }
917
918         public String getAuthDate() {
919                 return authDate;
920         }
921
922         public void setAuthDate(String authDate) {
923                 this.authDate = authDate;
924         }
925
926 }