Merge "Sonar critical issues"
[dmaap/messagerouter/dmaapclient.git] / src / main / java / com / att / nsa / mr / client / impl / MRSimplerBatchPublisher.java
1 /*******************************************************************************
2  *  ============LICENSE_START=======================================================
3  *  org.onap.dmaap
4  *  ================================================================================
5  *  Copyright © 2017 AT&T Intellectual Property. All rights reserved.
6  *  ================================================================================
7  *  Licensed under the Apache License, Version 2.0 (the "License");
8  *  you may not use this file except in compliance with the License.
9  *  You may obtain a copy of the License at
10  *        http://www.apache.org/licenses/LICENSE-2.0
11  *  
12  *  Unless required by applicable law or agreed to in writing, software
13  *  distributed under the License is distributed on an "AS IS" BASIS,
14  *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  *  See the License for the specific language governing permissions and
16  *  limitations under the License.
17  *  ============LICENSE_END=========================================================
18  *
19  *  ECOMP is a trademark and service mark of AT&T Intellectual Property.
20  *  
21  *******************************************************************************/
22 package com.att.nsa.mr.client.impl;
23
24 import java.io.ByteArrayOutputStream;
25 import java.io.IOException;
26 import java.io.OutputStream;
27 import java.net.MalformedURLException;
28 import java.net.URI;
29 import java.net.URISyntaxException;
30 import java.util.Collection;
31 import java.util.HashMap;
32 import java.util.LinkedList;
33 import java.util.List;
34 import java.util.Map;
35 import java.util.Properties;
36 import java.util.concurrent.LinkedBlockingQueue;
37 import java.util.concurrent.ScheduledThreadPoolExecutor;
38 import java.util.concurrent.TimeUnit;
39 import java.util.zip.GZIPOutputStream;
40
41 import javax.ws.rs.core.MultivaluedMap;
42
43 import org.slf4j.Logger;
44 import org.slf4j.LoggerFactory;
45
46 import org.apache.http.HttpException;
47 import org.apache.http.HttpStatus;
48 import org.json.JSONArray;
49 import org.json.JSONObject;
50 import org.json.JSONTokener;
51
52 import com.att.aft.dme2.api.DME2Client;
53 import com.att.aft.dme2.api.DME2Exception;
54 import com.att.nsa.mr.client.HostSelector;
55 import com.att.nsa.mr.client.MRBatchingPublisher;
56 import com.att.nsa.mr.client.response.MRPublisherResponse;
57 import com.att.nsa.mr.test.clients.ProtocolTypeConstants;
58
59 public class MRSimplerBatchPublisher extends MRBaseClient implements MRBatchingPublisher {
60         private static final Logger logger = LoggerFactory.getLogger(MRSimplerBatchPublisher.class);
61
62         public static class Builder {
63                 public Builder() {
64                 }
65
66                 public Builder againstUrls(Collection<String> baseUrls) {
67                         fUrls = baseUrls;
68                         return this;
69                 }
70
71                 public Builder onTopic(String topic) {
72                         fTopic = topic;
73                         return this;
74                 }
75
76                 public Builder batchTo(int maxBatchSize, long maxBatchAgeMs) {
77                         fMaxBatchSize = maxBatchSize;
78                         fMaxBatchAgeMs = maxBatchAgeMs;
79                         return this;
80                 }
81
82                 public Builder compress(boolean compress) {
83                         fCompress = compress;
84                         return this;
85                 }
86
87                 public Builder httpThreadTime(int threadOccuranceTime) {
88                         this.threadOccuranceTime = threadOccuranceTime;
89                         return this;
90                 }
91
92                 public Builder allowSelfSignedCertificates(boolean allowSelfSignedCerts) {
93                         fAllowSelfSignedCerts = allowSelfSignedCerts;
94                         return this;
95                 }
96
97                 public Builder withResponse(boolean withResponse) {
98                         fWithResponse = withResponse;
99                         return this;
100                 }
101
102                 public MRSimplerBatchPublisher build() {
103                         if (!fWithResponse) {
104                                 try {
105                                         return new MRSimplerBatchPublisher(fUrls, fTopic, fMaxBatchSize, fMaxBatchAgeMs, fCompress,
106                                                         fAllowSelfSignedCerts, threadOccuranceTime);
107                                 } catch (MalformedURLException e) {
108                                         throw new IllegalArgumentException(e);
109                                 }
110                         } else {
111                                 try {
112                                         return new MRSimplerBatchPublisher(fUrls, fTopic, fMaxBatchSize, fMaxBatchAgeMs, fCompress,
113                                                         fAllowSelfSignedCerts, fMaxBatchSize);
114                                 } catch (MalformedURLException e) {
115                                         throw new IllegalArgumentException(e);
116                                 }
117                         }
118
119                 }
120
121                 private Collection<String> fUrls;
122                 private String fTopic;
123                 private int fMaxBatchSize = 100;
124                 private long fMaxBatchAgeMs = 1000;
125                 private boolean fCompress = false;
126                 private int threadOccuranceTime = 50;
127                 private boolean fAllowSelfSignedCerts = false;
128                 private boolean fWithResponse = false;
129
130         };
131
132         @Override
133         public int send(String partition, String msg) {
134                 return send(new message(partition, msg));
135         }
136
137         @Override
138         public int send(String msg) {
139                 return send(new message(null, msg));
140         }
141
142         @Override
143         public int send(message msg) {
144                 final LinkedList<message> list = new LinkedList<message>();
145                 list.add(msg);
146                 return send(list);
147         }
148
149         @Override
150         public synchronized int send(Collection<message> msgs) {
151                 if (fClosed) {
152                         throw new IllegalStateException("The publisher was closed.");
153                 }
154
155                 for (message userMsg : msgs) {
156                         fPending.add(new TimestampedMessage(userMsg));
157                 }
158                 return getPendingMessageCount();
159         }
160
161         @Override
162         public synchronized int getPendingMessageCount() {
163                 return fPending.size();
164         }
165
166         @Override
167         public void close() {
168                 try {
169                         final List<message> remains = close(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
170                         if (remains.size() > 0) {
171                                 getLog().warn("Closing publisher with " + remains.size() + " messages unsent. "
172                                                 + "Consider using MRBatchingPublisher.close( long timeout, TimeUnit timeoutUnits ) to recapture unsent messages on close.");
173                         }
174                 } catch (InterruptedException e) {
175                         getLog().warn("Possible message loss. " + e.getMessage(), e);
176                         Thread.currentThread().interrupt();
177                 } catch (IOException e) {
178                         getLog().warn("Possible message loss. " + e.getMessage(), e);
179                 }
180         }
181
182         @Override
183         public List<message> close(long time, TimeUnit unit) throws IOException, InterruptedException {
184                 synchronized (this) {
185                         fClosed = true;
186
187                         // stop the background sender
188                         fExec.setContinueExistingPeriodicTasksAfterShutdownPolicy(false);
189                         fExec.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
190                         fExec.shutdown();
191                 }
192
193                 final long now = Clock.now();
194                 final long waitInMs = TimeUnit.MILLISECONDS.convert(time, unit);
195                 final long timeoutAtMs = now + waitInMs;
196
197                 while (Clock.now() < timeoutAtMs && getPendingMessageCount() > 0) {
198                         send(true);
199                         Thread.sleep(250);
200                 }
201
202                 synchronized (this) {
203                         final LinkedList<message> result = new LinkedList<message>();
204                         fPending.drainTo(result);
205                         return result;
206                 }
207         }
208
209         /**
210          * Possibly send a batch to the MR server. This is called by the background
211          * thread and the close() method
212          * 
213          * @param force
214          */
215         private synchronized void send(boolean force) {
216                 if (force || shouldSendNow()) {
217                         if (!sendBatch()) {
218                                 getLog().warn("Send failed, " + fPending.size() + " message to send.");
219
220                                 // note the time for back-off
221                                 fDontSendUntilMs = sfWaitAfterError + Clock.now();
222                         }
223                 }
224         }
225
226         private synchronized boolean shouldSendNow() {
227                 boolean shouldSend = false;
228                 if (fPending.size() > 0) {
229                         final long nowMs = Clock.now();
230
231                         shouldSend = (fPending.size() >= fMaxBatchSize);
232                         if (!shouldSend) {
233                                 final long sendAtMs = fPending.peek().timestamp + fMaxBatchAgeMs;
234                                 shouldSend = sendAtMs <= nowMs;
235                         }
236
237                         // however, wait after an error
238                         shouldSend = shouldSend && nowMs >= fDontSendUntilMs;
239                 }
240                 return shouldSend;
241         }
242
243         /**
244          * Method to parse published JSON Objects and Arrays
245          * 
246          * @return JSONArray
247          */
248         private JSONArray parseJSON() {
249                 JSONArray jsonArray = new JSONArray();
250                 for (TimestampedMessage m : fPending) {
251                         JSONTokener jsonTokener = new JSONTokener(m.fMsg);
252                         JSONObject jsonObject = null;
253                         JSONArray tempjsonArray = null;
254                         final char firstChar = jsonTokener.next();
255                         jsonTokener.back();
256                         if ('[' == firstChar) {
257                                 tempjsonArray = new JSONArray(jsonTokener);
258                                 if (null != tempjsonArray) {
259                                         for (int i = 0; i < tempjsonArray.length(); i++) {
260                                                 jsonArray.put(tempjsonArray.getJSONObject(i));
261                                         }
262                                 }
263                         } else {
264                                 jsonObject = new JSONObject(jsonTokener);
265                                 jsonArray.put(jsonObject);
266                         }
267
268                 }
269                 return jsonArray;
270         }
271
272         private synchronized boolean sendBatch() {
273                 // it's possible for this call to be made with an empty list. in this
274                 // case, just return.
275                 if (fPending.size() < 1) {
276                         return true;
277                 }
278
279                 final long nowMs = Clock.now();
280
281                 host = this.fHostSelector.selectBaseHost();
282
283                 final String httpurl = MRConstants.makeUrl(host, fTopic, props.getProperty("Protocol"),
284                                 props.getProperty("partition"));
285
286                 try {
287                         /*
288                          * final String contentType = fCompress ?
289                          * MRFormat.CAMBRIA_ZIP.toString () : MRFormat.CAMBRIA.toString () ;
290                          */
291
292                         final ByteArrayOutputStream baseStream = new ByteArrayOutputStream();
293                         OutputStream os = baseStream;
294                         final String contentType = props.getProperty("contenttype");
295                         if (contentType.equalsIgnoreCase("application/json")) {
296                                 JSONArray jsonArray = parseJSON();
297                                 os.write(jsonArray.toString().getBytes());
298                                 os.close();
299
300                         } else if (contentType.equalsIgnoreCase("text/plain")) {
301                                 for (TimestampedMessage m : fPending) {
302                                         os.write(m.fMsg.getBytes());
303                                         os.write('\n');
304                                 }
305                                 os.close();
306                         } else if (contentType.equalsIgnoreCase("application/cambria")
307                                         || (contentType.equalsIgnoreCase("application/cambria-zip"))) {
308                                 if (contentType.equalsIgnoreCase("application/cambria-zip")) {
309                                         os = new GZIPOutputStream(baseStream);
310                                 }
311                                 for (TimestampedMessage m : fPending) {
312
313                                         os.write(("" + m.fPartition.length()).getBytes());
314                                         os.write('.');
315                                         os.write(("" + m.fMsg.length()).getBytes());
316                                         os.write('.');
317                                         os.write(m.fPartition.getBytes());
318                                         os.write(m.fMsg.getBytes());
319                                         os.write('\n');
320                                 }
321                                 os.close();
322                         } else {
323                                 for (TimestampedMessage m : fPending) {
324                                         os.write(m.fMsg.getBytes());
325
326                                 }
327                                 os.close();
328                         }
329
330                         final long startMs = Clock.now();
331                         if (ProtocolTypeConstants.DME2.getValue().equalsIgnoreCase(protocolFlag)) {
332
333                                 DME2Configue();
334
335                                 Thread.sleep(5);
336                                 getLog().info("sending " + fPending.size() + " msgs to " + url + subContextPath + ". Oldest: "
337                                                 + (nowMs - fPending.peek().timestamp) + " ms");
338                                 sender.setPayload(os.toString());
339                                 String dmeResponse = sender.sendAndWait(5000L);
340
341                                 final String logLine = "MR reply ok (" + (Clock.now() - startMs) + " ms):" + dmeResponse.toString();
342                                 getLog().info(logLine);
343                                 fPending.clear();
344                                 return true;
345                         }
346
347                         if (ProtocolTypeConstants.AUTH_KEY.getValue().equalsIgnoreCase(protocolFlag)) {
348                                 getLog().info("sending " + fPending.size() + " msgs to " + httpurl + ". Oldest: "
349                                                 + (nowMs - fPending.peek().timestamp) + " ms");
350                                 final JSONObject result = postAuth(httpurl, baseStream.toByteArray(), contentType, authKey, authDate,
351                                                 username, password, protocolFlag);
352                                 // Here we are checking for error response. If HTTP status
353                                 // code is not within the http success response code
354                                 // then we consider this as error and return false
355                                 if (result.getInt("status") < 200 || result.getInt("status") > 299) {
356                                         return false;
357                                 }
358                                 final String logLine = "MR reply ok (" + (Clock.now() - startMs) + " ms):" + result.toString();
359                                 getLog().info(logLine);
360                                 fPending.clear();
361                                 return true;
362                         }
363
364                         if (ProtocolTypeConstants.AAF_AUTH.getValue().equalsIgnoreCase(protocolFlag)) {
365                                 getLog().info("sending " + fPending.size() + " msgs to " + httpurl + ". Oldest: "
366                                                 + (nowMs - fPending.peek().timestamp) + " ms");
367                                 final JSONObject result = post(httpurl, baseStream.toByteArray(), contentType, username, password,
368                                                 protocolFlag);
369
370                                 // Here we are checking for error response. If HTTP status
371                                 // code is not within the http success response code
372                                 // then we consider this as error and return false
373                                 if (result.getInt("status") < 200 || result.getInt("status") > 299) {
374                                         return false;
375                                 }
376                                 final String logLine = "MR reply ok (" + (Clock.now() - startMs) + " ms):" + result.toString();
377                                 getLog().info(logLine);
378                                 fPending.clear();
379                                 return true;
380                         }
381                         
382                         if (ProtocolTypeConstants.HTTPNOAUTH.getValue().equalsIgnoreCase(protocolFlag)) {
383                                 getLog().info("sending " + fPending.size() + " msgs to " + httpurl + ". Oldest: "
384                                                 + (nowMs - fPending.peek().timestamp) + " ms");
385                                 final JSONObject result = postNoAuth(httpurl, baseStream.toByteArray(), contentType);
386
387                                 // Here we are checking for error response. If HTTP status
388                                 // code is not within the http success response code
389                                 // then we consider this as error and return false
390                                 if (result.getInt("status") < 200 || result.getInt("status") > 299) {
391                                         return false;
392                                 }
393                                 final String logLine = "MR reply ok (" + (Clock.now() - startMs) + " ms):" + result.toString();
394                                 getLog().info(logLine);
395                                 fPending.clear();
396                                 return true;
397                         }
398                 } catch (IllegalArgumentException x) {
399                         getLog().warn(x.getMessage(), x);
400                 } catch (IOException x) {
401                         getLog().warn(x.getMessage(), x);
402                 } catch (HttpException x) {
403                         getLog().warn(x.getMessage(), x);
404                 } catch (Exception x) {
405                         getLog().warn(x.getMessage(), x);
406                 }
407                 return false;
408         }
409
410         public synchronized MRPublisherResponse sendBatchWithResponse() {
411                 // it's possible for this call to be made with an empty list. in this
412                 // case, just return.
413                 if (fPending.size() < 1) {
414                         pubResponse.setResponseCode(String.valueOf(HttpStatus.SC_BAD_REQUEST));
415                         pubResponse.setResponseMessage("No Messages to send");
416                         return pubResponse;
417                 }
418
419                 final long nowMs = Clock.now();
420
421                 host = this.fHostSelector.selectBaseHost();
422
423                 final String httpurl = MRConstants.makeUrl(host, fTopic, props.getProperty("Protocol"),
424                                 props.getProperty("partition"));
425                 OutputStream os = null;
426                 try {
427
428                         final ByteArrayOutputStream baseStream = new ByteArrayOutputStream();
429                         os = baseStream;
430                         final String contentType = props.getProperty("contenttype");
431                         if (contentType.equalsIgnoreCase("application/json")) {
432                                 JSONArray jsonArray = parseJSON();
433                                 os.write(jsonArray.toString().getBytes());
434                         } else if (contentType.equalsIgnoreCase("text/plain")) {
435                                 for (TimestampedMessage m : fPending) {
436                                         os.write(m.fMsg.getBytes());
437                                         os.write('\n');
438                                 }
439                         } else if (contentType.equalsIgnoreCase("application/cambria")
440                                         || (contentType.equalsIgnoreCase("application/cambria-zip"))) {
441                                 if (contentType.equalsIgnoreCase("application/cambria-zip")) {
442                                         os = new GZIPOutputStream(baseStream);
443                                 }
444                                 for (TimestampedMessage m : fPending) {
445
446                                         os.write(("" + m.fPartition.length()).getBytes());
447                                         os.write('.');
448                                         os.write(("" + m.fMsg.length()).getBytes());
449                                         os.write('.');
450                                         os.write(m.fPartition.getBytes());
451                                         os.write(m.fMsg.getBytes());
452                                         os.write('\n');
453                                 }
454                                 os.close();
455                         } else {
456                                 for (TimestampedMessage m : fPending) {
457                                         os.write(m.fMsg.getBytes());
458
459                                 }
460                         }
461
462                         final long startMs = Clock.now();
463                         if (ProtocolTypeConstants.DME2.getValue().equalsIgnoreCase(protocolFlag)) {
464
465                                 try {
466                                         DME2Configue();
467
468                                         Thread.sleep(5);
469                                         getLog().info("sending " + fPending.size() + " msgs to " + url + subContextPath + ". Oldest: "
470                                                         + (nowMs - fPending.peek().timestamp) + " ms");
471                                         sender.setPayload(os.toString());
472
473                                         String dmeResponse = sender.sendAndWait(5000L);
474
475                                         pubResponse = createMRPublisherResponse(dmeResponse, pubResponse);
476
477                                         if (Integer.valueOf(pubResponse.getResponseCode()) < 200
478                                                         || Integer.valueOf(pubResponse.getResponseCode()) > 299) {
479
480                                                 return pubResponse;
481                                         }
482                                         final String logLine = String.valueOf((Clock.now() - startMs)) + dmeResponse.toString();
483                                         getLog().info(logLine);
484                                         fPending.clear();
485
486                                 } catch (DME2Exception x) {
487                                         getLog().warn(x.getMessage(), x);
488                                         pubResponse.setResponseCode(x.getErrorCode());
489                                         pubResponse.setResponseMessage(x.getErrorMessage());
490                                 } catch (URISyntaxException x) {
491
492                                         getLog().warn(x.getMessage(), x);
493                                         pubResponse.setResponseCode(String.valueOf(HttpStatus.SC_BAD_REQUEST));
494                                         pubResponse.setResponseMessage(x.getMessage());
495                                 } catch (Exception x) {
496
497                                         pubResponse.setResponseCode(String.valueOf(HttpStatus.SC_INTERNAL_SERVER_ERROR));
498                                         pubResponse.setResponseMessage(x.getMessage());
499                                         logger.error("exception: ", x);
500
501                                 }
502
503                                 return pubResponse;
504                         }
505
506                         if (ProtocolTypeConstants.AUTH_KEY.getValue().equalsIgnoreCase(protocolFlag)) {
507                                 getLog().info("sending " + fPending.size() + " msgs to " + httpurl + ". Oldest: "
508                                                 + (nowMs - fPending.peek().timestamp) + " ms");
509                                 final String result = postAuthwithResponse(httpurl, baseStream.toByteArray(), contentType, authKey,
510                                                 authDate, username, password, protocolFlag);
511                                 // Here we are checking for error response. If HTTP status
512                                 // code is not within the http success response code
513                                 // then we consider this as error and return false
514
515                                 pubResponse = createMRPublisherResponse(result, pubResponse);
516
517                                 if (Integer.valueOf(pubResponse.getResponseCode()) < 200
518                                                 || Integer.valueOf(pubResponse.getResponseCode()) > 299) {
519
520                                         return pubResponse;
521                                 }
522
523                                 final String logLine = "MR reply ok (" + (Clock.now() - startMs) + " ms):" + result.toString();
524                                 getLog().info(logLine);
525                                 fPending.clear();
526                                 return pubResponse;
527                         }
528
529                         if (ProtocolTypeConstants.AAF_AUTH.getValue().equalsIgnoreCase(protocolFlag)) {
530                                 getLog().info("sending " + fPending.size() + " msgs to " + httpurl + ". Oldest: "
531                                                 + (nowMs - fPending.peek().timestamp) + " ms");
532                                 final String result = postWithResponse(httpurl, baseStream.toByteArray(), contentType, username,
533                                                 password, protocolFlag);
534
535                                 // Here we are checking for error response. If HTTP status
536                                 // code is not within the http success response code
537                                 // then we consider this as error and return false
538                                 pubResponse = createMRPublisherResponse(result, pubResponse);
539
540                                 if (Integer.valueOf(pubResponse.getResponseCode()) < 200
541                                                 || Integer.valueOf(pubResponse.getResponseCode()) > 299) {
542
543                                         return pubResponse;
544                                 }
545
546                                 final String logLine = String.valueOf((Clock.now() - startMs));
547                                 getLog().info(logLine);
548                                 fPending.clear();
549                                 return pubResponse;
550                         }
551                         
552                         if (ProtocolTypeConstants.HTTPNOAUTH.getValue().equalsIgnoreCase(protocolFlag)) {
553                                 getLog().info("sending " + fPending.size() + " msgs to " + httpurl + ". Oldest: "
554                                                 + (nowMs - fPending.peek().timestamp) + " ms");
555                                 final String result = postNoAuthWithResponse(httpurl, baseStream.toByteArray(), contentType);
556
557                                 // Here we are checking for error response. If HTTP status
558                                 // code is not within the http success response code
559                                 // then we consider this as error and return false
560                                 pubResponse = createMRPublisherResponse(result, pubResponse);
561
562                                 if (Integer.valueOf(pubResponse.getResponseCode()) < 200
563                                                 || Integer.valueOf(pubResponse.getResponseCode()) > 299) {
564
565                                         return pubResponse;
566                                 }
567
568                                 final String logLine = String.valueOf((Clock.now() - startMs));
569                                 getLog().info(logLine);
570                                 fPending.clear();
571                                 return pubResponse;
572                         }
573                 } catch (IllegalArgumentException x) {
574                         getLog().warn(x.getMessage(), x);
575                         pubResponse.setResponseCode(String.valueOf(HttpStatus.SC_BAD_REQUEST));
576                         pubResponse.setResponseMessage(x.getMessage());
577
578                 } catch (IOException x) {
579                         getLog().warn(x.getMessage(), x);
580                         pubResponse.setResponseCode(String.valueOf(HttpStatus.SC_INTERNAL_SERVER_ERROR));
581                         pubResponse.setResponseMessage(x.getMessage());
582
583                 } catch (HttpException x) {
584                         getLog().warn(x.getMessage(), x);
585                         pubResponse.setResponseCode(String.valueOf(HttpStatus.SC_BAD_REQUEST));
586                         pubResponse.setResponseMessage(x.getMessage());
587
588                 } catch (Exception x) {
589                         getLog().warn(x.getMessage(), x);
590
591                         pubResponse.setResponseCode(String.valueOf(HttpStatus.SC_INTERNAL_SERVER_ERROR));
592                         pubResponse.setResponseMessage(x.getMessage());
593
594                 }
595
596                 finally {
597                         if (fPending.size() > 0) {
598                                 getLog().warn("Send failed, " + fPending.size() + " message to send.");
599                                 pubResponse.setPendingMsgs(fPending.size());
600                         }
601                         if (os != null) {
602                                 try {
603                                         os.close();
604                                 } catch (Exception x) {
605                                         getLog().warn(x.getMessage(), x);
606                                         pubResponse.setResponseCode(String.valueOf(HttpStatus.SC_INTERNAL_SERVER_ERROR));
607                                         pubResponse.setResponseMessage("Error in closing Output Stream");
608                                 }
609                         }
610                 }
611
612                 return pubResponse;
613         }
614
615         public MRPublisherResponse createMRPublisherResponse(String reply, MRPublisherResponse mrPubResponse) {
616
617                 if (reply.isEmpty()) {
618
619                         mrPubResponse.setResponseCode(String.valueOf(HttpStatus.SC_BAD_REQUEST));
620                         mrPubResponse.setResponseMessage("Please verify the Producer properties");
621                 } else if (reply.startsWith("{")) {
622                         JSONObject jObject = new JSONObject(reply);
623                         if (jObject.has("message") && jObject.has("status")) {
624                                 String message = jObject.getString("message");
625                                 if (null != message) {
626                                         mrPubResponse.setResponseMessage(message);
627                                 }
628                                 mrPubResponse.setResponseCode(Integer.toString(jObject.getInt("status")));
629                         } else {
630                                 mrPubResponse.setResponseCode(String.valueOf(HttpStatus.SC_OK));
631                                 mrPubResponse.setResponseMessage(reply);
632                         }
633                 } else if (reply.startsWith("<")) {
634                         String responseCode = getHTTPErrorResponseCode(reply);
635                         if (responseCode.contains("403")) {
636                                 responseCode = "403";
637                         }
638                         mrPubResponse.setResponseCode(responseCode);
639                         mrPubResponse.setResponseMessage(getHTTPErrorResponseMessage(reply));
640                 }
641
642                 return mrPubResponse;
643         }
644
645         private final String fTopic;
646         private final int fMaxBatchSize;
647         private final long fMaxBatchAgeMs;
648         private final boolean fCompress;
649         private int threadOccuranceTime;
650         private boolean fClosed;
651         private String username;
652         private String password;
653         private String host;
654
655         // host selector
656         private HostSelector fHostSelector = null;
657
658         private final LinkedBlockingQueue<TimestampedMessage> fPending;
659         private long fDontSendUntilMs;
660         private final ScheduledThreadPoolExecutor fExec;
661
662         private String latitude;
663         private String longitude;
664         private String version;
665         private String serviceName;
666         private String env;
667         private String partner;
668         private String routeOffer;
669         private String subContextPath;
670         private String protocol;
671         private String methodType;
672         private String url;
673         private String dmeuser;
674         private String dmepassword;
675         private String contentType;
676         private static final long sfWaitAfterError = 10000;
677         private HashMap<String, String> DMETimeOuts;
678         private DME2Client sender;
679         public String protocolFlag = ProtocolTypeConstants.DME2.getValue();
680         private String authKey;
681         private String authDate;
682         private String handlers;
683         private Properties props;
684         public static String routerFilePath;
685         protected static final Map<String, String> headers = new HashMap<String, String>();
686         public static MultivaluedMap<String, Object> headersMap;
687
688         private MRPublisherResponse pubResponse;
689
690         public MRPublisherResponse getPubResponse() {
691                 return pubResponse;
692         }
693
694         public void setPubResponse(MRPublisherResponse pubResponse) {
695                 this.pubResponse = pubResponse;
696         }
697
698         public static String getRouterFilePath() {
699                 return routerFilePath;
700         }
701
702         public static void setRouterFilePath(String routerFilePath) {
703                 MRSimplerBatchPublisher.routerFilePath = routerFilePath;
704         }
705
706         public Properties getProps() {
707                 return props;
708         }
709
710         public void setProps(Properties props) {
711                 this.props = props;
712         }
713
714         public String getProtocolFlag() {
715                 return protocolFlag;
716         }
717
718         public void setProtocolFlag(String protocolFlag) {
719                 this.protocolFlag = protocolFlag;
720         }
721
722         private void DME2Configue() throws Exception {
723                 try {
724
725                         /*
726                          * FileReader reader = new FileReader(new File (producerFilePath));
727                          * Properties props = new Properties(); props.load(reader);
728                          */
729                         latitude = props.getProperty("Latitude");
730                         longitude = props.getProperty("Longitude");
731                         version = props.getProperty("Version");
732                         serviceName = props.getProperty("ServiceName");
733                         env = props.getProperty("Environment");
734                         partner = props.getProperty("Partner");
735                         routeOffer = props.getProperty("routeOffer");
736                         subContextPath = props.getProperty("SubContextPath") + fTopic;
737                         /*
738                          * if(props.getProperty("partition")!=null &&
739                          * !props.getProperty("partition").equalsIgnoreCase("")){
740                          * subContextPath=subContextPath+"?partitionKey="+props.getProperty(
741                          * "partition"); }
742                          */
743                         protocol = props.getProperty("Protocol");
744                         methodType = props.getProperty("MethodType");
745                         dmeuser = props.getProperty("username");
746                         dmepassword = props.getProperty("password");
747                         contentType = props.getProperty("contenttype");
748                         handlers = props.getProperty("sessionstickinessrequired");
749                         routerFilePath = props.getProperty("DME2preferredRouterFilePath");
750
751                         /**
752                          * Changes to DME2Client url to use Partner for auto failover
753                          * between data centers When Partner value is not provided use the
754                          * routeOffer value for auto failover within a cluster
755                          */
756
757                         String partitionKey = props.getProperty("partition");
758
759                         if (partner != null && !partner.isEmpty()) {
760                                 url = protocol + "://" + serviceName + "?version=" + version + "&envContext=" + env + "&partner="
761                                                 + partner;
762                                 if (partitionKey != null && !partitionKey.equalsIgnoreCase("")) {
763                                         url = url + "&partitionKey=" + partitionKey;
764                                 }
765                         } else if (routeOffer != null && !routeOffer.isEmpty()) {
766                                 url = protocol + "://" + serviceName + "?version=" + version + "&envContext=" + env + "&routeoffer="
767                                                 + routeOffer;
768                                 if (partitionKey != null && !partitionKey.equalsIgnoreCase("")) {
769                                         url = url + "&partitionKey=" + partitionKey;
770                                 }
771                         }
772
773                         DMETimeOuts = new HashMap<String, String>();
774                         DMETimeOuts.put("AFT_DME2_EP_READ_TIMEOUT_MS", props.getProperty("AFT_DME2_EP_READ_TIMEOUT_MS"));
775                         DMETimeOuts.put("AFT_DME2_ROUNDTRIP_TIMEOUT_MS", props.getProperty("AFT_DME2_ROUNDTRIP_TIMEOUT_MS"));
776                         DMETimeOuts.put("AFT_DME2_EP_CONN_TIMEOUT", props.getProperty("AFT_DME2_EP_CONN_TIMEOUT"));
777                         DMETimeOuts.put("Content-Type", contentType);
778                         System.setProperty("AFT_LATITUDE", latitude);
779                         System.setProperty("AFT_LONGITUDE", longitude);
780                         System.setProperty("AFT_ENVIRONMENT", props.getProperty("AFT_ENVIRONMENT"));
781                         // System.setProperty("DME2.DEBUG", "true");
782
783                         // SSL changes
784                         // System.setProperty("AFT_DME2_CLIENT_SSL_INCLUDE_PROTOCOLS",
785                         // "SSLv3,TLSv1,TLSv1.1");
786                         System.setProperty("AFT_DME2_CLIENT_SSL_INCLUDE_PROTOCOLS", "TLSv1.1,TLSv1.2");
787                         System.setProperty("AFT_DME2_CLIENT_IGNORE_SSL_CONFIG", "false");
788                         System.setProperty("AFT_DME2_CLIENT_KEYSTORE_PASSWORD", "changeit");
789
790                         // SSL changes
791
792                         sender = new DME2Client(new URI(url), 5000L);
793
794                         sender.setAllowAllHttpReturnCodes(true);
795                         sender.setMethod(methodType);
796                         sender.setSubContext(subContextPath);
797                         sender.setCredentials(dmeuser, dmepassword);
798                         sender.setHeaders(DMETimeOuts);
799                         if (handlers.equalsIgnoreCase("yes")) {
800                                 sender.addHeader("AFT_DME2_EXCHANGE_REQUEST_HANDLERS",
801                                                 props.getProperty("AFT_DME2_EXCHANGE_REQUEST_HANDLERS"));
802                                 sender.addHeader("AFT_DME2_EXCHANGE_REPLY_HANDLERS",
803                                                 props.getProperty("AFT_DME2_EXCHANGE_REPLY_HANDLERS"));
804                                 sender.addHeader("AFT_DME2_REQ_TRACE_ON", props.getProperty("AFT_DME2_REQ_TRACE_ON"));
805                         } else {
806                                 sender.addHeader("AFT_DME2_EXCHANGE_REPLY_HANDLERS", "com.att.nsa.mr.dme.client.HeaderReplyHandler");
807                         }
808                 } catch (DME2Exception x) {
809                         getLog().warn(x.getMessage(), x);
810                         throw new DME2Exception(x.getErrorCode(), x.getErrorMessage());
811                 } catch (URISyntaxException x) {
812
813                         getLog().warn(x.getMessage(), x);
814                         throw new URISyntaxException(url, x.getMessage());
815                 } catch (Exception x) {
816
817                         getLog().warn(x.getMessage(), x);
818                         throw new IllegalArgumentException(x.getMessage());
819                 }
820         }
821
822         private MRSimplerBatchPublisher(Collection<String> hosts, String topic, int maxBatchSize, long maxBatchAgeMs,
823                         boolean compress) throws MalformedURLException {
824                 super(hosts);
825
826                 if (topic == null || topic.length() < 1) {
827                         throw new IllegalArgumentException("A topic must be provided.");
828                 }
829
830                 fHostSelector = new HostSelector(hosts, null);
831                 fClosed = false;
832                 fTopic = topic;
833                 fMaxBatchSize = maxBatchSize;
834                 fMaxBatchAgeMs = maxBatchAgeMs;
835                 fCompress = compress;
836
837                 fPending = new LinkedBlockingQueue<TimestampedMessage>();
838                 fDontSendUntilMs = 0;
839                 fExec = new ScheduledThreadPoolExecutor(1);
840                 pubResponse = new MRPublisherResponse();
841
842         }
843
844         private MRSimplerBatchPublisher(Collection<String> hosts, String topic, int maxBatchSize, long maxBatchAgeMs,
845                         boolean compress, boolean allowSelfSignedCerts, int httpThreadOccurnace) throws MalformedURLException {
846                 super(hosts);
847
848                 if (topic == null || topic.length() < 1) {
849                         throw new IllegalArgumentException("A topic must be provided.");
850                 }
851
852                 fHostSelector = new HostSelector(hosts, null);
853                 fClosed = false;
854                 fTopic = topic;
855                 fMaxBatchSize = maxBatchSize;
856                 fMaxBatchAgeMs = maxBatchAgeMs;
857                 fCompress = compress;
858                 threadOccuranceTime = httpThreadOccurnace;
859                 fPending = new LinkedBlockingQueue<TimestampedMessage>();
860                 fDontSendUntilMs = 0;
861                 fExec = new ScheduledThreadPoolExecutor(1);
862                 fExec.scheduleAtFixedRate(new Runnable() {
863                         @Override
864                         public void run() {
865                                 send(false);
866                         }
867                 }, 100, threadOccuranceTime, TimeUnit.MILLISECONDS);
868         }
869
870         private static class TimestampedMessage extends message {
871                 public TimestampedMessage(message m) {
872                         super(m);
873                         timestamp = Clock.now();
874                 }
875
876                 public final long timestamp;
877         }
878
879         public String getUsername() {
880                 return username;
881         }
882
883         public void setUsername(String username) {
884                 this.username = username;
885         }
886
887         public String getPassword() {
888                 return password;
889         }
890
891         public void setPassword(String password) {
892                 this.password = password;
893         }
894
895         public String getHost() {
896                 return host;
897         }
898
899         public void setHost(String host) {
900                 this.host = host;
901         }
902
903         public String getContentType() {
904                 return contentType;
905         }
906
907         public void setContentType(String contentType) {
908                 this.contentType = contentType;
909         }
910
911         public String getAuthKey() {
912                 return authKey;
913         }
914
915         public void setAuthKey(String authKey) {
916                 this.authKey = authKey;
917         }
918
919         public String getAuthDate() {
920                 return authDate;
921         }
922
923         public void setAuthDate(String authDate) {
924                 this.authDate = authDate;
925         }
926
927 }