74fec8a3612b5ed952ab9f29d8e30d4ef7895050
[dmaap/messagerouter/dmaapclient.git] / src / main / java / org / onap / dmaap / mr / client / impl / MRSimplerBatchPublisher.java
1 /*******************************************************************************
2  *  ============LICENSE_START=======================================================
3  *  org.onap.dmaap
4  *  ================================================================================
5  *  Copyright © 2017 AT&T Intellectual Property. All rights reserved.
6  *  ================================================================================
7  *  Modifications Copyright © 2021 Orange.
8  *  ================================================================================
9  *  Licensed under the Apache License, Version 2.0 (the "License");
10  *  you may not use this file except in compliance with the License.
11  *  You may obtain a copy of the License at
12  *        http://www.apache.org/licenses/LICENSE-2.0
13  *
14  *  Unless required by applicable law or agreed to in writing, software
15  *  distributed under the License is distributed on an "AS IS" BASIS,
16  *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17  *  See the License for the specific language governing permissions and
18  *  limitations under the License.
19  *  ============LICENSE_END=========================================================
20  *
21  *  ECOMP is a trademark and service mark of AT&T Intellectual Property.
22  *
23  *******************************************************************************/
24
25 package org.onap.dmaap.mr.client.impl;
26
27 import com.att.aft.dme2.api.DME2Client;
28 import com.att.aft.dme2.api.DME2Exception;
29 import java.io.ByteArrayOutputStream;
30 import java.io.IOException;
31 import java.io.OutputStream;
32 import java.net.MalformedURLException;
33 import java.net.URI;
34 import java.net.URISyntaxException;
35 import java.util.Collection;
36 import java.util.HashMap;
37 import java.util.LinkedList;
38 import java.util.List;
39 import java.util.Map;
40 import java.util.Properties;
41
42 import java.util.concurrent.LinkedBlockingQueue;
43 import java.util.concurrent.ScheduledThreadPoolExecutor;
44 import java.util.concurrent.TimeUnit;
45 import java.util.zip.GZIPOutputStream;
46 import javax.ws.rs.core.MultivaluedMap;
47 import org.apache.http.HttpException;
48 import org.apache.http.HttpStatus;
49 import org.json.JSONArray;
50 import org.json.JSONObject;
51 import org.json.JSONTokener;
52 import org.onap.dmaap.mr.client.HostSelector;
53 import org.onap.dmaap.mr.client.MRBatchingPublisher;
54 import org.onap.dmaap.mr.client.ProtocolType;
55 import org.onap.dmaap.mr.client.response.MRPublisherResponse;
56 import org.slf4j.Logger;
57 import org.slf4j.LoggerFactory;
58
59 public class MRSimplerBatchPublisher extends MRBaseClient implements MRBatchingPublisher {
60     private static final Logger logger = LoggerFactory.getLogger(MRSimplerBatchPublisher.class);
61
62     private static final String PASSWORD = "password";
63     private static final String USERNAME = "username";
64     private static final String DME2PREFERRED_ROUTER_FILE_PATH = "DME2preferredRouterFilePath";
65     private static final String SERVICE_NAME = "ServiceName";
66     private static final String PARTNER = "Partner";
67     private static final String ROUTE_OFFER = "routeOffer";
68     private static final String PROTOCOL = "Protocol";
69     private static final String METHOD_TYPE = "MethodType";
70     private static final String CONTENT_TYPE = "contenttype";
71     private static final String LATITUDE = "Latitude";
72     private static final String LONGITUDE = "Longitude";
73     private static final String AFT_ENVIRONMENT = "AFT_ENVIRONMENT";
74     private static final String VERSION = "Version";
75     private static final String ENVIRONMENT = "Environment";
76     private static final String SUB_CONTEXT_PATH = "SubContextPath";
77     private static final String SESSION_STICKINESS_REQUIRED = "sessionstickinessrequired";
78     private static final String PARTITION = "partition";
79     private static final String AFT_DME2_EP_READ_TIMEOUT_MS = "AFT_DME2_EP_READ_TIMEOUT_MS";
80     private static final String AFT_DME2_ROUNDTRIP_TIMEOUT_MS = "AFT_DME2_ROUNDTRIP_TIMEOUT_MS";
81     private static final String AFT_DME2_EP_CONN_TIMEOUT = "AFT_DME2_EP_CONN_TIMEOUT";
82     private static final String AFT_DME2_EXCHANGE_REQUEST_HANDLERS = "AFT_DME2_EXCHANGE_REQUEST_HANDLERS";
83     private static final String AFT_DME2_EXCHANGE_REPLY_HANDLERS = "AFT_DME2_EXCHANGE_REPLY_HANDLERS";
84     private static final String AFT_DME2_REQ_TRACE_ON = "AFT_DME2_REQ_TRACE_ON";
85
86     private static final String CONTENT_TYPE_TEXT = "text/plain";
87
88     private static final String JSON_STATUS = "status";
89
90     public static class Builder {
91
92         public Builder againstUrls(Collection<String> baseUrls) {
93             fUrls = baseUrls;
94             return this;
95         }
96
97         public Builder againstUrlsOrServiceName(Collection<String> baseUrls, Collection<String> serviceName, String transportype) {
98             fUrls = baseUrls;
99             return this;
100         }
101
102         public Builder onTopic(String topic) {
103             fTopic = topic;
104             return this;
105         }
106
107         public Builder batchTo(int maxBatchSize, long maxBatchAgeMs) {
108             fMaxBatchSize = maxBatchSize;
109             fMaxBatchAgeMs = maxBatchAgeMs;
110             return this;
111         }
112
113         public Builder compress(boolean compress) {
114             fCompress = compress;
115             return this;
116         }
117
118         public Builder httpThreadTime(int threadOccurrenceTime) {
119             this.threadOccurrenceTime = threadOccurrenceTime;
120             return this;
121         }
122
123         public Builder allowSelfSignedCertificates(boolean allowSelfSignedCerts) {
124             fAllowSelfSignedCerts = allowSelfSignedCerts;
125             return this;
126         }
127
128         public Builder withResponse(boolean withResponse) {
129             fWithResponse = withResponse;
130             return this;
131         }
132
133         public MRSimplerBatchPublisher build() {
134             if (!fWithResponse) {
135                 try {
136                     return new MRSimplerBatchPublisher(fUrls, fTopic, fMaxBatchSize, fMaxBatchAgeMs, fCompress,
137                             fAllowSelfSignedCerts, threadOccurrenceTime);
138                 } catch (MalformedURLException e) {
139                     throw new IllegalArgumentException(e);
140                 }
141             } else {
142                 try {
143                     return new MRSimplerBatchPublisher(fUrls, fTopic, fMaxBatchSize, fMaxBatchAgeMs, fCompress,
144                             fAllowSelfSignedCerts, fMaxBatchSize);
145                 } catch (MalformedURLException e) {
146                     throw new IllegalArgumentException(e);
147                 }
148             }
149
150         }
151
152         private Collection<String> fUrls;
153         private String fTopic;
154         private int fMaxBatchSize = 100;
155
156         private long fMaxBatchAgeMs = 1000;
157         private boolean fCompress = false;
158         private int threadOccurrenceTime = 50;
159         private boolean fAllowSelfSignedCerts = false;
160         private boolean fWithResponse = false;
161
162     }
163
164     @Override
165     public int send(String partition, String msg) {
166         return send(new Message(partition, msg));
167     }
168
169     @Override
170     public int send(String msg) {
171         return send(new Message(null, msg));
172     }
173
174     @Override
175     public int send(Message msg) {
176         final LinkedList<Message> list = new LinkedList<>();
177         list.add(msg);
178         return send(list);
179     }
180
181     @Override
182     public synchronized int send(Collection<Message> msgs) {
183         if (fClosed) {
184             throw new IllegalStateException("The publisher was closed.");
185         }
186
187         for (Message userMsg : msgs) {
188             fPending.add(new TimestampedMessage(userMsg));
189         }
190         return getPendingMessageCount();
191     }
192
193     @Override
194     public synchronized int getPendingMessageCount() {
195         return fPending.size();
196     }
197
198     @Override
199     public void close() {
200         try {
201             final List<Message> remains = close(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
202             if (remains.isEmpty()) {
203                 getLog().warn("Closing publisher with {} messages unsent. Consider using MRBatchingPublisher.close( long timeout, TimeUnit timeoutUnits ) to recapture unsent messages on close.",
204                         remains.size());
205             }
206         } catch (InterruptedException e) {
207             getLog().warn("Possible message loss. " + e.getMessage(), e);
208             Thread.currentThread().interrupt();
209         } catch (IOException e) {
210             getLog().warn("Possible message loss. " + e.getMessage(), e);
211         }
212     }
213
214     @Override
215     public List<Message> close(long time, TimeUnit unit) throws IOException, InterruptedException {
216         synchronized (this) {
217             fClosed = true;
218
219             // stop the background sender
220             fExec.setContinueExistingPeriodicTasksAfterShutdownPolicy(false);
221             fExec.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
222             fExec.shutdown();
223         }
224
225         final long now = Clock.now();
226         final long waitInMs = TimeUnit.MILLISECONDS.convert(time, unit);
227         final long timeoutAtMs = now + waitInMs;
228
229         while (Clock.now() < timeoutAtMs && getPendingMessageCount() > 0) {
230             send(true);
231             Thread.sleep(250);
232         }
233
234         synchronized (this) {
235             final LinkedList<Message> result = new LinkedList<>();
236             fPending.drainTo(result);
237             return result;
238         }
239     }
240
241     /**
242      * Possibly send a batch to the MR server. This is called by the background
243      * thread and the close() method
244      *
245      * @param force
246      */
247     private synchronized void send(boolean force) {
248         if ((force || shouldSendNow()) && !sendBatch()) {
249             getLog().warn("Send failed, {} message to send.", fPending.size());
250             // note the time for back-off
251             fDontSendUntilMs = SF_WAIT_AFTER_ERROR + Clock.now();
252         }
253     }
254
255     private synchronized boolean shouldSendNow() {
256         boolean shouldSend = false;
257         if (!fPending.isEmpty()) {
258             final long nowMs = Clock.now();
259
260             shouldSend = (fPending.size() >= fMaxBatchSize);
261             if (!shouldSend) {
262                 final long sendAtMs = fPending.peek().timestamp + fMaxBatchAgeMs;
263                 shouldSend = sendAtMs <= nowMs;
264             }
265
266             // however, wait after an error
267             shouldSend = shouldSend && nowMs >= fDontSendUntilMs;
268         }
269         return shouldSend;
270     }
271
272     /**
273      * Method to parse published JSON Objects and Arrays.
274      *
275      * @return JSONArray
276      */
277     private JSONArray parseJSON() {
278         JSONArray jsonArray = new JSONArray();
279         for (TimestampedMessage m : fPending) {
280             JSONTokener jsonTokener = new JSONTokener(m.fMsg);
281             JSONObject jsonObject = null;
282             JSONArray tempjsonArray = null;
283             final char firstChar = jsonTokener.next();
284             jsonTokener.back();
285             if ('[' == firstChar) {
286                 tempjsonArray = new JSONArray(jsonTokener);
287                 for (int i = 0; i < tempjsonArray.length(); i++) {
288                     jsonArray.put(tempjsonArray.getJSONObject(i));
289                 }
290             } else {
291                 jsonObject = new JSONObject(jsonTokener);
292                 jsonArray.put(jsonObject);
293             }
294
295         }
296         return jsonArray;
297     }
298
299     private void logTime(long startMs, String dmeResponse) {
300         if (getLog().isInfoEnabled()) {
301             getLog().info("MR reply ok ({} ms):{}", (Clock.now() - startMs), dmeResponse);
302         }
303     }
304
305     private void logSendMessage(int nbMessage, String dest, long time) {
306         if (getLog().isInfoEnabled()) {
307             getLog().info("sending {} msgs to {}. Oldest: {} ms", nbMessage, dest, time);
308         }
309     }
310
311     private synchronized boolean sendBatch() {
312         // it's possible for this call to be made with an empty list. in this
313         // case, just return.
314         if (fPending.isEmpty()) {
315             return true;
316         }
317
318         final long nowMs = Clock.now();
319
320         if (this.fHostSelector != null) {
321             host = this.fHostSelector.selectBaseHost();
322         }
323
324         final String httpurl = MRConstants.makeUrl(host, fTopic, props.getProperty(PROTOCOL),
325                 props.getProperty(PARTITION));
326
327         try {
328
329             final ByteArrayOutputStream baseStream = new ByteArrayOutputStream();
330             OutputStream os = baseStream;
331             final String contentType = props.getProperty(CONTENT_TYPE);
332             if (contentType.equalsIgnoreCase(MRFormat.JSON.toString())) {
333                 JSONArray jsonArray = parseJSON();
334                 os.write(jsonArray.toString().getBytes());
335                 os.close();
336
337             } else if (contentType.equalsIgnoreCase(CONTENT_TYPE_TEXT)) {
338                 for (TimestampedMessage m : fPending) {
339                     os.write(m.fMsg.getBytes());
340                     os.write('\n');
341                 }
342                 os.close();
343             } else if (contentType.equalsIgnoreCase(MRFormat.CAMBRIA.toString())
344                     || (contentType.equalsIgnoreCase(MRFormat.CAMBRIA_ZIP.toString()))) {
345                 if (contentType.equalsIgnoreCase(MRFormat.CAMBRIA_ZIP.toString())) {
346                     os = new GZIPOutputStream(baseStream);
347                 }
348                 for (TimestampedMessage m : fPending) {
349
350                     os.write(("" + m.fPartition.length()).getBytes());
351                     os.write('.');
352                     os.write(("" + m.fMsg.length()).getBytes());
353                     os.write('.');
354                     os.write(m.fPartition.getBytes());
355                     os.write(m.fMsg.getBytes());
356                     os.write('\n');
357                 }
358                 os.close();
359             } else {
360                 for (TimestampedMessage m : fPending) {
361                     os.write(m.fMsg.getBytes());
362
363                 }
364                 os.close();
365             }
366
367             final long startMs = Clock.now();
368             if (ProtocolType.DME2.getValue().equalsIgnoreCase(protocolFlag)) {
369
370                 configureDME2();
371
372                 this.wait(5);
373                 if (fPending.peek() != null) {
374                     logSendMessage(fPending.size(), url + subContextPath, nowMs - fPending.peek().timestamp);
375                 }
376                 sender.setPayload(os.toString());
377                 String dmeResponse = sender.sendAndWait(5000L);
378
379                 logTime(startMs, dmeResponse);
380                 fPending.clear();
381                 return true;
382             }
383
384             if (ProtocolType.AUTH_KEY.getValue().equalsIgnoreCase(protocolFlag)) {
385                 if (fPending.peek() != null) {
386                     logSendMessage(fPending.size(), httpurl, nowMs - fPending.peek().timestamp);
387                 }
388                 final JSONObject result =
389                         postAuth(new PostAuthDataObject().setPath(httpurl).setData(baseStream.toByteArray())
390                                 .setContentType(contentType).setAuthKey(authKey).setAuthDate(authDate)
391                                 .setUsername(username).setPassword(password).setProtocolFlag(protocolFlag));
392                 // Here we are checking for error response. If HTTP status
393                 // code is not within the http success response code
394                 // then we consider this as error and return false
395                 if (result.getInt(JSON_STATUS) < 200 || result.getInt(JSON_STATUS) > 299) {
396                     return false;
397                 }
398                 logTime(startMs, result.toString());
399                 fPending.clear();
400                 return true;
401             }
402
403             if (ProtocolType.AAF_AUTH.getValue().equalsIgnoreCase(protocolFlag)) {
404                 if (fPending.peek() != null) {
405                     logSendMessage(fPending.size(), httpurl, nowMs - fPending.peek().timestamp);
406                 }
407                 final JSONObject result = post(httpurl, baseStream.toByteArray(), contentType, username, password,
408                         protocolFlag);
409
410                 // Here we are checking for error response. If HTTP status
411                 // code is not within the http success response code
412                 // then we consider this as error and return false
413                 if (result.getInt(JSON_STATUS) < 200 || result.getInt(JSON_STATUS) > 299) {
414                     return false;
415                 }
416                 logTime(startMs, result.toString());
417                 fPending.clear();
418                 return true;
419             }
420
421             if (ProtocolType.HTTPNOAUTH.getValue().equalsIgnoreCase(protocolFlag)) {
422                 if (fPending.peek() != null) {
423                     logSendMessage(fPending.size(), httpurl, nowMs - fPending.peek().timestamp);
424                 }
425                 final JSONObject result = postNoAuth(httpurl, baseStream.toByteArray(), contentType);
426
427                 // Here we are checking for error response. If HTTP status
428                 // code is not within the http success response code
429                 // then we consider this as error and return false
430                 if (result.getInt(JSON_STATUS) < 200 || result.getInt(JSON_STATUS) > 299) {
431                     return false;
432                 }
433                 logTime(startMs, result.toString());
434                 fPending.clear();
435                 return true;
436             }
437         } catch (InterruptedException e) {
438             getLog().warn("Interrupted!", e);
439             // Restore interrupted state...
440             Thread.currentThread().interrupt();
441         } catch (Exception x) {
442             getLog().warn(x.getMessage(), x);
443         }
444         return false;
445     }
446
447     public synchronized MRPublisherResponse sendBatchWithResponse() {
448         // it's possible for this call to be made with an empty list. in this
449         // case, just return.
450         if (fPending.isEmpty()) {
451             pubResponse.setResponseCode(String.valueOf(HttpStatus.SC_BAD_REQUEST));
452             pubResponse.setResponseMessage("No Messages to send");
453             return pubResponse;
454         }
455
456         final long nowMs = Clock.now();
457
458         host = this.fHostSelector.selectBaseHost();
459
460         final String httpUrl = MRConstants.makeUrl(host, fTopic, props.getProperty(PROTOCOL),
461                 props.getProperty(PARTITION));
462         OutputStream os = null;
463         try (ByteArrayOutputStream baseStream = new ByteArrayOutputStream()) {
464             os = baseStream;
465             final String propsContentType = props.getProperty(CONTENT_TYPE);
466             if (propsContentType.equalsIgnoreCase(MRFormat.JSON.toString())) {
467                 JSONArray jsonArray = parseJSON();
468                 os.write(jsonArray.toString().getBytes());
469             } else if (propsContentType.equalsIgnoreCase(CONTENT_TYPE_TEXT)) {
470                 for (TimestampedMessage m : fPending) {
471                     os.write(m.fMsg.getBytes());
472                     os.write('\n');
473                 }
474             } else if (propsContentType.equalsIgnoreCase(MRFormat.CAMBRIA.toString())
475                     || (propsContentType.equalsIgnoreCase(MRFormat.CAMBRIA_ZIP.toString()))) {
476                 if (propsContentType.equalsIgnoreCase(MRFormat.CAMBRIA_ZIP.toString())) {
477                     os = new GZIPOutputStream(baseStream);
478                 }
479                 for (TimestampedMessage m : fPending) {
480                     os.write(("" + m.fPartition.length()).getBytes());
481                     os.write('.');
482                     os.write(("" + m.fMsg.length()).getBytes());
483                     os.write('.');
484                     os.write(m.fPartition.getBytes());
485                     os.write(m.fMsg.getBytes());
486                     os.write('\n');
487                 }
488                 os.close();
489             } else {
490                 for (TimestampedMessage m : fPending) {
491                     os.write(m.fMsg.getBytes());
492
493                 }
494             }
495
496             final long startMs = Clock.now();
497             if (ProtocolType.DME2.getValue().equalsIgnoreCase(protocolFlag)) {
498
499                 try {
500                     configureDME2();
501
502                     this.wait(5);
503
504                     if (fPending.peek() != null) {
505                         logSendMessage(fPending.size(), url + subContextPath, nowMs - fPending.peek().timestamp);
506                     }
507                     sender.setPayload(os.toString());
508
509                     String dmeResponse = sender.sendAndWait(5000L);
510
511                     pubResponse = createMRPublisherResponse(dmeResponse, pubResponse);
512
513                     if (Integer.parseInt(pubResponse.getResponseCode()) < 200
514                             || Integer.parseInt(pubResponse.getResponseCode()) > 299) {
515
516                         return pubResponse;
517                     }
518                     final String logLine = String.valueOf((Clock.now() - startMs)) + dmeResponse.toString();
519                     getLog().info(logLine);
520                     fPending.clear();
521
522                 } catch (DME2Exception x) {
523                     getLog().warn(x.getMessage(), x);
524                     pubResponse.setResponseCode(x.getErrorCode());
525                     pubResponse.setResponseMessage(x.getErrorMessage());
526                 } catch (URISyntaxException x) {
527
528                     getLog().warn(x.getMessage(), x);
529                     pubResponse.setResponseCode(String.valueOf(HttpStatus.SC_BAD_REQUEST));
530                     pubResponse.setResponseMessage(x.getMessage());
531                 } catch (InterruptedException e) {
532                     throw e;
533                 } catch (Exception x) {
534
535                     pubResponse.setResponseCode(String.valueOf(HttpStatus.SC_INTERNAL_SERVER_ERROR));
536                     pubResponse.setResponseMessage(x.getMessage());
537                     logger.error("exception: ", x);
538
539                 }
540
541                 return pubResponse;
542             }
543
544             if (ProtocolType.AUTH_KEY.getValue().equalsIgnoreCase(protocolFlag)) {
545                 if (fPending.peek() != null) {
546                     logSendMessage(fPending.size(), httpUrl, nowMs - fPending.peek().timestamp);
547                 }
548                 final String result = postAuthwithResponse(httpUrl, baseStream.toByteArray(), contentType, authKey,
549                         authDate, username, password, protocolFlag);
550                 // Here we are checking for error response. If HTTP status
551                 // code is not within the http success response code
552                 // then we consider this as error and return false
553
554                 pubResponse = createMRPublisherResponse(result, pubResponse);
555
556                 if (Integer.parseInt(pubResponse.getResponseCode()) < 200
557                         || Integer.parseInt(pubResponse.getResponseCode()) > 299) {
558
559                     return pubResponse;
560                 }
561
562                 logTime(startMs, result);
563                 fPending.clear();
564                 return pubResponse;
565             }
566
567             if (ProtocolType.AAF_AUTH.getValue().equalsIgnoreCase(protocolFlag)) {
568                 if (fPending.peek() != null) {
569                     logSendMessage(fPending.size(), httpUrl, nowMs - fPending.peek().timestamp);
570                 }
571                 final String result = postWithResponse(httpUrl, baseStream.toByteArray(), contentType, username,
572                         password, protocolFlag);
573
574                 // Here we are checking for error response. If HTTP status
575                 // code is not within the http success response code
576                 // then we consider this as error and return false
577                 pubResponse = createMRPublisherResponse(result, pubResponse);
578
579                 if (Integer.parseInt(pubResponse.getResponseCode()) < 200
580                         || Integer.parseInt(pubResponse.getResponseCode()) > 299) {
581
582                     return pubResponse;
583                 }
584
585                 final String logLine = String.valueOf((Clock.now() - startMs));
586                 getLog().info(logLine);
587                 fPending.clear();
588                 return pubResponse;
589             }
590
591             if (ProtocolType.HTTPNOAUTH.getValue().equalsIgnoreCase(protocolFlag)) {
592                 if (fPending.peek() != null) {
593                     logSendMessage(fPending.size(), httpUrl, nowMs - fPending.peek().timestamp);
594                 }
595                 final String result = postNoAuthWithResponse(httpUrl, baseStream.toByteArray(), contentType);
596
597                 // Here we are checking for error response. If HTTP status
598                 // code is not within the http success response code
599                 // then we consider this as error and return false
600                 pubResponse = createMRPublisherResponse(result, pubResponse);
601
602                 if (Integer.parseInt(pubResponse.getResponseCode()) < 200
603                         || Integer.parseInt(pubResponse.getResponseCode()) > 299) {
604
605                     return pubResponse;
606                 }
607
608                 final String logLine = String.valueOf((Clock.now() - startMs));
609                 getLog().info(logLine);
610                 fPending.clear();
611                 return pubResponse;
612             }
613         } catch (IllegalArgumentException | HttpException x) {
614             getLog().warn(x.getMessage(), x);
615             pubResponse.setResponseCode(String.valueOf(HttpStatus.SC_BAD_REQUEST));
616             pubResponse.setResponseMessage(x.getMessage());
617
618         } catch (IOException x) {
619             getLog().warn(x.getMessage(), x);
620             pubResponse.setResponseCode(String.valueOf(HttpStatus.SC_INTERNAL_SERVER_ERROR));
621             pubResponse.setResponseMessage(x.getMessage());
622         } catch (InterruptedException e) {
623             getLog().warn("Interrupted!", e);
624             // Restore interrupted state...
625             Thread.currentThread().interrupt();
626         } catch (Exception x) {
627             getLog().warn(x.getMessage(), x);
628
629             pubResponse.setResponseCode(String.valueOf(HttpStatus.SC_INTERNAL_SERVER_ERROR));
630             pubResponse.setResponseMessage(x.getMessage());
631
632         } finally {
633             if (!fPending.isEmpty()) {
634                 getLog().warn("Send failed, " + fPending.size() + " message to send.");
635                 pubResponse.setPendingMsgs(fPending.size());
636             }
637             if (os != null) {
638                 try {
639                     os.close();
640                 } catch (Exception x) {
641                     getLog().warn(x.getMessage(), x);
642                     pubResponse.setResponseCode(String.valueOf(HttpStatus.SC_INTERNAL_SERVER_ERROR));
643                     pubResponse.setResponseMessage("Error in closing Output Stream");
644                 }
645             }
646         }
647
648         return pubResponse;
649     }
650
651     public MRPublisherResponse createMRPublisherResponse(String reply, MRPublisherResponse mrPubResponse) {
652
653         if (reply.isEmpty()) {
654
655             mrPubResponse.setResponseCode(String.valueOf(HttpStatus.SC_BAD_REQUEST));
656             mrPubResponse.setResponseMessage("Please verify the Producer properties");
657         } else if (reply.startsWith("{")) {
658             JSONObject jObject = new JSONObject(reply);
659             if (jObject.has("message") && jObject.has(JSON_STATUS)) {
660                 String message = jObject.getString("message");
661                 if (null != message) {
662                     mrPubResponse.setResponseMessage(message);
663                 }
664                 mrPubResponse.setResponseCode(Integer.toString(jObject.getInt(JSON_STATUS)));
665             } else {
666                 mrPubResponse.setResponseCode(String.valueOf(HttpStatus.SC_OK));
667                 mrPubResponse.setResponseMessage(reply);
668             }
669         } else if (reply.startsWith("<")) {
670             String responseCode = getHTTPErrorResponseCode(reply);
671             if (responseCode.contains("403")) {
672                 responseCode = "403";
673             }
674             mrPubResponse.setResponseCode(responseCode);
675             mrPubResponse.setResponseMessage(getHTTPErrorResponseMessage(reply));
676         }
677
678         return mrPubResponse;
679     }
680
681     private final String fTopic;
682     private final int fMaxBatchSize;
683     private final long fMaxBatchAgeMs;
684     private final boolean fCompress;
685     private int threadOccurrenceTime;
686     private boolean fClosed;
687     private String username;
688     private String password;
689     private String host;
690
691     // host selector
692     private HostSelector fHostSelector = null;
693
694     private final LinkedBlockingQueue<TimestampedMessage> fPending;
695     private long fDontSendUntilMs;
696     private final ScheduledThreadPoolExecutor fExec;
697
698     private String latitude;
699     private String longitude;
700     private String version;
701     private String serviceName;
702     private String env;
703     private String partner;
704     private String routeOffer;
705     private String subContextPath;
706     private String protocol;
707     private String methodType;
708     private String url;
709     private String dmeuser;
710     private String dmepassword;
711     private String contentType;
712     private static final long SF_WAIT_AFTER_ERROR = 10000;
713     private HashMap<String, String> DMETimeOuts;
714     private DME2Client sender;
715     public String protocolFlag = ProtocolType.DME2.getValue();
716     private String authKey;
717     private String authDate;
718     private String handlers;
719     private Properties props;
720     public static String routerFilePath;
721     protected static final Map<String, String> headers = new HashMap<String, String>();
722     public static MultivaluedMap<String, Object> headersMap;
723
724     private MRPublisherResponse pubResponse;
725
726     public MRPublisherResponse getPubResponse() {
727         return pubResponse;
728     }
729
730     public void setPubResponse(MRPublisherResponse pubResponse) {
731         this.pubResponse = pubResponse;
732     }
733
734     public static String getRouterFilePath() {
735         return routerFilePath;
736     }
737
738     public static void setRouterFilePath(String routerFilePath) {
739         MRSimplerBatchPublisher.routerFilePath = routerFilePath;
740     }
741
742     public Properties getProps() {
743         return props;
744     }
745
746     public void setProps(Properties props) {
747         this.props = props;
748         setClientConfig(DmaapClientUtil.getClientConfig(props));
749     }
750
751     public String getProtocolFlag() {
752         return protocolFlag;
753     }
754
755     public void setProtocolFlag(String protocolFlag) {
756         this.protocolFlag = protocolFlag;
757     }
758
759     private void configureDME2() throws Exception {
760         try {
761
762             latitude = props.getProperty(LATITUDE);
763             longitude = props.getProperty(LONGITUDE);
764             version = props.getProperty(VERSION);
765             serviceName = props.getProperty(SERVICE_NAME);
766             env = props.getProperty(ENVIRONMENT);
767             partner = props.getProperty(PARTNER);
768             routeOffer = props.getProperty(ROUTE_OFFER);
769             subContextPath = props.getProperty(SUB_CONTEXT_PATH) + fTopic;
770
771             protocol = props.getProperty(PROTOCOL);
772             methodType = props.getProperty(METHOD_TYPE);
773             dmeuser = props.getProperty(USERNAME);
774             dmepassword = props.getProperty(PASSWORD);
775             contentType = props.getProperty(CONTENT_TYPE);
776             handlers = props.getProperty(SESSION_STICKINESS_REQUIRED);
777
778             MRSimplerBatchPublisher.routerFilePath = props.getProperty(DME2PREFERRED_ROUTER_FILE_PATH);
779
780             /*
781              * Changes to DME2Client url to use Partner for auto failover
782              * between data centers When Partner value is not provided use the
783              * routeOffer value for auto failover within a cluster
784              */
785
786             String partitionKey = props.getProperty(PARTITION);
787
788             if (partner != null && !partner.isEmpty()) {
789                 url = protocol + "://" + serviceName + "?version=" + version + "&envContext=" + env + "&partner="
790                         + partner;
791                 if (partitionKey != null && !partitionKey.equalsIgnoreCase("")) {
792                     url = url + "&partitionKey=" + partitionKey;
793                 }
794             } else if (routeOffer != null && !routeOffer.isEmpty()) {
795                 url = protocol + "://" + serviceName + "?version=" + version + "&envContext=" + env + "&routeoffer="
796                         + routeOffer;
797                 if (partitionKey != null && !partitionKey.equalsIgnoreCase("")) {
798                     url = url + "&partitionKey=" + partitionKey;
799                 }
800             }
801
802             DMETimeOuts = new HashMap<>();
803             DMETimeOuts.put("AFT_DME2_EP_READ_TIMEOUT_MS", props.getProperty(AFT_DME2_EP_READ_TIMEOUT_MS));
804             DMETimeOuts.put("AFT_DME2_ROUNDTRIP_TIMEOUT_MS", props.getProperty(AFT_DME2_ROUNDTRIP_TIMEOUT_MS));
805             DMETimeOuts.put("AFT_DME2_EP_CONN_TIMEOUT", props.getProperty(AFT_DME2_EP_CONN_TIMEOUT));
806             DMETimeOuts.put("Content-Type", contentType);
807             System.setProperty("AFT_LATITUDE", latitude);
808             System.setProperty("AFT_LONGITUDE", longitude);
809             System.setProperty("AFT_ENVIRONMENT", props.getProperty(AFT_ENVIRONMENT));
810             // System.setProperty("DME2.DEBUG", "true");
811
812             // SSL changes
813             // System.setProperty("AFT_DME2_CLIENT_SSL_INCLUDE_PROTOCOLS",
814
815             System.setProperty("AFT_DME2_CLIENT_SSL_INCLUDE_PROTOCOLS", "TLSv1.1,TLSv1.2");
816             System.setProperty("AFT_DME2_CLIENT_IGNORE_SSL_CONFIG", "false");
817             System.setProperty("AFT_DME2_CLIENT_KEYSTORE_PASSWORD", "changeit");
818
819             // SSL changes
820
821             sender = new DME2Client(new URI(url), 5000L);
822
823             sender.setAllowAllHttpReturnCodes(true);
824             sender.setMethod(methodType);
825             sender.setSubContext(subContextPath);
826             sender.setCredentials(dmeuser, dmepassword);
827             sender.setHeaders(DMETimeOuts);
828             if ("yes".equalsIgnoreCase(handlers)) {
829                 sender.addHeader("AFT_DME2_EXCHANGE_REQUEST_HANDLERS",
830                         props.getProperty(AFT_DME2_EXCHANGE_REQUEST_HANDLERS));
831                 sender.addHeader("AFT_DME2_EXCHANGE_REPLY_HANDLERS",
832                         props.getProperty(AFT_DME2_EXCHANGE_REPLY_HANDLERS));
833                 sender.addHeader("AFT_DME2_REQ_TRACE_ON", props.getProperty(AFT_DME2_REQ_TRACE_ON));
834             } else {
835                 sender.addHeader("AFT_DME2_EXCHANGE_REPLY_HANDLERS", "com.att.nsa.mr.dme.client.HeaderReplyHandler");
836             }
837         } catch (DME2Exception x) {
838             getLog().warn(x.getMessage(), x);
839             throw new DME2Exception(x.getErrorCode(), x.getErrorMessage());
840         } catch (URISyntaxException x) {
841
842             getLog().warn(x.getMessage(), x);
843             throw new URISyntaxException(url, x.getMessage());
844         } catch (Exception x) {
845
846             getLog().warn(x.getMessage(), x);
847             throw new IllegalArgumentException(x.getMessage());
848         }
849     }
850
851     private MRSimplerBatchPublisher(Collection<String> hosts, String topic, int maxBatchSize, long maxBatchAgeMs,
852                                     boolean compress) throws MalformedURLException {
853         super(hosts);
854
855         if (topic == null || topic.length() < 1) {
856             throw new IllegalArgumentException("A topic must be provided.");
857         }
858
859         fHostSelector = new HostSelector(hosts, null);
860         fClosed = false;
861         fTopic = topic;
862         fMaxBatchSize = maxBatchSize;
863         fMaxBatchAgeMs = maxBatchAgeMs;
864         fCompress = compress;
865
866         fPending = new LinkedBlockingQueue<>();
867         fDontSendUntilMs = 0;
868         fExec = new ScheduledThreadPoolExecutor(1);
869         pubResponse = new MRPublisherResponse();
870
871     }
872
873     private MRSimplerBatchPublisher(Collection<String> hosts, String topic, int maxBatchSize, long maxBatchAgeMs,
874                                     boolean compress, boolean allowSelfSignedCerts, int httpThreadOccurrence) throws MalformedURLException {
875         super(hosts);
876
877         if (topic == null || topic.length() < 1) {
878             throw new IllegalArgumentException("A topic must be provided.");
879         }
880
881         fHostSelector = new HostSelector(hosts, null);
882         fClosed = false;
883         fTopic = topic;
884         fMaxBatchSize = maxBatchSize;
885         fMaxBatchAgeMs = maxBatchAgeMs;
886         fCompress = compress;
887         threadOccurrenceTime = httpThreadOccurrence;
888         fPending = new LinkedBlockingQueue<>();
889         fDontSendUntilMs = 0;
890         fExec = new ScheduledThreadPoolExecutor(1);
891         fExec.scheduleAtFixedRate(new Runnable() {
892             @Override
893             public void run() {
894                 send(false);
895             }
896         }, 100, threadOccurrenceTime, TimeUnit.MILLISECONDS);
897         pubResponse = new MRPublisherResponse();
898     }
899
900     private static class TimestampedMessage extends Message {
901         public TimestampedMessage(Message message) {
902             super(message);
903             timestamp = Clock.now();
904         }
905
906         public final long timestamp;
907     }
908
909     public String getUsername() {
910         return username;
911     }
912
913     public void setUsername(String username) {
914         this.username = username;
915     }
916
917     public String getPassword() {
918         return password;
919     }
920
921     public void setPassword(String password) {
922         this.password = password;
923     }
924
925     public String getHost() {
926         return host;
927     }
928
929     public void setHost(String host) {
930         this.host = host;
931     }
932
933     public String getContentType() {
934         return contentType;
935     }
936
937     public void setContentType(String contentType) {
938         this.contentType = contentType;
939     }
940
941     public String getAuthKey() {
942         return authKey;
943     }
944
945     public void setAuthKey(String authKey) {
946         this.authKey = authKey;
947     }
948
949     public String getAuthDate() {
950         return authDate;
951     }
952
953     public void setAuthDate(String authDate) {
954         this.authDate = authDate;
955     }
956
957 }