398558de194ff1fddd957c378379a6707efc9f82
[dmaap/messagerouter/dmaapclient.git] / src / main / java / com / att / nsa / mr / client / impl / MRSimplerBatchPublisher.java
1 /*******************************************************************************
2  *  ============LICENSE_START=======================================================
3  *  org.onap.dmaap
4  *  ================================================================================
5  *  Copyright © 2017 AT&T Intellectual Property. All rights reserved.
6  *  ================================================================================
7  *  Licensed under the Apache License, Version 2.0 (the "License");
8  *  you may not use this file except in compliance with the License.
9  *  You may obtain a copy of the License at
10  *        http://www.apache.org/licenses/LICENSE-2.0
11  *  
12  *  Unless required by applicable law or agreed to in writing, software
13  *  distributed under the License is distributed on an "AS IS" BASIS,
14  *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  *  See the License for the specific language governing permissions and
16  *  limitations under the License.
17  *  ============LICENSE_END=========================================================
18  *
19  *  ECOMP is a trademark and service mark of AT&T Intellectual Property.
20  *  
21  *******************************************************************************/
22 package com.att.nsa.mr.client.impl;
23
24 import java.io.ByteArrayOutputStream;
25 import java.io.IOException;
26 import java.io.OutputStream;
27 import java.net.MalformedURLException;
28 import java.net.URI;
29 import java.net.URISyntaxException;
30 import java.util.Collection;
31 import java.util.HashMap;
32 import java.util.LinkedList;
33 import java.util.List;
34 import java.util.Map;
35 import java.util.Properties;
36 import java.util.concurrent.LinkedBlockingQueue;
37 import java.util.concurrent.ScheduledThreadPoolExecutor;
38 import java.util.concurrent.TimeUnit;
39 import java.util.zip.GZIPOutputStream;
40
41 import javax.ws.rs.core.MultivaluedMap;
42
43 import org.slf4j.Logger;
44 import org.slf4j.LoggerFactory;
45
46 import org.apache.http.HttpException;
47 import org.apache.http.HttpStatus;
48 import org.json.JSONArray;
49 import org.json.JSONObject;
50
51 import com.att.aft.dme2.api.DME2Client;
52 import com.att.aft.dme2.api.DME2Exception;
53 import com.att.nsa.mr.client.HostSelector;
54 import com.att.nsa.mr.client.MRBatchingPublisher;
55 import com.att.nsa.mr.client.response.MRPublisherResponse;
56 import com.att.nsa.mr.test.clients.ProtocolTypeConstants;
57
58 public class MRSimplerBatchPublisher extends MRBaseClient implements MRBatchingPublisher
59 {
60         private static final Logger logger = LoggerFactory.getLogger(MRSimplerBatchPublisher.class);
61
62         public static class Builder 
63         {
64                 public Builder ()
65                 {
66                 }
67
68                 public Builder againstUrls ( Collection<String> baseUrls )
69                 {
70                         fUrls = baseUrls;
71                         return this;
72                 }
73
74                 public Builder onTopic ( String topic )
75                 {
76                         fTopic = topic;
77                         return this;
78                 }
79
80                 public Builder batchTo ( int maxBatchSize, long maxBatchAgeMs )
81                 {
82                         fMaxBatchSize = maxBatchSize;
83                         fMaxBatchAgeMs = maxBatchAgeMs;
84                         return this;
85                 }
86
87                 public Builder compress ( boolean compress )
88                 {
89                         fCompress = compress;
90                         return this;
91                 }
92                 
93                 public Builder httpThreadTime ( int threadOccuranceTime )
94                 {
95                         this.threadOccuranceTime = threadOccuranceTime;
96                         return this;
97                 }
98                 
99                 public Builder allowSelfSignedCertificates( boolean allowSelfSignedCerts )
100                 {
101                         fAllowSelfSignedCerts = allowSelfSignedCerts;
102                         return this;
103                 }
104                 
105                 public Builder withResponse ( boolean withResponse)
106                 {
107                         fWithResponse = withResponse;
108                         return this;
109                 }
110                 public MRSimplerBatchPublisher build ()
111                 {
112                         if(!fWithResponse) 
113                         {
114                                 try {
115                                         return new MRSimplerBatchPublisher ( fUrls, fTopic, fMaxBatchSize, fMaxBatchAgeMs, fCompress, fAllowSelfSignedCerts,threadOccuranceTime);
116                                 } catch (MalformedURLException e) {
117                                         throw new RuntimeException(e);
118                                 }
119                         } else 
120                         {
121                                 try {
122                                         return new MRSimplerBatchPublisher ( fUrls, fTopic, fMaxBatchSize, fMaxBatchAgeMs, fCompress, fAllowSelfSignedCerts, fMaxBatchSize);
123                                 } catch (MalformedURLException e) {
124                                         throw new RuntimeException(e);
125                                 }
126                         }
127                                 
128                 }
129
130                 private Collection<String> fUrls;
131                 private String fTopic;
132                 private int fMaxBatchSize = 100;
133                 private long fMaxBatchAgeMs = 1000;
134                 private boolean fCompress = false;
135                 private int threadOccuranceTime = 50;
136                 private boolean fAllowSelfSignedCerts = false;
137                 private boolean fWithResponse = false;
138                 
139         };
140
141         @Override
142         public int send ( String partition, String msg )
143         {
144                 return send ( new message ( partition, msg ) );
145         }
146         @Override
147         public int send ( String msg )
148         {
149                 return send ( new message ( null, msg ) );
150         }
151
152
153         @Override
154         public int send ( message msg )
155         {
156                 final LinkedList<message> list = new LinkedList<message> ();
157                 list.add ( msg );
158                 return send ( list );
159         }
160         
161         
162
163         @Override
164         public synchronized int send ( Collection<message> msgs )
165         {
166                 if ( fClosed )
167                 {
168                         throw new IllegalStateException ( "The publisher was closed." );
169                 }
170                 
171                 for ( message userMsg : msgs )
172                 {
173                         fPending.add ( new TimestampedMessage ( userMsg ) );
174                 }
175                 return getPendingMessageCount ();
176         }
177
178         @Override
179         public synchronized int getPendingMessageCount ()
180         {
181                 return fPending.size ();
182         }
183
184         @Override
185         public void close ()
186         {
187                 try
188                 {
189                         final List<message> remains = close ( Long.MAX_VALUE, TimeUnit.MILLISECONDS );
190                         if ( remains.size() > 0 )
191                         {
192                                 getLog().warn ( "Closing publisher with " + remains.size() + " messages unsent. "
193                                         + "Consider using MRBatchingPublisher.close( long timeout, TimeUnit timeoutUnits ) to recapture unsent messages on close." );
194                         }
195                 }
196                 catch ( InterruptedException e )
197                 {
198                         getLog().warn ( "Possible message loss. " + e.getMessage(), e );
199                 }
200                 catch ( IOException e )
201                 {
202                         getLog().warn ( "Possible message loss. " + e.getMessage(), e );
203                 }
204         }
205
206         @Override
207         public List<message> close ( long time, TimeUnit unit ) throws IOException, InterruptedException
208         {
209                 synchronized ( this )
210                 {
211                         fClosed = true;
212
213                         // stop the background sender
214                         fExec.setContinueExistingPeriodicTasksAfterShutdownPolicy ( false );
215                         fExec.setExecuteExistingDelayedTasksAfterShutdownPolicy ( false );
216                         fExec.shutdown ();
217                 }
218
219                 final long now = Clock.now ();
220                 final long waitInMs = TimeUnit.MILLISECONDS.convert ( time, unit );
221                 final long timeoutAtMs = now + waitInMs;
222
223                 while ( Clock.now() < timeoutAtMs && getPendingMessageCount() > 0 )
224                 {
225                         send ( true );
226                         Thread.sleep ( 250 );
227                 }
228
229                 synchronized ( this )
230                 {
231                         final LinkedList<message> result = new LinkedList<message> ();
232                         fPending.drainTo ( result );
233                         return result;
234                 }
235         }
236
237         /**
238          * Possibly send a batch to the MR server. This is called by the background thread
239          * and the close() method
240          * 
241          * @param force
242          */
243         private synchronized void send ( boolean force )
244         {
245                 if ( force || shouldSendNow () )
246                 {
247                         if ( !sendBatch () )
248                         {
249                                 getLog().warn ( "Send failed, " + fPending.size() + " message to send." );
250
251                                 // note the time for back-off
252                                 fDontSendUntilMs = sfWaitAfterError + Clock.now ();
253                         }
254                 }
255         }
256
257         private synchronized boolean shouldSendNow ()
258         {
259                 boolean shouldSend = false;
260                 if ( fPending.size () > 0 )
261                 {
262                         final long nowMs = Clock.now ();
263
264                         shouldSend = ( fPending.size() >= fMaxBatchSize );
265                         if ( !shouldSend )
266                         {
267                                 final long sendAtMs = fPending.peek().timestamp + fMaxBatchAgeMs;
268                                 shouldSend = sendAtMs <= nowMs;
269                         }
270
271                         // however, wait after an error
272                         shouldSend = shouldSend && nowMs >= fDontSendUntilMs; 
273                 }
274                 return shouldSend;
275         }
276
277         private synchronized boolean sendBatch ()
278         {
279                 // it's possible for this call to be made with an empty list. in this case, just return.
280                 if ( fPending.size() < 1 )
281                 {
282                         return true;
283                 }
284
285                 final long nowMs = Clock.now ();
286                 
287                 host = this.fHostSelector.selectBaseHost();
288                 
289                 final String httpurl = MRConstants.makeUrl ( host, fTopic,props.getProperty("Protocol"),props.getProperty("partition") );
290                 
291
292                 try
293                 {
294                         /*final String contentType =
295                                 fCompress ?
296                                         MRFormat.CAMBRIA_ZIP.toString () :
297                                         MRFormat.CAMBRIA.toString () 
298                         ;*/
299             
300                         final ByteArrayOutputStream baseStream = new ByteArrayOutputStream ();
301                         OutputStream os = baseStream;
302                         final String contentType = props.getProperty("contenttype");
303                         if(contentType.equalsIgnoreCase("application/json")){
304                                 JSONArray jsonArray = new JSONArray();
305                                 for ( TimestampedMessage m : fPending )
306                                 {
307                                         JSONObject jsonObject = new JSONObject(m.fMsg);
308                                                                 
309                                                 jsonArray.put(jsonObject);
310                                 
311                                 }
312                                 os.write (jsonArray.toString().getBytes() );    
313                                 os.close();
314
315                                 }else if (contentType.equalsIgnoreCase("text/plain")){
316                                         for ( TimestampedMessage m : fPending )
317                                         {                                                                               
318                                                 os.write ( m.fMsg.getBytes() );
319                                                 os.write ( '\n' );
320                                         }
321                                         os.close ();
322                                 } else if (contentType.equalsIgnoreCase("application/cambria") ||  (contentType.equalsIgnoreCase("application/cambria-zip"))){
323                                         if ( contentType.equalsIgnoreCase("application/cambria-zip") )
324                                         {
325                                                 os = new GZIPOutputStream ( baseStream );
326                                         }
327                                         for ( TimestampedMessage m : fPending )
328                                         {
329                                                 
330                                                 os.write ( ( "" + m.fPartition.length () ).getBytes() );
331                                                 os.write ( '.' );
332                                                 os.write ( ( "" + m.fMsg.length () ).getBytes() );
333                                                 os.write ( '.' );
334                                                 os.write ( m.fPartition.getBytes() );
335                                                 os.write ( m.fMsg.getBytes() );
336                                                 os.write ( '\n' );
337                                         }
338                                         os.close ();
339                                 }else{
340                                         for ( TimestampedMessage m : fPending )
341                                         {                                                                               
342                                                 os.write ( m.fMsg.getBytes() );
343                                         
344                                         }
345                                         os.close ();
346                                 }
347              
348                 
349
350                         final long startMs = Clock.now ();
351                         if (ProtocolTypeConstants.DME2.getValue().equalsIgnoreCase(protocolFlag)) {
352                                 
353                         
354                                 DME2Configue();
355                                 
356                                 Thread.sleep(5);
357                                 getLog().info ( "sending " + fPending.size() + " msgs to " + url+subContextPath + ". Oldest: " + ( nowMs - fPending.peek().timestamp ) + " ms"  );
358                                 sender.setPayload(os.toString());               
359                                 String dmeResponse = sender.sendAndWait(5000L);
360                                 
361                                 final String logLine = "MR reply ok (" + (Clock.now() - startMs) + " ms):"
362                                                 + dmeResponse.toString();
363                                 getLog().info(logLine);
364                                 fPending.clear();
365                                 return true;
366                         } 
367                         
368                         if (ProtocolTypeConstants.AUTH_KEY.getValue().equalsIgnoreCase(protocolFlag)) {
369                                 getLog().info ( "sending " + fPending.size() + " msgs to " + httpurl + ". Oldest: " + ( nowMs - fPending.peek().timestamp ) + " ms"  );
370                                 final JSONObject result = postAuth(httpurl, baseStream.toByteArray(), contentType, authKey, authDate, username, password,protocolFlag);
371                                 //System.out.println(result.getInt("status"));
372                                 //Here we are checking for error response. If HTTP status
373                                 //code is not within the http success response code
374                                 //then we consider this as error and return false
375                                 if(result.getInt("status") < 200 || result.getInt("status") > 299) {
376                                         return false;
377                                 }
378                                 final String logLine = "MR reply ok (" + (Clock.now() - startMs) + " ms):" + result.toString();
379                                 getLog().info(logLine);
380                                 fPending.clear();
381                                 return true;
382                         } 
383                         
384                         if (ProtocolTypeConstants.AAF_AUTH.getValue().equalsIgnoreCase(protocolFlag)) {
385                                 getLog().info ( "sending " + fPending.size() + " msgs to " + httpurl + ". Oldest: " + ( nowMs - fPending.peek().timestamp ) + " ms"  );
386                                 final JSONObject result = post(httpurl, baseStream.toByteArray(), contentType, username, password, protocolFlag);
387                                 
388                                 
389                                 //System.out.println(result.getInt("status"));
390                                 //Here we are checking for error response. If HTTP status
391                                 //code is not within the http success response code
392                                 //then we consider this as error and return false
393                                 if(result.getInt("status") < 200 || result.getInt("status") > 299) {
394                                         return false;
395                                 }
396                                 final String logLine = "MR reply ok (" + (Clock.now() - startMs) + " ms):" + result.toString();
397                                 getLog().info(logLine);
398                                 fPending.clear();
399                                 return true;
400                         }
401                 }
402                 catch ( IllegalArgumentException x ) {
403                         getLog().warn ( x.getMessage(), x );
404                 } catch ( IOException x ) {
405                         getLog().warn ( x.getMessage(), x );
406                 } catch (HttpException x) {
407                         getLog().warn ( x.getMessage(), x );
408                 } catch (Exception x) {
409                         getLog().warn(x.getMessage(), x);
410                 }
411                 return false;
412         }
413
414         public synchronized MRPublisherResponse sendBatchWithResponse () 
415         {
416                 // it's possible for this call to be made with an empty list. in this case, just return.
417                 if ( fPending.size() < 1 )
418                 {
419                         pubResponse.setResponseCode(String.valueOf(HttpStatus.SC_BAD_REQUEST));
420                         pubResponse.setResponseMessage("No Messages to send");
421                         return pubResponse;
422                 }
423
424                 final long nowMs = Clock.now ();
425                 
426                 host = this.fHostSelector.selectBaseHost();
427                 
428                 final String httpurl = MRConstants.makeUrl ( host, fTopic,props.getProperty("Protocol"),props.getProperty("partition") );
429                 OutputStream os=null;
430                 try
431                 {
432                         
433                         final ByteArrayOutputStream baseStream = new ByteArrayOutputStream ();
434                          os = baseStream;
435                         final String contentType = props.getProperty("contenttype");
436                         if(contentType.equalsIgnoreCase("application/json")){
437                                 JSONArray jsonArray = new JSONArray();
438                                 for ( TimestampedMessage m : fPending )
439                                 {
440                                         JSONObject jsonObject = new JSONObject(m.fMsg);
441                                                                 
442                                                 jsonArray.put(jsonObject);
443                                 
444                                 }
445                                 os.write (jsonArray.toString().getBytes() );    
446                                 }else if (contentType.equalsIgnoreCase("text/plain")){
447                                         for ( TimestampedMessage m : fPending )
448                                         {                                                                               
449                                                 os.write ( m.fMsg.getBytes() );
450                                                 os.write ( '\n' );
451                                         }
452                                 } else if (contentType.equalsIgnoreCase("application/cambria") ||  (contentType.equalsIgnoreCase("application/cambria-zip"))){
453                                         if ( contentType.equalsIgnoreCase("application/cambria-zip") )
454                                         {
455                                                 os = new GZIPOutputStream ( baseStream );
456                                         }
457                                         for ( TimestampedMessage m : fPending )
458                                         {
459                                                 
460                                                 os.write ( ( "" + m.fPartition.length () ).getBytes() );
461                                                 os.write ( '.' );
462                                                 os.write ( ( "" + m.fMsg.length () ).getBytes() );
463                                                 os.write ( '.' );
464                                                 os.write ( m.fPartition.getBytes() );
465                                                 os.write ( m.fMsg.getBytes() );
466                                                 os.write ( '\n' );
467                                         }
468                                         os.close ();
469                                 }else{
470                                         for ( TimestampedMessage m : fPending )
471                                         {                                                                               
472                                                 os.write ( m.fMsg.getBytes() );
473                                         
474                                         }
475                                 }
476              
477                 
478
479                         final long startMs = Clock.now ();
480                         if (ProtocolTypeConstants.DME2.getValue().equalsIgnoreCase(protocolFlag)) {
481                                 
482                         
483                                 try {
484                                 DME2Configue();
485                                 
486                                 Thread.sleep(5);
487                                 getLog().info ( "sending " + fPending.size() + " msgs to " + url+subContextPath + ". Oldest: " + ( nowMs - fPending.peek().timestamp ) + " ms"  );
488                                 sender.setPayload(os.toString());               
489                                                 
490                                 
491                                 String dmeResponse = sender.sendAndWait(5000L);
492                                 System.out.println("dmeres->"+dmeResponse);             
493                                 
494                                 
495                                 pubResponse = createMRPublisherResponse(dmeResponse,pubResponse);
496                                 
497                                 if(Integer.valueOf(pubResponse.getResponseCode()) < 200 || Integer.valueOf(pubResponse.getResponseCode()) > 299) {
498                                         
499                                         return pubResponse;
500                                 }
501                                 final String logLine = String.valueOf((Clock.now() - startMs))
502                                                 + dmeResponse.toString();
503                                 getLog().info(logLine);
504                                 fPending.clear();
505                                 
506                                 }
507                                 catch (DME2Exception x) {
508                                         getLog().warn(x.getMessage(), x);
509                                         pubResponse.setResponseCode(x.getErrorCode());
510                                         pubResponse.setResponseMessage(x.getErrorMessage());
511                                 } catch (URISyntaxException x) {
512                                         
513                                         getLog().warn(x.getMessage(), x);
514                                         pubResponse.setResponseCode(String.valueOf(HttpStatus.SC_BAD_REQUEST));
515                                         pubResponse.setResponseMessage(x.getMessage());
516                                 } catch (Exception x) {
517
518                                         pubResponse.setResponseCode(String.valueOf(HttpStatus.SC_INTERNAL_SERVER_ERROR));
519                                         pubResponse.setResponseMessage(x.getMessage());
520                                         logger.error("exception: ", x);
521                                         
522                                 }
523                                 
524                                 return pubResponse;
525                         } 
526                         
527                         if (ProtocolTypeConstants.AUTH_KEY.getValue().equalsIgnoreCase(protocolFlag)) {
528                                 getLog().info ( "sending " + fPending.size() + " msgs to " + httpurl + ". Oldest: " + ( nowMs - fPending.peek().timestamp ) + " ms"  );
529                                 final String result = postAuthwithResponse(httpurl, baseStream.toByteArray(), contentType, authKey, authDate, username, password,protocolFlag);
530                                 //System.out.println(result.getInt("status"));
531                                 //Here we are checking for error response. If HTTP status
532                                 //code is not within the http success response code
533                                 //then we consider this as error and return false
534                                 
535                                 
536                                 pubResponse = createMRPublisherResponse(result,pubResponse);
537                                 
538                                 if(Integer.valueOf(pubResponse.getResponseCode()) < 200 || Integer.valueOf(pubResponse.getResponseCode()) > 299) {
539                                         
540                                         return pubResponse;
541                                 }
542                                 
543                                 final String logLine = "MR reply ok (" + (Clock.now() - startMs) + " ms):" + result.toString();
544                                 getLog().info(logLine);
545                                 fPending.clear();
546                                 return pubResponse;
547                         } 
548                         
549                         if (ProtocolTypeConstants.AAF_AUTH.getValue().equalsIgnoreCase(protocolFlag)) {
550                                 getLog().info ( "sending " + fPending.size() + " msgs to " + httpurl + ". Oldest: " + ( nowMs - fPending.peek().timestamp ) + " ms"  );
551                                 final String result = postWithResponse(httpurl, baseStream.toByteArray(), contentType, username, password, protocolFlag);
552                                 
553                                 //System.out.println(result.getInt("status"));
554                                 //Here we are checking for error response. If HTTP status
555                                 //code is not within the http success response code
556                                 //then we consider this as error and return false
557                                 pubResponse = createMRPublisherResponse(result,pubResponse);
558                                 
559                                 if(Integer.valueOf(pubResponse.getResponseCode()) < 200 || Integer.valueOf(pubResponse.getResponseCode()) > 299) {
560                                         
561                                         return pubResponse;
562                                 }
563                                 
564                                 final String logLine = String.valueOf((Clock.now() - startMs));
565                                 getLog().info(logLine);
566                                 fPending.clear();
567                                 return pubResponse;
568                         }
569                 }
570                 catch ( IllegalArgumentException x ) {
571                         getLog().warn ( x.getMessage(), x );
572                         pubResponse.setResponseCode(String.valueOf(HttpStatus.SC_BAD_REQUEST));
573                         pubResponse.setResponseMessage(x.getMessage());
574                         
575                 } catch ( IOException x ) {
576                         getLog().warn ( x.getMessage(), x );
577                         pubResponse.setResponseCode(String.valueOf(HttpStatus.SC_INTERNAL_SERVER_ERROR));
578                         pubResponse.setResponseMessage(x.getMessage());
579                         
580                 } catch (HttpException x) {
581                         getLog().warn ( x.getMessage(), x );
582                         pubResponse.setResponseCode(String.valueOf(HttpStatus.SC_BAD_REQUEST));
583                         pubResponse.setResponseMessage(x.getMessage());
584                         
585                 } catch (Exception x) {
586                         getLog().warn(x.getMessage(), x);
587                         
588                         pubResponse.setResponseCode(String.valueOf(HttpStatus.SC_INTERNAL_SERVER_ERROR));
589                         pubResponse.setResponseMessage(x.getMessage());
590                         
591                 }
592                 
593                 finally {
594                         if (fPending.size()>0) {
595                                 getLog().warn ( "Send failed, " + fPending.size() + " message to send." );
596                                 pubResponse.setPendingMsgs(fPending.size());
597                         }
598                         if (os != null) {
599                                 try {
600                                 os.close();
601                                 } catch (Exception x) {
602                                         getLog().warn(x.getMessage(), x);
603                                         pubResponse.setResponseCode(String.valueOf(HttpStatus.SC_INTERNAL_SERVER_ERROR));
604                                         pubResponse.setResponseMessage("Error in closing Output Stream");
605                                 }
606                                 }
607                 }
608                 
609                 return pubResponse;
610         }
611         
612 private MRPublisherResponse createMRPublisherResponse(String reply, MRPublisherResponse mrPubResponse) {
613                 
614          if (reply.isEmpty()) 
615          {
616                  
617                  mrPubResponse.setResponseCode(String.valueOf(HttpStatus.SC_BAD_REQUEST));
618                  mrPubResponse.setResponseMessage("Please verify the Producer properties");
619          }
620          else if(reply.startsWith("{"))
621          {
622                         JSONObject jObject = new JSONObject(reply);
623                         if(jObject.has("message") && jObject.has("status"))
624                         {
625                                 String message = jObject.getString("message");
626                                 if(null != message)
627                                 {
628                                         mrPubResponse.setResponseMessage(message);      
629                                 }
630                                 mrPubResponse.setResponseCode(Integer.toString(jObject.getInt("status")));
631                         }
632                         else
633                          {
634                                         mrPubResponse.setResponseCode(String.valueOf(HttpStatus.SC_OK));
635                                         mrPubResponse.setResponseMessage(reply);        
636                          }
637      }
638          else if (reply.startsWith("<"))
639          {
640                  String responseCode = getHTTPErrorResponseCode(reply);
641                  if( responseCode.contains("403"))
642                         {
643                          responseCode = "403";
644                         }       
645                  mrPubResponse.setResponseCode(responseCode);
646                         mrPubResponse.setResponseMessage(getHTTPErrorResponseMessage(reply));   
647          }
648          
649                 return mrPubResponse;
650         }
651
652         private final String fTopic;
653         private final int fMaxBatchSize;
654         private final long fMaxBatchAgeMs;
655         private final boolean fCompress;
656         private int threadOccuranceTime;
657         private boolean fClosed;
658         private String username;
659         private String password;
660         private String host;
661         
662         //host selector
663         private HostSelector fHostSelector = null;
664         
665         private final LinkedBlockingQueue<TimestampedMessage> fPending;
666         private long fDontSendUntilMs;
667         private final ScheduledThreadPoolExecutor fExec;
668
669         private String latitude;
670         private String longitude;
671         private String version;
672         private String serviceName;
673         private String env;
674         private String partner;
675         private String routeOffer;
676         private String subContextPath;
677         private String protocol;
678         private String methodType;
679         private String url;
680         private String dmeuser;
681         private String dmepassword;
682         private String contentType;
683         private static final long sfWaitAfterError = 10000;
684         private HashMap<String, String> DMETimeOuts;
685         private DME2Client sender;
686         public String protocolFlag = ProtocolTypeConstants.DME2.getValue();
687         public String producerFilePath;
688         private String authKey;
689         private String authDate;
690         private String handlers;
691         private Properties props;
692         public static String routerFilePath;
693         protected static final Map<String, String> headers=new HashMap<String, String>();
694         public static MultivaluedMap<String, Object> headersMap;
695         
696         
697         private MRPublisherResponse pubResponse;
698         
699         public MRPublisherResponse getPubResponse() {
700                 return pubResponse;
701         }
702         public void setPubResponse(MRPublisherResponse pubResponse) {
703                 this.pubResponse = pubResponse;
704         }
705         
706         public static String getRouterFilePath() {
707                 return routerFilePath;
708         }
709
710         public static void setRouterFilePath(String routerFilePath) {
711                 MRSimplerBatchPublisher.routerFilePath = routerFilePath;
712         }
713
714         public Properties getProps() {
715                 return props;
716         }
717
718         public void setProps(Properties props) {
719                 this.props = props;
720         }
721
722         public String getProducerFilePath() {
723                 return producerFilePath;
724         }
725
726         public void setProducerFilePath(String producerFilePath) {
727                 this.producerFilePath = producerFilePath;
728         }
729
730         public String getProtocolFlag() {
731                 return protocolFlag;
732         }
733
734         public void setProtocolFlag(String protocolFlag) {
735                 this.protocolFlag = protocolFlag;
736         }
737         
738         
739         private void DME2Configue() throws Exception {
740                 try {
741                         
742                 /*      FileReader reader = new FileReader(new File (producerFilePath));
743                         Properties props = new Properties();            
744                         props.load(reader);*/
745                         latitude = props.getProperty("Latitude");
746                         longitude = props.getProperty("Longitude");
747                         version = props.getProperty("Version");
748                         serviceName = props.getProperty("ServiceName");
749                         env = props.getProperty("Environment");
750                         partner = props.getProperty("Partner");
751                         routeOffer = props.getProperty("routeOffer");
752                         subContextPath = props.getProperty("SubContextPath")+fTopic;
753                         /*if(props.getProperty("partition")!=null && !props.getProperty("partition").equalsIgnoreCase("")){
754                                 subContextPath=subContextPath+"?partitionKey="+props.getProperty("partition");
755                         }*/                     
756                         protocol = props.getProperty("Protocol");
757                         methodType = props.getProperty("MethodType");
758                         dmeuser = props.getProperty("username");
759                         dmepassword = props.getProperty("password");
760                         contentType = props.getProperty("contenttype");
761                         handlers = props.getProperty("sessionstickinessrequired");
762                         routerFilePath= props.getProperty("DME2preferredRouterFilePath");
763                         
764                         /**
765                          * Changes to DME2Client url to use Partner for auto failover between data centers
766                          * When Partner value is not provided use the routeOffer value for auto failover within a cluster 
767                          */
768                         
769
770                         String partitionKey = props.getProperty("partition");
771                         
772                         if (partner != null && !partner.isEmpty() ) 
773                         { 
774                                 url = protocol + "://"+serviceName+"?version="+version+"&envContext="+env+"&partner="+partner; 
775                 if(partitionKey!=null && !partitionKey.equalsIgnoreCase("")){
776                     url = url + "&partitionKey=" + partitionKey;
777                 }
778                         }
779                         else if (routeOffer!=null && !routeOffer.isEmpty()) 
780                         { 
781                                 url = protocol + "://"+serviceName+"?version="+version+"&envContext="+env+"&routeoffer="+routeOffer;
782                 if(partitionKey!=null && !partitionKey.equalsIgnoreCase("")){
783                     url = url + "&partitionKey=" + partitionKey;
784                 }
785                         }
786                          
787                         DMETimeOuts = new HashMap<String, String>();
788                         DMETimeOuts.put("AFT_DME2_EP_READ_TIMEOUT_MS", props.getProperty("AFT_DME2_EP_READ_TIMEOUT_MS"));
789                         DMETimeOuts.put("AFT_DME2_ROUNDTRIP_TIMEOUT_MS", props.getProperty("AFT_DME2_ROUNDTRIP_TIMEOUT_MS"));
790                         DMETimeOuts.put("AFT_DME2_EP_CONN_TIMEOUT", props.getProperty("AFT_DME2_EP_CONN_TIMEOUT"));
791                         DMETimeOuts.put("Content-Type", contentType);
792                         System.setProperty("AFT_LATITUDE", latitude);
793                         System.setProperty("AFT_LONGITUDE", longitude);
794                         System.setProperty("AFT_ENVIRONMENT",props.getProperty("AFT_ENVIRONMENT"));
795                         //System.setProperty("DME2.DEBUG", "true");
796                 //      System.setProperty("AFT_DME2_HTTP_EXCHANGE_TRACE_ON", "true");
797                         //System.out.println("XXXXXX"+url);
798                         
799                         //SSL changes
800                         System.setProperty("AFT_DME2_CLIENT_SSL_INCLUDE_PROTOCOLS",
801                                         "SSLv3,TLSv1,TLSv1.1");
802                         System.setProperty("AFT_DME2_CLIENT_IGNORE_SSL_CONFIG", "false");
803                         System.setProperty("AFT_DME2_CLIENT_KEYSTORE_PASSWORD", "changeit");
804                         
805                         //SSL changes
806                         
807                         sender = new DME2Client(new URI(url), 5000L);
808                                 
809                         sender.setAllowAllHttpReturnCodes(true);
810                         sender.setMethod(methodType);
811                         sender.setSubContext(subContextPath);   
812                         sender.setCredentials(dmeuser, dmepassword);
813                         sender.setHeaders(DMETimeOuts);
814                         if(handlers.equalsIgnoreCase("yes")){
815                                 sender.addHeader("AFT_DME2_EXCHANGE_REQUEST_HANDLERS", props.getProperty("AFT_DME2_EXCHANGE_REQUEST_HANDLERS"));
816                                 sender.addHeader("AFT_DME2_EXCHANGE_REPLY_HANDLERS", props.getProperty("AFT_DME2_EXCHANGE_REPLY_HANDLERS"));
817                                 sender.addHeader("AFT_DME2_REQ_TRACE_ON", props.getProperty("AFT_DME2_REQ_TRACE_ON"));
818                                 }else{
819                                         sender.addHeader("AFT_DME2_EXCHANGE_REPLY_HANDLERS", "com.att.nsa.mr.dme.client.HeaderReplyHandler");
820                                 }
821                 } catch (DME2Exception x) {
822                         getLog().warn(x.getMessage(), x);
823                         throw new DME2Exception(x.getErrorCode(),x.getErrorMessage());
824                 } catch (URISyntaxException x) {
825                         
826                         getLog().warn(x.getMessage(), x);
827                         throw new URISyntaxException(url,x.getMessage());
828                 } catch (Exception x) {
829
830                         getLog().warn(x.getMessage(), x);
831                         throw new Exception(x.getMessage());
832                 }
833         }
834         
835         private MRSimplerBatchPublisher ( Collection<String> hosts, String topic, int maxBatchSize, long maxBatchAgeMs, boolean compress) throws MalformedURLException
836         {
837                 super ( hosts );
838
839                 if ( topic == null || topic.length() < 1 )
840                 {
841                         throw new IllegalArgumentException ( "A topic must be provided." );
842                 }
843                 
844                 fHostSelector = new HostSelector(hosts, null);
845                 fClosed = false;
846                 fTopic = topic;
847                 fMaxBatchSize = maxBatchSize;
848                 fMaxBatchAgeMs = maxBatchAgeMs;
849                 fCompress = compress;
850
851                 fPending = new LinkedBlockingQueue<TimestampedMessage> ();
852                 fDontSendUntilMs = 0;
853                 fExec = new ScheduledThreadPoolExecutor ( 1 );
854                 pubResponse = new MRPublisherResponse();
855                 
856         }
857         
858         private MRSimplerBatchPublisher ( Collection<String> hosts, String topic, int maxBatchSize, long maxBatchAgeMs, boolean compress, boolean allowSelfSignedCerts, int httpThreadOccurnace ) throws MalformedURLException
859         {
860                 super ( hosts );
861
862                 if ( topic == null || topic.length() < 1 )
863                 {
864                         throw new IllegalArgumentException ( "A topic must be provided." );
865                 }
866                 
867                 fHostSelector = new HostSelector(hosts, null);
868                 fClosed = false;
869                 fTopic = topic;
870                 fMaxBatchSize = maxBatchSize;
871                 fMaxBatchAgeMs = maxBatchAgeMs;
872                 fCompress = compress;
873                 threadOccuranceTime=httpThreadOccurnace;
874                 fPending = new LinkedBlockingQueue<TimestampedMessage> ();
875                 fDontSendUntilMs = 0;
876                 fExec = new ScheduledThreadPoolExecutor ( 1 );
877                 fExec.scheduleAtFixedRate ( new Runnable()
878                 {
879                         @Override
880                         public void run ()
881                         {
882                                 send ( false );
883                         }
884                 }, 100, threadOccuranceTime, TimeUnit.MILLISECONDS );
885         }
886
887         private static class TimestampedMessage extends message
888         {
889                 public TimestampedMessage ( message m )
890                 {
891                         super ( m );
892                         timestamp = Clock.now();
893                 }
894                 public final long timestamp;
895         }
896
897         public String getUsername() {
898                 return username;
899         }
900
901         public void setUsername(String username) {
902                 this.username = username;
903         }
904
905         public String getPassword() {
906                 return password;
907         }
908
909         public void setPassword(String password) {
910                 this.password = password;
911         }
912
913         public String getHost() {
914                 return host;
915         }
916
917         public void setHost(String host) {
918                 this.host = host;
919         }
920
921         public String getContentType() {
922                 return contentType;
923         }
924
925         public void setContentType(String contentType) {
926                 this.contentType = contentType;
927         }
928
929         public String getAuthKey() {
930                 return authKey;
931         }
932
933         public void setAuthKey(String authKey) {
934                 this.authKey = authKey;
935         }
936
937         public String getAuthDate() {
938                 return authDate;
939         }
940
941         public void setAuthDate(String authDate) {
942                 this.authDate = authDate;
943         }
944         
945 }