clean MR codebase
[dmaap/messagerouter/dmaapclient.git] / src / main / cpp / cambria.cpp
diff --git a/src/main/cpp/cambria.cpp b/src/main/cpp/cambria.cpp
deleted file mode 100644 (file)
index 7aff421..0000000
+++ /dev/null
@@ -1,624 +0,0 @@
-/*******************************************************************************
- *  ============LICENSE_START=======================================================
- *  org.onap.dmaap
- *  ================================================================================
- *  Copyright © 2017 AT&T Intellectual Property. All rights reserved.
- *  ================================================================================
- *  Licensed under the Apache License, Version 2.0 (the "License");
- *  you may not use this file except in compliance with the License.
- *  You may obtain a copy of the License at
- *        http://www.apache.org/licenses/LICENSE-2.0
- *  
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- *  ============LICENSE_END=========================================================
- *
- *  ECOMP is a trademark and service mark of AT&T Intellectual Property.
- *  
- *******************************************************************************/
-
-/*
- * This is a client library for the AT&T Cambria Event Routing Service.
- */
-
-#include "cambria.h"
-
-#include <arpa/inet.h>
-#include <sys/socket.h>
-#include <netdb.h>
-#include <unistd.h>
-#include <stdio.h>
-#include <stdlib.h>
-#include <stdarg.h>
-#include <string.h>
-
-#include <string>
-#include <list>
-#include <sstream>
-#include <iomanip>
-#include <algorithm>
-
-// field used in JSON encoding to signal stream name
-const char* kPartition = "cambria.partition";
-
-// map from opaque handle to object pointer
-#define toOpaque(x) ((CAMBRIA_CLIENT)x)
-#define fromOpaque(x) ((cambriaClient*)x)
-
-// trace support
-extern void traceOutput ( const char* format, ... );
-#ifdef CAMBRIA_TRACING
-       #define TRACE traceOutput
-#else
-       #define TRACE 1 ? (void) 0 : traceOutput
-#endif
-
-/*
- *     internal cambria client class
- */
-class cambriaClient
-{
-public:
-       cambriaClient ( const std::string& host, int port, const std::string& topic, const std::string& format );
-       ~cambriaClient ();
-
-       cambriaSendResponse* send ( const char* streamName, const char* message );
-       cambriaSendResponse* send ( const char* streamName, const char** message, unsigned int count );
-
-       cambriaGetResponse* get ( int timeoutMs, int limit );
-
-private:
-
-       std::string fHost;
-       int fPort;
-       std::string fTopic;
-       std::string fFormat;
-
-       bool buildMessageBody ( const char* streamName, const char** msgs, unsigned int count, std::string& buffer );
-
-       void write ( int socket, const char* line );
-};
-
-cambriaClient::cambriaClient ( const std::string& host, int port, const std::string& topic, const std::string& format ) :
-       fHost ( host ),
-       fPort ( port ),
-       fTopic ( topic ),
-       fFormat ( format )
-{
-}
-
-cambriaClient::~cambriaClient ()
-{
-}
-
-/*
- *     This isn't quite right -- if the message already has cambria.partition,
- *     it'll wind up with two entries. Also, message MUST start with '{' and
- *     have at least one field.
- */
-static char* makeJsonMessage ( const char* streamName, const char* message )
-{
-       int len = ::strlen ( message );
-       if ( streamName )
-       {
-               len += ::strlen ( kPartition );
-               len += ::strlen ( streamName );
-               len += 6;       // quote each and a colon and comma
-       }
-
-       char* msg = new char [ len + 1 ];
-       ::strcpy ( msg, "{" );
-       if ( streamName )
-       {
-               ::strcat ( msg, "\"" );
-               ::strcat ( msg, kPartition );
-               ::strcat ( msg, "\":\"" );
-               ::strcat ( msg, streamName );
-               ::strcat ( msg, "\"," );
-       }
-       ::strcat ( msg, message + 1 );
-       return msg;
-}
-
-cambriaSendResponse* cambriaClient::send ( const char* streamName, const char* message )
-{
-       return send ( streamName, &message, 1 );
-}
-
-static bool replace ( std::string& str, const std::string& from, const std::string& to )
-{
-       size_t start_pos = str.find ( from );
-       if(start_pos == std::string::npos)
-               return false;
-       str.replace(start_pos, from.length(), to);
-       return true;
-}
-
-static void readResponse ( int s, std::string& response )
-{
-       char buffer [ 4096 ];
-
-       ssize_t n = 0;
-       while ( ( n = ::read ( s, buffer, 4095 ) ) > 0 )
-       {
-               buffer[n] = '\0';
-               response += buffer;
-       }
-}
-
-static int openSocket ( std::string& host, int port, int& ss )
-{
-       TRACE( "connecting to %s\n", host.c_str() );
-
-       struct hostent *he = ::gethostbyname ( host.c_str() );
-       if ( !he )
-       {
-               TRACE("no host entry\n");
-               return CAMBRIA_NO_HOST;
-       }
-
-       if ( he->h_addrtype != AF_INET )
-       {
-               TRACE("not AF_INET\n");
-               return CAMBRIA_NO_HOST;
-       }
-
-       int s = ::socket ( AF_INET, SOCK_STREAM, 0 );
-       if ( s == -1 )
-       {
-               TRACE("no socket available\n");
-               return CAMBRIA_CANT_CONNECT;
-       }
-
-       struct sockaddr_in servaddr;
-       ::memset ( &servaddr, 0, sizeof(servaddr) );
-
-       ::memcpy ( &servaddr.sin_addr, he->h_addr_list[0], he->h_length );
-       servaddr.sin_family = AF_INET;
-       servaddr.sin_port = ::htons ( port );
-
-       if ( ::connect ( s, (struct sockaddr *)&servaddr, sizeof(servaddr) ) )
-       {
-               TRACE("couldn't connect\n");
-               return CAMBRIA_CANT_CONNECT;
-       }
-
-       ss = s;
-       return 0;
-}
-
-cambriaSendResponse* cambriaClient::send ( const char* streamName, const char** msgs, unsigned int count )
-{
-       TRACE ( "Sending %d messages.", count );
-
-       cambriaSendResponse* result = new cambriaSendResponse ();
-       result->statusCode = 0;
-       result->statusMessage = NULL;
-       result->responseBody = NULL;
-
-       TRACE( "building message body\n" );
-
-       std::string body;
-       if ( !buildMessageBody ( streamName, msgs, count, body ) )
-       {
-               result->statusCode = CAMBRIA_UNRECOGNIZED_FORMAT;
-               return result;
-       }
-
-       int s = -1;
-       int err = ::openSocket ( fHost, fPort, s );
-       if ( err > 0 )
-       {
-               result->statusCode = err;
-               return result;
-       }
-
-       // construct path
-       std::string path = "/cambriaApiServer/v1/event/";
-       path += fTopic;
-
-       // send post prefix
-       char line[4096];
-       ::sprintf ( line,
-               "POST %s HTTP/1.0\r\n"
-               "Host: %s\r\n"
-               "Content-Type: %s\r\n"
-               "Content-Length: %d\r\n"
-               "\r\n",
-               path.c_str(), fHost.c_str(), fFormat.c_str(), body.length() );
-       write ( s, line );
-
-       // send the body
-       write ( s, body.c_str() );
-
-       TRACE ( "\n" );
-       TRACE ( "send complete, reading reply\n" );
-
-       // receive the response
-       std::string response;
-       readResponse ( s, response );
-       ::close ( s );
-
-       // parse the header and body: split header and body on first occurrence of \r\n\r\n
-       result->statusCode = CAMBRIA_BAD_RESPONSE;
-
-       size_t headerBreak = response.find ( "\r\n\r\n" );
-    if ( headerBreak != std::string::npos )
-    {
-               std::string responseBody = response.substr ( headerBreak + 4 );
-               result->responseBody = new char [ responseBody.length() + 1 ];
-               ::strcpy ( result->responseBody, responseBody.c_str() );
-
-               // all we need from the header for now is the status line
-               std::string headerPart = response.substr ( 0, headerBreak + 2 );
-               
-               size_t newline = headerPart.find ( '\r' );
-               if ( newline != std::string::npos )
-               {
-                       std::string statusLine = headerPart.substr ( 0, newline );
-
-                       size_t firstSpace = statusLine.find ( ' ' );
-                       if ( firstSpace != std::string::npos )
-                       {
-                               size_t secondSpace = statusLine.find ( ' ', firstSpace + 1 );
-                               if ( secondSpace != std::string::npos )
-                               {
-                                       result->statusCode = ::atoi ( statusLine.substr ( firstSpace + 1, secondSpace - firstSpace + 1 ).c_str() );
-                                       std::string statusMessage = statusLine.substr ( secondSpace + 1 );
-                                       result->statusMessage = new char [ statusMessage.length() + 1 ];
-                                       ::strcpy ( result->statusMessage, statusMessage.c_str() );
-                               }
-                       }
-               }
-       }
-       return result;
-}
-
-void cambriaClient::write ( int socket, const char* str )
-{
-       int len = str ? ::strlen ( str ) : 0;
-       ::write ( socket, str, len );
-
-       // elaborate tracing nonsense...
-       std::string trace ( "> " );
-       trace += str;
-       while ( replace ( trace, "\r\n", "\\r\\n\n> " ) );
-
-       TRACE ( "%s", trace.c_str() );
-}
-
-bool cambriaClient::buildMessageBody ( const char* streamName, const char** msgs, unsigned int count, std::string& buffer )
-{
-       if ( fFormat == CAMBRIA_NATIVE_FORMAT )
-       {
-               int snLen = ::strlen ( streamName );
-               for ( unsigned int i=0; i<count; i++ )
-               {
-                       const char* msg = msgs[i];
-
-                       std::ostringstream s;
-                       s << snLen << '.' << ::strlen(msg) << '.' << streamName << msg;
-                       buffer.append ( s.str() );
-               }
-       }
-       else if ( fFormat == CAMBRIA_JSON_FORMAT )
-       {
-               buffer.append ( "[" );
-               for ( unsigned int i=0; i<count; i++ )
-               {
-                       if ( i>0 )
-                       {
-                               buffer.append ( "," );
-                       }
-                       const char* msg = msgs[i];
-                       char* jsonMsg = ::makeJsonMessage ( streamName, msg );
-                       buffer.append ( jsonMsg );
-                       delete jsonMsg; // FIXME: allocating memory here just to delete it
-               }
-               buffer.append ( "]" );
-       }
-       else
-       {
-               return false;
-       }
-       return true;
-}
-
-// read the next string into value, and return the end pos, or 0 on error
-static int readNextJsonString ( const std::string& body, int startPos, std::string& value )
-{
-       value = "";
-
-       if ( startPos >= body.length () )
-       {
-               return 0;
-       }
-
-       // skip a comma
-       int current = startPos;
-       if ( body[current] == ',' ) current++;
-
-       if ( current >= body.length() || body[current] != '"' )
-       {
-               return 0;
-       }
-       current++;
-
-       // walk the string for the closing quote (FIXME: unicode support)
-       bool esc = false;
-       int hex = 0;
-       while ( ( body[current] != '"' || esc ) && current < body.length() )
-       {
-               if ( hex > 0 )
-               {
-                       hex--;
-                       if ( hex == 0 )
-                       {
-                               // presumably read a unicode escape. this code isn't
-                               // equipped for multibyte or unicode, so just skip it
-                               value += '?';
-                       }
-               }
-               else if ( esc )
-               {
-                       esc = false;
-                       switch ( body[current] )
-                       {
-                               case '"':
-                               case '\\':
-                               case '/':
-                                       value += body[current];
-                                       break;
-
-                               case 'b': value += '\b'; break;
-                               case 'f': value += '\f'; break;
-                               case 'n': value += '\n'; break;
-                               case 'r': value += '\r'; break;
-                               case 't': value += '\t'; break;
-
-                               case 'u': hex=4; break;
-                       }
-               }
-               else
-               {
-                       esc = body[current] == '\\';
-                       if ( !esc ) value += body[current];
-               }
-               current++;
-       }
-
-       return current + 1;
-}
-
-static void readGetBody ( std::string& body, cambriaGetResponse& response )
-{
-       TRACE("response %s\n", body.c_str() );  
-       
-       if ( body.length() < 2 || body[0] != '[' || body[body.length()-1] != ']' )
-       {
-               response.statusCode = CAMBRIA_BAD_RESPONSE;
-       }
-
-       std::list<char*> msgs;
-       std::string val;
-       int current = 1;
-       while ( ( current = readNextJsonString ( body, current, val ) ) > 0 )
-       {
-               char* msg = new char [ val.length() + 1 ];
-               ::strcpy ( msg, val.c_str() );
-               msgs.push_back ( msg );
-       }
-       
-       // now build a response
-       response.messageCount = msgs.size();
-       response.messageSet = new char* [ msgs.size() ];
-       int index = 0;
-       for ( std::list<char*>::iterator it = msgs.begin(); it != msgs.end(); it++ )
-       {
-               response.messageSet [ index++ ] = *it;
-       }
-}
-
-cambriaGetResponse* cambriaClient::get ( int timeoutMs, int limit )
-{
-       cambriaGetResponse* result = new cambriaGetResponse ();
-       result->statusCode = 0;
-       result->statusMessage = NULL;
-       result->messageCount = 0;
-       result->messageSet = new char* [ 1 ];
-
-       int s = -1;
-       int err = ::openSocket ( fHost, fPort, s );
-       if ( err > 0 )
-       {
-               result->statusCode = err;
-               return result;
-       }
-       
-       // construct path
-       std::string path = "/cambriaApiServer/v1/event/";
-       path += fTopic;
-
-       bool haveAdds = false;
-       std::ostringstream adds;
-       if ( timeoutMs > -1 )
-       {
-               adds << "timeout=" << timeoutMs;
-               haveAdds = true;
-       }
-       if ( limit > -1 )
-       {
-               if ( haveAdds )
-               {
-                       adds << "&";
-               }
-               adds << "limit=" << limit;
-               haveAdds = true;
-       }
-       if ( haveAdds )
-       {
-               path += "?";
-               path += adds.str();
-       }
-
-       // send post prefix
-       char line[4096];
-       ::sprintf ( line,
-               "GET %s HTTP/1.0\r\n"
-               "Host: %s\r\n"
-               "\r\n",
-               path.c_str(), fHost.c_str() );
-       write ( s, line );
-
-       TRACE ( "\n" );
-       TRACE ( "request sent; reading reply\n" );
-
-       // receive the response (FIXME: would be nice to stream rather than load it all)
-       std::string response;
-       readResponse ( s, response );
-       ::close ( s );
-
-       // parse the header and body: split header and body on first occurrence of \r\n\r\n
-       result->statusCode = CAMBRIA_BAD_RESPONSE;
-
-       size_t headerBreak = response.find ( "\r\n\r\n" );
-    if ( headerBreak != std::string::npos )
-    {
-               // get the header line
-               std::string headerPart = response.substr ( 0, headerBreak + 2 );
-
-               size_t newline = headerPart.find ( '\r' );
-               if ( newline != std::string::npos )
-               {
-                       std::string statusLine = headerPart.substr ( 0, newline );
-
-                       size_t firstSpace = statusLine.find ( ' ' );
-                       if ( firstSpace != std::string::npos )
-                       {
-                               size_t secondSpace = statusLine.find ( ' ', firstSpace + 1 );
-                               if ( secondSpace != std::string::npos )
-                               {
-                                       result->statusCode = ::atoi ( statusLine.substr ( firstSpace + 1, secondSpace - firstSpace + 1 ).c_str() );
-                                       std::string statusMessage = statusLine.substr ( secondSpace + 1 );
-                                       result->statusMessage = new char [ statusMessage.length() + 1 ];
-                                       ::strcpy ( result->statusMessage, statusMessage.c_str() );
-                               }
-                       }
-               }
-
-               if ( result->statusCode < 300 )
-               {
-                       std::string responseBody = response.substr ( headerBreak + 4 );
-                       readGetBody ( responseBody, *result );
-               }
-       }
-       return result;
-}
-
-
-///////////////////////////////////////////////////////////////////////////////
-///////////////////////////////////////////////////////////////////////////////
-///////////////////////////////////////////////////////////////////////////////
-
-CAMBRIA_CLIENT cambriaCreateClient ( const char* host, int port, const char* topic, const char* format )
-{
-       cambriaClient* cc = new cambriaClient ( host, port, topic, format );
-       return toOpaque(cc);
-}
-
-void cambriaDestroyClient ( CAMBRIA_CLIENT client )
-{
-       delete fromOpaque ( client );
-}
-
-cambriaSendResponse* cambriaSendMessage ( CAMBRIA_CLIENT client, const char* streamName, const char* message )
-{
-       cambriaClient* c = fromOpaque ( client );
-       return c->send ( streamName, message );
-}
-
-cambriaSendResponse* cambriaSendMessages ( CAMBRIA_CLIENT client, const char* streamName, const char** messages, unsigned int count )
-{
-       cambriaClient* c = fromOpaque ( client );
-       return c->send ( streamName, messages, count );
-}
-
-cambriaGetResponse* cambriaGetMessages ( CAMBRIA_CLIENT client, unsigned long timeoutMs, unsigned int limit )
-{
-       cambriaClient* c = fromOpaque ( client );
-       return c->get ( timeoutMs, limit );
-}
-
-void cambriaDestroySendResponse ( CAMBRIA_CLIENT client, const cambriaSendResponse* response )
-{
-       if ( response )
-       {
-               delete response->statusMessage;
-               delete response->responseBody;
-               delete response;
-       }
-}
-
-void cambriaDestroyGetResponse ( CAMBRIA_CLIENT client, const cambriaGetResponse* response )
-{
-       if ( response )
-       {
-               delete response->statusMessage;
-               for ( int i=0; i<response->messageCount; i++ )
-               {
-                       delete response->messageSet[i];
-               }
-               delete response;
-       }
-}
-
-int cambriaSimpleSend ( const char* host, int port, const char* topic, const char* streamName, const char* msg )
-{
-       return cambriaSimpleSendMultiple ( host, port, topic, streamName, &msg, 1 );
-}
-
-int cambriaSimpleSendMultiple ( const char* host, int port, const char* topic, const char* streamName, const char** messages, unsigned int msgCount )
-{
-       int count = 0;
-
-       const CAMBRIA_CLIENT cc = ::cambriaCreateClient ( host, port, topic, CAMBRIA_NATIVE_FORMAT );
-       if ( cc )
-       {
-               const cambriaSendResponse* response = ::cambriaSendMessages ( cc, streamName, messages, msgCount );
-               if ( response && response->statusCode < 300 )
-               {
-                       count = msgCount;
-               }
-               ::cambriaDestroySendResponse ( cc, response );
-               ::cambriaDestroyClient ( cc );
-       }
-
-       return count;
-}
-
-////////////////////////////////////////////////////////////////////////////////
-////////////////////////////////////////////////////////////////////////////////
-////////////////////////////////////////////////////////////////////////////////
-
-const unsigned int kMaxTraceBuffer = 2048;
-
-static void writeTraceString ( const char* msg )
-{
-       ::fprintf ( stdout, "%s", msg );
-       ::fflush ( stdout );    // because we want output before core dumping :-)
-}
-
-void traceOutput ( const char* format, ... )
-{
-       char buffer [ kMaxTraceBuffer ];
-       ::memset ( buffer, '\0', kMaxTraceBuffer * sizeof ( char ) );
-
-       va_list list;
-       va_start ( list, format );
-       ::vsprintf ( buffer, format, list );
-       writeTraceString ( buffer );
-       va_end ( list );
-}