8f2a66e156f976f060baf5405456d73178569b52
[dmaap/messagerouter/dmaapclient.git] / src / main / java / org / onap / dmaap / mr / client / impl / MRSimplerBatchPublisher.java
1 /*******************************************************************************
2  *  ============LICENSE_START=======================================================
3  *  org.onap.dmaap
4  *  ================================================================================
5  *  Copyright © 2017 AT&T Intellectual Property. All rights reserved.
6  *  ================================================================================
7  *  Licensed under the Apache License, Version 2.0 (the "License");
8  *  you may not use this file except in compliance with the License.
9  *  You may obtain a copy of the License at
10  *        http://www.apache.org/licenses/LICENSE-2.0
11  *  
12  *  Unless required by applicable law or agreed to in writing, software
13  *  distributed under the License is distributed on an "AS IS" BASIS,
14  *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  *  See the License for the specific language governing permissions and
16  *  limitations under the License.
17  *  ============LICENSE_END=========================================================
18  *
19  *  ECOMP is a trademark and service mark of AT&T Intellectual Property.
20  *  
21  *******************************************************************************/
22 package org.onap.dmaap.mr.client.impl;
23
24 import java.io.ByteArrayOutputStream;
25 import java.io.IOException;
26 import java.io.OutputStream;
27 import java.net.MalformedURLException;
28 import java.net.URI;
29 import java.net.URISyntaxException;
30 import java.util.Collection;
31 import java.util.HashMap;
32 import java.util.LinkedList;
33 import java.util.List;
34 import java.util.Map;
35 import java.util.Properties;
36 import java.util.concurrent.LinkedBlockingQueue;
37 import java.util.concurrent.ScheduledThreadPoolExecutor;
38 import java.util.concurrent.TimeUnit;
39 import java.util.zip.GZIPOutputStream;
40
41 import javax.ws.rs.core.MultivaluedMap;
42
43 import org.slf4j.Logger;
44 import org.slf4j.LoggerFactory;
45
46 import org.apache.http.HttpException;
47 import org.apache.http.HttpStatus;
48 import org.json.JSONArray;
49 import org.json.JSONObject;
50 import org.json.JSONTokener;
51
52 import com.att.aft.dme2.api.DME2Client;
53 import com.att.aft.dme2.api.DME2Exception;
54 import org.onap.dmaap.mr.client.HostSelector;
55 import org.onap.dmaap.mr.client.MRBatchingPublisher;
56 import org.onap.dmaap.mr.client.response.MRPublisherResponse;
57 import org.onap.dmaap.mr.test.clients.ProtocolTypeConstants;
58
59 public class MRSimplerBatchPublisher extends MRBaseClient implements MRBatchingPublisher {
60         private static final Logger logger = LoggerFactory.getLogger(MRSimplerBatchPublisher.class);
61
62         public static class Builder {
63                 public Builder() {
64                 }
65
66                 public Builder againstUrls(Collection<String> baseUrls) {
67                         fUrls = baseUrls;
68                         return this;
69                 }
70                 
71                 public Builder againstUrlsOrServiceName ( Collection<String> baseUrls, Collection<String> serviceName, String transportype )            
72                 {               
73                         fUrls = baseUrls;               
74                         fServiceName = serviceName;             
75                         fTransportype = transportype;           
76                         return this;            
77                 }
78
79                 public Builder onTopic(String topic) {
80                         fTopic = topic;
81                         return this;
82                 }
83
84                 public Builder batchTo(int maxBatchSize, long maxBatchAgeMs) {
85                         fMaxBatchSize = maxBatchSize;
86                         fMaxBatchAgeMs = maxBatchAgeMs;
87                         return this;
88                 }
89
90                 public Builder compress(boolean compress) {
91                         fCompress = compress;
92                         return this;
93                 }
94
95                 public Builder httpThreadTime(int threadOccuranceTime) {
96                         this.threadOccuranceTime = threadOccuranceTime;
97                         return this;
98                 }
99
100                 public Builder allowSelfSignedCertificates(boolean allowSelfSignedCerts) {
101                         fAllowSelfSignedCerts = allowSelfSignedCerts;
102                         return this;
103                 }
104
105                 public Builder withResponse(boolean withResponse) {
106                         fWithResponse = withResponse;
107                         return this;
108                 }
109
110                 public MRSimplerBatchPublisher build() {
111                         if (!fWithResponse) {
112                                 try {
113                                         return new MRSimplerBatchPublisher(fUrls, fTopic, fMaxBatchSize, fMaxBatchAgeMs, fCompress,
114                                                         fAllowSelfSignedCerts, threadOccuranceTime);
115                                 } catch (MalformedURLException e) {
116                                         throw new IllegalArgumentException(e);
117                                 }
118                         } else {
119                                 try {
120                                         return new MRSimplerBatchPublisher(fUrls, fTopic, fMaxBatchSize, fMaxBatchAgeMs, fCompress,
121                                                         fAllowSelfSignedCerts, fMaxBatchSize);
122                                 } catch (MalformedURLException e) {
123                                         throw new IllegalArgumentException(e);
124                                 }
125                         }
126
127                 }
128
129                 private Collection<String> fUrls;
130                 private Collection<String> fServiceName;                
131                 private String fTransportype;   
132                 private String fTopic;
133                 private int fMaxBatchSize = 100;
134                 
135                 private long fMaxBatchAgeMs = 1000;
136                 private boolean fCompress = false;
137                 private int threadOccuranceTime = 50;
138                 private boolean fAllowSelfSignedCerts = false;
139                 private boolean fWithResponse = false;
140
141         };
142
143         @Override
144         public int send(String partition, String msg) {
145                 return send(new message(partition, msg));
146         }
147
148         @Override
149         public int send(String msg) {
150                 return send(new message(null, msg));
151         }
152
153         @Override
154         public int send(message msg) {
155                 final LinkedList<message> list = new LinkedList<>();
156                 list.add(msg);
157                 return send(list);
158         }
159
160         @Override
161         public synchronized int send(Collection<message> msgs) {
162                 if (fClosed) {
163                         throw new IllegalStateException("The publisher was closed.");
164                 }
165
166                 for (message userMsg : msgs) {
167                         fPending.add(new TimestampedMessage(userMsg));
168                 }
169                 return getPendingMessageCount();
170         }
171
172         @Override
173         public synchronized int getPendingMessageCount() {
174                 return fPending.size();
175         }
176
177         @Override
178         public void close() {
179                 try {
180                         final List<message> remains = close(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
181                         if (remains.isEmpty()) {
182                                 getLog().warn("Closing publisher with {} messages unsent. Consider using MRBatchingPublisher.close( long timeout, TimeUnit timeoutUnits ) to recapture unsent messages on close.",
183                                         remains.size());
184                         }
185                 } catch (InterruptedException e) {
186                         getLog().warn("Possible message loss. " + e.getMessage(), e);
187                         Thread.currentThread().interrupt();
188                 } catch (IOException e) {
189                         getLog().warn("Possible message loss. " + e.getMessage(), e);
190                 }
191         }
192
193         @Override
194         public List<message> close(long time, TimeUnit unit) throws IOException, InterruptedException {
195                 synchronized (this) {
196                         fClosed = true;
197
198                         // stop the background sender
199                         fExec.setContinueExistingPeriodicTasksAfterShutdownPolicy(false);
200                         fExec.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
201                         fExec.shutdown();
202                 }
203
204                 final long now = Clock.now();
205                 final long waitInMs = TimeUnit.MILLISECONDS.convert(time, unit);
206                 final long timeoutAtMs = now + waitInMs;
207
208                 while (Clock.now() < timeoutAtMs && getPendingMessageCount() > 0) {
209                         send(true);
210                         Thread.sleep(250);
211                 }
212
213                 synchronized (this) {
214                         final LinkedList<message> result = new LinkedList<>();
215                         fPending.drainTo(result);
216                         return result;
217                 }
218         }
219
220         /**
221          * Possibly send a batch to the MR server. This is called by the background
222          * thread and the close() method
223          * 
224          * @param force
225          */
226         private synchronized void send(boolean force) {
227                 if ((force || shouldSendNow()) && !sendBatch()) {
228                                 getLog().warn("Send failed, " + fPending.size() + " message to send.");
229
230                                 // note the time for back-off
231                                 fDontSendUntilMs = sfWaitAfterError + Clock.now();
232                 }
233         }
234
235         private synchronized boolean shouldSendNow() {
236                 boolean shouldSend = false;
237                 if (fPending.size()>0) {
238                         final long nowMs = Clock.now();
239
240                         shouldSend = (fPending.size() >= fMaxBatchSize);
241                         if (!shouldSend) {
242                                 final long sendAtMs = fPending.peek().timestamp + fMaxBatchAgeMs;
243                                 shouldSend = sendAtMs <= nowMs;
244                         }
245
246                         // however, wait after an error
247                         shouldSend = shouldSend && nowMs >= fDontSendUntilMs;
248                 }
249                 return shouldSend;
250         }
251
252         /**
253          * Method to parse published JSON Objects and Arrays
254          * 
255          * @return JSONArray
256          */
257         private JSONArray parseJSON() {
258                 JSONArray jsonArray = new JSONArray();
259                 for (TimestampedMessage m : fPending) {
260                         JSONTokener jsonTokener = new JSONTokener(m.fMsg);
261                         JSONObject jsonObject = null;
262                         JSONArray tempjsonArray = null;
263                         final char firstChar = jsonTokener.next();
264                         jsonTokener.back();
265                         if ('[' == firstChar) {
266                                 tempjsonArray = new JSONArray(jsonTokener);
267                                 if (null != tempjsonArray) {
268                                         for (int i = 0; i < tempjsonArray.length(); i++) {
269                                                 jsonArray.put(tempjsonArray.getJSONObject(i));
270                                         }
271                                 }
272                         } else {
273                                 jsonObject = new JSONObject(jsonTokener);
274                                 jsonArray.put(jsonObject);
275                         }
276
277                 }
278                 return jsonArray;
279         }
280
281         private synchronized boolean sendBatch() {
282                 // it's possible for this call to be made with an empty list. in this
283                 // case, just return.
284                 if (fPending.isEmpty()) {
285                         return true;
286                 }
287
288                 final long nowMs = Clock.now();
289
290                 if (this.fHostSelector != null) {
291                         host = this.fHostSelector.selectBaseHost();
292                 }
293
294                 final String httpurl = MRConstants.makeUrl(host, fTopic, props.getProperty("Protocol"),
295                                 props.getProperty("partition"));
296
297                 try {
298                         
299                         final ByteArrayOutputStream baseStream = new ByteArrayOutputStream();
300                         OutputStream os = baseStream;
301                         final String contentType = props.getProperty("contenttype");
302                         if (contentType.equalsIgnoreCase("application/json")) {
303                                 JSONArray jsonArray = parseJSON();
304                                 os.write(jsonArray.toString().getBytes());
305                                 os.close();
306
307                         } else if (contentType.equalsIgnoreCase("text/plain")) {
308                                 for (TimestampedMessage m : fPending) {
309                                         os.write(m.fMsg.getBytes());
310                                         os.write('\n');
311                                 }
312                                 os.close();
313                         } else if (contentType.equalsIgnoreCase("application/cambria")
314                                         || (contentType.equalsIgnoreCase("application/cambria-zip"))) {
315                                 if (contentType.equalsIgnoreCase("application/cambria-zip")) {
316                                         os = new GZIPOutputStream(baseStream);
317                                 }
318                                 for (TimestampedMessage m : fPending) {
319
320                                         os.write(("" + m.fPartition.length()).getBytes());
321                                         os.write('.');
322                                         os.write(("" + m.fMsg.length()).getBytes());
323                                         os.write('.');
324                                         os.write(m.fPartition.getBytes());
325                                         os.write(m.fMsg.getBytes());
326                                         os.write('\n');
327                                 }
328                                 os.close();
329                         } else {
330                                 for (TimestampedMessage m : fPending) {
331                                         os.write(m.fMsg.getBytes());
332
333                                 }
334                                 os.close();
335                         }
336
337                         final long startMs = Clock.now();
338                         if (ProtocolTypeConstants.DME2.getValue().equalsIgnoreCase(protocolFlag)) {
339
340                                 DME2Configue();
341
342                                 Thread.sleep(5);
343                                 getLog().info(String
344                                         .format("sending %d msgs to %s%s. Oldest: %d ms", fPending.size(), url, subContextPath,
345                                                 nowMs - fPending.peek().timestamp));
346                                 sender.setPayload(os.toString());
347                                 String dmeResponse = sender.sendAndWait(5000L);
348
349                                 final String logLine = "MR reply ok (" + (Clock.now() - startMs) + " ms):" + dmeResponse;
350                                 getLog().info(logLine);
351                                 fPending.clear();
352                                 return true;
353                         }
354
355             if (ProtocolTypeConstants.AUTH_KEY.getValue().equalsIgnoreCase(protocolFlag)) {
356                 getLog().info("sending {} msgs to {}. Oldest: {} ms", fPending.size(), httpurl,
357                                         nowMs - fPending.peek().timestamp);
358                 final JSONObject result =
359                         postAuth(new PostAuthDataObject().setPath(httpurl).setData(baseStream.toByteArray())
360                                 .setContentType(contentType).setAuthKey(authKey).setAuthDate(authDate)
361                                 .setUsername(username).setPassword(password).setProtocolFlag(protocolFlag));
362                 // Here we are checking for error response. If HTTP status
363                 // code is not within the http success response code
364                 // then we consider this as error and return false
365                 if (result.getInt("status") < 200 || result.getInt("status") > 299) {
366                     return false;
367                 }
368                 final String logLine = "MR reply ok (" + (Clock.now() - startMs) + " ms):" + result.toString();
369                 getLog().info(logLine);
370                 fPending.clear();
371                 return true;
372             }
373
374                         if (ProtocolTypeConstants.AAF_AUTH.getValue().equalsIgnoreCase(protocolFlag)) {
375                                 getLog().info("sending {} msgs to {}. Oldest: {} ms", fPending.size(), httpurl,
376                                         nowMs - fPending.peek().timestamp);
377                                 final JSONObject result = post(httpurl, baseStream.toByteArray(), contentType, username, password,
378                                                 protocolFlag);
379
380                                 // Here we are checking for error response. If HTTP status
381                                 // code is not within the http success response code
382                                 // then we consider this as error and return false
383                                 if (result.getInt("status") < 200 || result.getInt("status") > 299) {
384                                         return false;
385                                 }
386                                 final String logLine = "MR reply ok (" + (Clock.now() - startMs) + " ms):" + result.toString();
387                                 getLog().info(logLine);
388                                 fPending.clear();
389                                 return true;
390                         }
391                         
392                         if (ProtocolTypeConstants.HTTPNOAUTH.getValue().equalsIgnoreCase(protocolFlag)) {
393                                 getLog().info("sending {} msgs to {}. Oldest: {} ms", fPending.size(), httpurl,
394                                         nowMs - fPending.peek().timestamp);
395                                 final JSONObject result = postNoAuth(httpurl, baseStream.toByteArray(), contentType);
396
397                                 // Here we are checking for error response. If HTTP status
398                                 // code is not within the http success response code
399                                 // then we consider this as error and return false
400                                 if (result.getInt("status") < 200 || result.getInt("status") > 299) {
401                                         return false;
402                                 }
403                                 final String logLine = "MR reply ok (" + (Clock.now() - startMs) + " ms):" + result.toString();
404                                 getLog().info(logLine);
405                                 fPending.clear();
406                                 return true;
407                         }
408                 } catch (Exception x) {
409                         getLog().warn(x.getMessage(), x);
410                 }
411                 return false;
412         }
413
414         public synchronized MRPublisherResponse sendBatchWithResponse() {
415                 // it's possible for this call to be made with an empty list. in this
416                 // case, just return.
417                 if (fPending.isEmpty()) {
418                         pubResponse.setResponseCode(String.valueOf(HttpStatus.SC_BAD_REQUEST));
419                         pubResponse.setResponseMessage("No Messages to send");
420                         return pubResponse;
421                 }
422
423                 final long nowMs = Clock.now();
424
425                 host = this.fHostSelector.selectBaseHost();
426
427                 final String httpurl = MRConstants.makeUrl(host, fTopic, props.getProperty("Protocol"),
428                                 props.getProperty("partition"));
429                 OutputStream os = null;
430                 try {
431
432                         final ByteArrayOutputStream baseStream = new ByteArrayOutputStream();
433                         os = baseStream;
434                         final String contentType = props.getProperty("contenttype");
435                         if (contentType.equalsIgnoreCase("application/json")) {
436                                 JSONArray jsonArray = parseJSON();
437                                 os.write(jsonArray.toString().getBytes());
438                         } else if (contentType.equalsIgnoreCase("text/plain")) {
439                                 for (TimestampedMessage m : fPending) {
440                                         os.write(m.fMsg.getBytes());
441                                         os.write('\n');
442                                 }
443                         } else if (contentType.equalsIgnoreCase("application/cambria")
444                                         || (contentType.equalsIgnoreCase("application/cambria-zip"))) {
445                                 if (contentType.equalsIgnoreCase("application/cambria-zip")) {
446                                         os = new GZIPOutputStream(baseStream);
447                                 }
448                                 for (TimestampedMessage m : fPending) {
449
450                                         os.write(("" + m.fPartition.length()).getBytes());
451                                         os.write('.');
452                                         os.write(("" + m.fMsg.length()).getBytes());
453                                         os.write('.');
454                                         os.write(m.fPartition.getBytes());
455                                         os.write(m.fMsg.getBytes());
456                                         os.write('\n');
457                                 }
458                                 os.close();
459                         } else {
460                                 for (TimestampedMessage m : fPending) {
461                                         os.write(m.fMsg.getBytes());
462
463                                 }
464                         }
465
466                         final long startMs = Clock.now();
467                         if (ProtocolTypeConstants.DME2.getValue().equalsIgnoreCase(protocolFlag)) {
468
469                                 try {
470                                         DME2Configue();
471
472                                         Thread.sleep(5);
473                                         getLog().info("sending {} msgs to {}{}. Oldest: {} ms", fPending.size(), url, subContextPath,
474                                                         nowMs - fPending.peek().timestamp);
475                                         sender.setPayload(os.toString());
476
477                                         String dmeResponse = sender.sendAndWait(5000L);
478
479                                         pubResponse = createMRPublisherResponse(dmeResponse, pubResponse);
480
481                                         if (Integer.valueOf(pubResponse.getResponseCode()) < 200
482                                                         || Integer.valueOf(pubResponse.getResponseCode()) > 299) {
483
484                                                 return pubResponse;
485                                         }
486                                         final String logLine = String.valueOf((Clock.now() - startMs)) + dmeResponse.toString();
487                                         getLog().info(logLine);
488                                         fPending.clear();
489
490                                 } catch (DME2Exception x) {
491                                         getLog().warn(x.getMessage(), x);
492                                         pubResponse.setResponseCode(x.getErrorCode());
493                                         pubResponse.setResponseMessage(x.getErrorMessage());
494                                 } catch (URISyntaxException x) {
495
496                                         getLog().warn(x.getMessage(), x);
497                                         pubResponse.setResponseCode(String.valueOf(HttpStatus.SC_BAD_REQUEST));
498                                         pubResponse.setResponseMessage(x.getMessage());
499                                 } catch (Exception x) {
500
501                                         pubResponse.setResponseCode(String.valueOf(HttpStatus.SC_INTERNAL_SERVER_ERROR));
502                                         pubResponse.setResponseMessage(x.getMessage());
503                                         logger.error("exception: ", x);
504
505                                 }
506
507                                 return pubResponse;
508                         }
509
510                         if (ProtocolTypeConstants.AUTH_KEY.getValue().equalsIgnoreCase(protocolFlag)) {
511                                 getLog().info("sending {} msgs to {}. Oldest: {} ms", fPending.size(), httpurl,
512                                         nowMs - fPending.peek().timestamp);
513                                 final String result = postAuthwithResponse(httpurl, baseStream.toByteArray(), contentType, authKey,
514                                                 authDate, username, password, protocolFlag);
515                                 // Here we are checking for error response. If HTTP status
516                                 // code is not within the http success response code
517                                 // then we consider this as error and return false
518
519                                 pubResponse = createMRPublisherResponse(result, pubResponse);
520
521                                 if (Integer.valueOf(pubResponse.getResponseCode()) < 200
522                                                 || Integer.valueOf(pubResponse.getResponseCode()) > 299) {
523
524                                         return pubResponse;
525                                 }
526
527                                 final String logLine = "MR reply ok (" + (Clock.now() - startMs) + " ms):" + result.toString();
528                                 getLog().info(logLine);
529                                 fPending.clear();
530                                 return pubResponse;
531                         }
532
533                         if (ProtocolTypeConstants.AAF_AUTH.getValue().equalsIgnoreCase(protocolFlag)) {
534                                 getLog().info("sending {} msgs to {}. Oldest: {} ms", fPending.size(), httpurl,
535                                         nowMs - fPending.peek().timestamp);
536                                 final String result = postWithResponse(httpurl, baseStream.toByteArray(), contentType, username,
537                                                 password, protocolFlag);
538
539                                 // Here we are checking for error response. If HTTP status
540                                 // code is not within the http success response code
541                                 // then we consider this as error and return false
542                                 pubResponse = createMRPublisherResponse(result, pubResponse);
543
544                                 if (Integer.valueOf(pubResponse.getResponseCode()) < 200
545                                                 || Integer.valueOf(pubResponse.getResponseCode()) > 299) {
546
547                                         return pubResponse;
548                                 }
549
550                                 final String logLine = String.valueOf((Clock.now() - startMs));
551                                 getLog().info(logLine);
552                                 fPending.clear();
553                                 return pubResponse;
554                         }
555                         
556                         if (ProtocolTypeConstants.HTTPNOAUTH.getValue().equalsIgnoreCase(protocolFlag)) {
557                                 getLog().info("sending {} msgs to {}. Oldest: {} ms", fPending.size(), httpurl,
558                                         nowMs - fPending.peek().timestamp);
559                                 final String result = postNoAuthWithResponse(httpurl, baseStream.toByteArray(), contentType);
560
561                                 // Here we are checking for error response. If HTTP status
562                                 // code is not within the http success response code
563                                 // then we consider this as error and return false
564                                 pubResponse = createMRPublisherResponse(result, pubResponse);
565
566                                 if (Integer.valueOf(pubResponse.getResponseCode()) < 200
567                                                 || Integer.valueOf(pubResponse.getResponseCode()) > 299) {
568
569                                         return pubResponse;
570                                 }
571
572                                 final String logLine = String.valueOf((Clock.now() - startMs));
573                                 getLog().info(logLine);
574                                 fPending.clear();
575                                 return pubResponse;
576                         }
577                 } catch (IllegalArgumentException | HttpException x) {
578                         getLog().warn(x.getMessage(), x);
579                         pubResponse.setResponseCode(String.valueOf(HttpStatus.SC_BAD_REQUEST));
580                         pubResponse.setResponseMessage(x.getMessage());
581
582                 } catch (IOException x) {
583                         getLog().warn(x.getMessage(), x);
584                         pubResponse.setResponseCode(String.valueOf(HttpStatus.SC_INTERNAL_SERVER_ERROR));
585                         pubResponse.setResponseMessage(x.getMessage());
586                 } catch (Exception x) {
587                         getLog().warn(x.getMessage(), x);
588
589                         pubResponse.setResponseCode(String.valueOf(HttpStatus.SC_INTERNAL_SERVER_ERROR));
590                         pubResponse.setResponseMessage(x.getMessage());
591
592                 }
593
594                 finally {
595                         if (!fPending.isEmpty()) {
596                                 getLog().warn("Send failed, " + fPending.size() + " message to send.");
597                                 pubResponse.setPendingMsgs(fPending.size());
598                         }
599                         if (os != null) {
600                                 try {
601                                         os.close();
602                                 } catch (Exception x) {
603                                         getLog().warn(x.getMessage(), x);
604                                         pubResponse.setResponseCode(String.valueOf(HttpStatus.SC_INTERNAL_SERVER_ERROR));
605                                         pubResponse.setResponseMessage("Error in closing Output Stream");
606                                 }
607                         }
608                 }
609
610                 return pubResponse;
611         }
612
613         public MRPublisherResponse createMRPublisherResponse(String reply, MRPublisherResponse mrPubResponse) {
614
615                 if (reply.isEmpty()) {
616
617                         mrPubResponse.setResponseCode(String.valueOf(HttpStatus.SC_BAD_REQUEST));
618                         mrPubResponse.setResponseMessage("Please verify the Producer properties");
619                 } else if (reply.startsWith("{")) {
620                         JSONObject jObject = new JSONObject(reply);
621                         if (jObject.has("message") && jObject.has("status")) {
622                                 String message = jObject.getString("message");
623                                 if (null != message) {
624                                         mrPubResponse.setResponseMessage(message);
625                                 }
626                                 mrPubResponse.setResponseCode(Integer.toString(jObject.getInt("status")));
627                         } else {
628                                 mrPubResponse.setResponseCode(String.valueOf(HttpStatus.SC_OK));
629                                 mrPubResponse.setResponseMessage(reply);
630                         }
631                 } else if (reply.startsWith("<")) {
632                         String responseCode = getHTTPErrorResponseCode(reply);
633                         if (responseCode.contains("403")) {
634                                 responseCode = "403";
635                         }
636                         mrPubResponse.setResponseCode(responseCode);
637                         mrPubResponse.setResponseMessage(getHTTPErrorResponseMessage(reply));
638                 }
639
640                 return mrPubResponse;
641         }
642
643         private final String fTopic;
644         private final int fMaxBatchSize;
645         private final long fMaxBatchAgeMs;
646         private final boolean fCompress;
647         private int threadOccuranceTime;
648         private boolean fClosed;
649         private String username;
650         private String password;
651         private String host;
652
653         // host selector
654         private HostSelector fHostSelector = null;
655
656         private final LinkedBlockingQueue<TimestampedMessage> fPending;
657         private long fDontSendUntilMs;
658         private final ScheduledThreadPoolExecutor fExec;
659
660         private String latitude;
661         private String longitude;
662         private String version;
663         private String serviceName;
664         private String env;
665         private String partner;
666         private String routeOffer;
667         private String subContextPath;
668         private String protocol;
669         private String methodType;
670         private String url;
671         private String dmeuser;
672         private String dmepassword;
673         private String contentType;
674         private static final long sfWaitAfterError = 10000;
675         private HashMap<String, String> DMETimeOuts;
676         private DME2Client sender;
677         public String protocolFlag = ProtocolTypeConstants.DME2.getValue();
678         private String authKey;
679         private String authDate;
680         private String handlers;
681         private Properties props;
682         public static String routerFilePath;
683         protected static final Map<String, String> headers = new HashMap<String, String>();
684         public static MultivaluedMap<String, Object> headersMap;
685
686         private MRPublisherResponse pubResponse;
687
688         public MRPublisherResponse getPubResponse() {
689                 return pubResponse;
690         }
691
692         public void setPubResponse(MRPublisherResponse pubResponse) {
693                 this.pubResponse = pubResponse;
694         }
695
696         public static String getRouterFilePath() {
697                 return routerFilePath;
698         }
699
700         public static void setRouterFilePath(String routerFilePath) {
701                 MRSimplerBatchPublisher.routerFilePath = routerFilePath;
702         }
703
704         public Properties getProps() {
705                 return props;
706         }
707
708         public void setProps(Properties props) {
709                 this.props = props;
710         }
711
712         public String getProtocolFlag() {
713                 return protocolFlag;
714         }
715
716         public void setProtocolFlag(String protocolFlag) {
717                 this.protocolFlag = protocolFlag;
718         }
719
720         private void DME2Configue() throws Exception {
721                 try {
722
723                         latitude = props.getProperty("Latitude");
724                         longitude = props.getProperty("Longitude");
725                         version = props.getProperty("Version");
726                         serviceName = props.getProperty("ServiceName");
727                         env = props.getProperty("Environment");
728                         partner = props.getProperty("Partner");
729                         routeOffer = props.getProperty("routeOffer");
730                         subContextPath = props.getProperty("SubContextPath") + fTopic;
731                         
732                         protocol = props.getProperty("Protocol");
733                         methodType = props.getProperty("MethodType");
734                         dmeuser = props.getProperty("username");
735                         dmepassword = props.getProperty("password");
736                         contentType = props.getProperty("contenttype");
737                         handlers = props.getProperty("sessionstickinessrequired");
738                         routerFilePath = props.getProperty("DME2preferredRouterFilePath");
739
740                         /**
741                          * Changes to DME2Client url to use Partner for auto failover
742                          * between data centers When Partner value is not provided use the
743                          * routeOffer value for auto failover within a cluster
744                          */
745
746                         String partitionKey = props.getProperty("partition");
747
748                         if (partner != null && !partner.isEmpty()) {
749                                 url = protocol + "://" + serviceName + "?version=" + version + "&envContext=" + env + "&partner="
750                                                 + partner;
751                                 if (partitionKey != null && !partitionKey.equalsIgnoreCase("")) {
752                                         url = url + "&partitionKey=" + partitionKey;
753                                 }
754                         } else if (routeOffer != null && !routeOffer.isEmpty()) {
755                                 url = protocol + "://" + serviceName + "?version=" + version + "&envContext=" + env + "&routeoffer="
756                                                 + routeOffer;
757                                 if (partitionKey != null && !partitionKey.equalsIgnoreCase("")) {
758                                         url = url + "&partitionKey=" + partitionKey;
759                                 }
760                         }
761
762                         DMETimeOuts = new HashMap<String, String>();
763                         DMETimeOuts.put("AFT_DME2_EP_READ_TIMEOUT_MS", props.getProperty("AFT_DME2_EP_READ_TIMEOUT_MS"));
764                         DMETimeOuts.put("AFT_DME2_ROUNDTRIP_TIMEOUT_MS", props.getProperty("AFT_DME2_ROUNDTRIP_TIMEOUT_MS"));
765                         DMETimeOuts.put("AFT_DME2_EP_CONN_TIMEOUT", props.getProperty("AFT_DME2_EP_CONN_TIMEOUT"));
766                         DMETimeOuts.put("Content-Type", contentType);
767                         System.setProperty("AFT_LATITUDE", latitude);
768                         System.setProperty("AFT_LONGITUDE", longitude);
769                         System.setProperty("AFT_ENVIRONMENT", props.getProperty("AFT_ENVIRONMENT"));
770                         // System.setProperty("DME2.DEBUG", "true");
771
772                         // SSL changes
773                         // System.setProperty("AFT_DME2_CLIENT_SSL_INCLUDE_PROTOCOLS",
774                         
775                         System.setProperty("AFT_DME2_CLIENT_SSL_INCLUDE_PROTOCOLS", "TLSv1.1,TLSv1.2");
776                         System.setProperty("AFT_DME2_CLIENT_IGNORE_SSL_CONFIG", "false");
777                         System.setProperty("AFT_DME2_CLIENT_KEYSTORE_PASSWORD", "changeit");
778
779                         // SSL changes
780
781                         sender = new DME2Client(new URI(url), 5000L);
782
783                         sender.setAllowAllHttpReturnCodes(true);
784                         sender.setMethod(methodType);
785                         sender.setSubContext(subContextPath);
786                         sender.setCredentials(dmeuser, dmepassword);
787                         sender.setHeaders(DMETimeOuts);
788                         if (handlers != null &&handlers.equalsIgnoreCase("yes")) {
789                                 sender.addHeader("AFT_DME2_EXCHANGE_REQUEST_HANDLERS",
790                                                 props.getProperty("AFT_DME2_EXCHANGE_REQUEST_HANDLERS"));
791                                 sender.addHeader("AFT_DME2_EXCHANGE_REPLY_HANDLERS",
792                                                 props.getProperty("AFT_DME2_EXCHANGE_REPLY_HANDLERS"));
793                                 sender.addHeader("AFT_DME2_REQ_TRACE_ON", props.getProperty("AFT_DME2_REQ_TRACE_ON"));
794                         } else {
795                                 sender.addHeader("AFT_DME2_EXCHANGE_REPLY_HANDLERS", "com.att.nsa.mr.dme.client.HeaderReplyHandler");
796                         }
797                 } catch (DME2Exception x) {
798                         getLog().warn(x.getMessage(), x);
799                         throw new DME2Exception(x.getErrorCode(), x.getErrorMessage());
800                 } catch (URISyntaxException x) {
801
802                         getLog().warn(x.getMessage(), x);
803                         throw new URISyntaxException(url, x.getMessage());
804                 } catch (Exception x) {
805
806                         getLog().warn(x.getMessage(), x);
807                         throw new IllegalArgumentException(x.getMessage());
808                 }
809         }
810
811         private MRSimplerBatchPublisher(Collection<String> hosts, String topic, int maxBatchSize, long maxBatchAgeMs,
812                         boolean compress) throws MalformedURLException {
813                 super(hosts);
814
815                 if (topic == null || topic.length() < 1) {
816                         throw new IllegalArgumentException("A topic must be provided.");
817                 }
818
819                 fHostSelector = new HostSelector(hosts, null);
820                 fClosed = false;
821                 fTopic = topic;
822                 fMaxBatchSize = maxBatchSize;
823                 fMaxBatchAgeMs = maxBatchAgeMs;
824                 fCompress = compress;
825
826                 fPending = new LinkedBlockingQueue<>();
827                 fDontSendUntilMs = 0;
828                 fExec = new ScheduledThreadPoolExecutor(1);
829                 pubResponse = new MRPublisherResponse();
830
831         }
832
833         private MRSimplerBatchPublisher(Collection<String> hosts, String topic, int maxBatchSize, long maxBatchAgeMs,
834                         boolean compress, boolean allowSelfSignedCerts, int httpThreadOccurnace) throws MalformedURLException {
835                 super(hosts);
836
837                 if (topic == null || topic.length() < 1) {
838                         throw new IllegalArgumentException("A topic must be provided.");
839                 }
840
841                 fHostSelector = new HostSelector(hosts, null);
842                 fClosed = false;
843                 fTopic = topic;
844                 fMaxBatchSize = maxBatchSize;
845                 fMaxBatchAgeMs = maxBatchAgeMs;
846                 fCompress = compress;
847                 threadOccuranceTime = httpThreadOccurnace;
848                 fPending = new LinkedBlockingQueue<>();
849                 fDontSendUntilMs = 0;
850                 fExec = new ScheduledThreadPoolExecutor(1);
851                 fExec.scheduleAtFixedRate(new Runnable() {
852                         @Override
853                         public void run() {
854                                 send(false);
855                         }
856                 }, 100, threadOccuranceTime, TimeUnit.MILLISECONDS);
857                 pubResponse = new MRPublisherResponse();
858         }
859
860         private static class TimestampedMessage extends message {
861                 public TimestampedMessage(message m) {
862                         super(m);
863                         timestamp = Clock.now();
864                 }
865
866                 public final long timestamp;
867         }
868
869         public String getUsername() {
870                 return username;
871         }
872
873         public void setUsername(String username) {
874                 this.username = username;
875         }
876
877         public String getPassword() {
878                 return password;
879         }
880
881         public void setPassword(String password) {
882                 this.password = password;
883         }
884
885         public String getHost() {
886                 return host;
887         }
888
889         public void setHost(String host) {
890                 this.host = host;
891         }
892
893         public String getContentType() {
894                 return contentType;
895         }
896
897         public void setContentType(String contentType) {
898                 this.contentType = contentType;
899         }
900
901         public String getAuthKey() {
902                 return authKey;
903         }
904
905         public void setAuthKey(String authKey) {
906                 this.authKey = authKey;
907         }
908
909         public String getAuthDate() {
910                 return authDate;
911         }
912
913         public void setAuthDate(String authDate) {
914                 this.authDate = authDate;
915         }
916
917 }