msb protocol synch change
[msb/apigateway.git] / msb-core / openresty-ext / src / assembly / resources / openresty / lualib / resty / http.lua
1 local http_headers = require "resty.http_headers"
2
3 local ngx_socket_tcp = ngx.socket.tcp
4 local ngx_req = ngx.req
5 local ngx_req_socket = ngx_req.socket
6 local ngx_req_get_headers = ngx_req.get_headers
7 local ngx_req_get_method = ngx_req.get_method
8 local str_gmatch = string.gmatch
9 local str_lower = string.lower
10 local str_upper = string.upper
11 local str_find = string.find
12 local str_sub = string.sub
13 local str_gsub = string.gsub
14 local tbl_concat = table.concat
15 local tbl_insert = table.insert
16 local ngx_encode_args = ngx.encode_args
17 local ngx_re_match = ngx.re.match
18 local ngx_re_gsub = ngx.re.gsub
19 local ngx_log = ngx.log
20 local ngx_DEBUG = ngx.DEBUG
21 local ngx_ERR = ngx.ERR
22 local ngx_NOTICE = ngx.NOTICE
23 local ngx_var = ngx.var
24 local co_yield = coroutine.yield
25 local co_create = coroutine.create
26 local co_status = coroutine.status
27 local co_resume = coroutine.resume
28
29
30 -- http://www.w3.org/Protocols/rfc2616/rfc2616-sec13.html#sec13.5.1
31 local HOP_BY_HOP_HEADERS = {
32     ["connection"]          = true,
33     ["keep-alive"]          = true,
34     ["proxy-authenticate"]  = true,
35     ["proxy-authorization"] = true,
36     ["te"]                  = true,
37     ["trailers"]            = true,
38     ["transfer-encoding"]   = true,
39     ["upgrade"]             = true,
40     ["content-length"]      = true, -- Not strictly hop-by-hop, but Nginx will deal
41                                     -- with this (may send chunked for example).
42 }
43
44
45 -- Reimplemented coroutine.wrap, returning "nil, err" if the coroutine cannot
46 -- be resumed. This protects user code from inifite loops when doing things like
47 -- repeat
48 --   local chunk, err = res.body_reader()
49 --   if chunk then -- <-- This could be a string msg in the core wrap function.
50 --     ...
51 --   end
52 -- until not chunk
53 local co_wrap = function(func)
54     local co = co_create(func)
55     if not co then
56         return nil, "could not create coroutine"
57     else
58         return function(...)
59             if co_status(co) == "suspended" then
60                 return select(2, co_resume(co, ...))
61             else
62                 return nil, "can't resume a " .. co_status(co) .. " coroutine"
63             end
64         end
65     end
66 end
67
68
69 local _M = {
70     _VERSION = '0.09',
71 }
72 _M._USER_AGENT = "lua-resty-http/" .. _M._VERSION .. " (Lua) ngx_lua/" .. ngx.config.ngx_lua_version
73
74 local mt = { __index = _M }
75
76
77 local HTTP = {
78     [1.0] = " HTTP/1.0\r\n",
79     [1.1] = " HTTP/1.1\r\n",
80 }
81
82 local DEFAULT_PARAMS = {
83     method = "GET",
84     path = "/",
85     version = 1.1,
86 }
87
88
89 function _M.new(self)
90     local sock, err = ngx_socket_tcp()
91     if not sock then
92         return nil, err
93     end
94     return setmetatable({ sock = sock, keepalive = true }, mt)
95 end
96
97
98 function _M.set_timeout(self, timeout)
99     local sock = self.sock
100     if not sock then
101         return nil, "not initialized"
102     end
103
104     return sock:settimeout(timeout)
105 end
106
107
108 function _M.ssl_handshake(self, ...)
109     local sock = self.sock
110     if not sock then
111         return nil, "not initialized"
112     end
113
114     self.ssl = true
115
116     return sock:sslhandshake(...)
117 end
118
119
120 function _M.connect(self, ...)
121     local sock = self.sock
122     if not sock then
123         return nil, "not initialized"
124     end
125
126     self.host = select(1, ...)
127     self.port = select(2, ...)
128
129     -- If port is not a number, this is likely a unix domain socket connection.
130     if type(self.port) ~= "number" then
131         self.port = nil
132     end
133
134     self.keepalive = true
135
136     return sock:connect(...)
137 end
138
139
140 function _M.set_keepalive(self, ...)
141     local sock = self.sock
142     if not sock then
143         return nil, "not initialized"
144     end
145
146     if self.keepalive == true then
147         return sock:setkeepalive(...)
148     else
149         -- The server said we must close the connection, so we cannot setkeepalive.
150         -- If close() succeeds we return 2 instead of 1, to differentiate between
151         -- a normal setkeepalive() failure and an intentional close().
152         local res, err = sock:close()
153         if res then
154             return 2, "connection must be closed"
155         else
156             return res, err
157         end
158     end
159 end
160
161
162 function _M.get_reused_times(self)
163     local sock = self.sock
164     if not sock then
165         return nil, "not initialized"
166     end
167
168     return sock:getreusedtimes()
169 end
170
171
172 function _M.close(self)
173     local sock = self.sock
174     if not sock then
175         return nil, "not initialized"
176     end
177
178     return sock:close()
179 end
180
181
182 local function _should_receive_body(method, code)
183     if method == "HEAD" then return nil end
184     if code == 204 or code == 304 then return nil end
185     if code >= 100 and code < 200 then return nil end
186     return true
187 end
188
189
190 function _M.parse_uri(self, uri)
191     local m, err = ngx_re_match(uri, [[^(http[s]?)://([^:/]+)(?::(\d+))?(.*)]],
192         "jo")
193
194     if not m then
195         if err then
196             return nil, "failed to match the uri: " .. uri .. ", " .. err
197         end
198
199         return nil, "bad uri: " .. uri
200     else
201         if m[3] then
202             m[3] = tonumber(m[3])
203         else
204             if m[1] == "https" then
205                 m[3] = 443
206             else
207                 m[3] = 80
208             end
209         end
210         if not m[4] or "" == m[4] then m[4] = "/" end
211         return m, nil
212     end
213 end
214
215
216 local function _format_request(params)
217     local version = params.version
218     local headers = params.headers or {}
219
220     local query = params.query or ""
221     if query then
222         if type(query) == "table" then
223             query = "?" .. ngx_encode_args(query)
224         end
225     end
226
227     -- Initialize request
228     local req = {
229         str_upper(params.method),
230         " ",
231         params.path,
232         query,
233         HTTP[version],
234         -- Pre-allocate slots for minimum headers and carriage return.
235         true,
236         true,
237         true,
238     }
239     local c = 6 -- req table index it's faster to do this inline vs table.insert
240
241     -- Append headers
242     for key, values in pairs(headers) do
243         if type(values) ~= "table" then
244             values = {values}
245         end
246
247         key = tostring(key)
248         for _, value in pairs(values) do
249             req[c] = key .. ": " .. tostring(value) .. "\r\n"
250             c = c + 1
251         end
252     end
253
254     -- Close headers
255     req[c] = "\r\n"
256
257     return tbl_concat(req)
258 end
259
260
261 local function _receive_status(sock)
262     local line, err = sock:receive("*l")
263     if not line then
264         return nil, nil, nil, err
265     end
266
267     return tonumber(str_sub(line, 10, 12)), tonumber(str_sub(line, 6, 8)), str_sub(line, 14)
268 end
269
270
271
272 local function _receive_headers(sock)
273     local headers = http_headers.new()
274
275     repeat
276         local line, err = sock:receive("*l")
277         if not line then
278             return nil, err
279         end
280
281         for key, val in str_gmatch(line, "([^:%s]+):%s*(.+)") do
282             if headers[key] then
283                 if type(headers[key]) ~= "table" then
284                     headers[key] = { headers[key] }
285                 end
286                 tbl_insert(headers[key], tostring(val))
287             else
288                 headers[key] = tostring(val)
289             end
290         end
291     until str_find(line, "^%s*$")
292
293     return headers, nil
294 end
295
296
297 local function _chunked_body_reader(sock, default_chunk_size)
298     return co_wrap(function(max_chunk_size)
299         local max_chunk_size = max_chunk_size or default_chunk_size
300         local remaining = 0
301         local length
302
303         repeat
304             -- If we still have data on this chunk
305             if max_chunk_size and remaining > 0 then
306
307                 if remaining > max_chunk_size then
308                     -- Consume up to max_chunk_size
309                     length = max_chunk_size
310                     remaining = remaining - max_chunk_size
311                 else
312                     -- Consume all remaining
313                     length = remaining
314                     remaining = 0
315                 end
316             else -- This is a fresh chunk
317
318                 -- Receive the chunk size
319                 local str, err = sock:receive("*l")
320                 if not str then
321                     co_yield(nil, err)
322                 end
323
324                 length = tonumber(str, 16)
325
326                 if not length then
327                     co_yield(nil, "unable to read chunksize")
328                 end
329
330                 if max_chunk_size and length > max_chunk_size then
331                     -- Consume up to max_chunk_size
332                     remaining = length - max_chunk_size
333                     length = max_chunk_size
334                 end
335             end
336
337             if length > 0 then
338                 local str, err = sock:receive(length)
339                 if not str then
340                     co_yield(nil, err)
341                 end
342
343                 max_chunk_size = co_yield(str) or default_chunk_size
344
345                 -- If we're finished with this chunk, read the carriage return.
346                 if remaining == 0 then
347                     sock:receive(2) -- read \r\n
348                 end
349             else
350                 -- Read the last (zero length) chunk's carriage return
351                 sock:receive(2) -- read \r\n
352             end
353
354         until length == 0
355     end)
356 end
357
358
359 local function _body_reader(sock, content_length, default_chunk_size)
360     return co_wrap(function(max_chunk_size)
361         local max_chunk_size = max_chunk_size or default_chunk_size
362
363         if not content_length and max_chunk_size then
364             -- We have no length, but wish to stream.
365             -- HTTP 1.0 with no length will close connection, so read chunks to the end.
366             repeat
367                 local str, err, partial = sock:receive(max_chunk_size)
368                 if not str and err == "closed" then
369                     max_chunk_size = tonumber(co_yield(partial, err) or default_chunk_size)
370                 end
371
372                 max_chunk_size = tonumber(co_yield(str) or default_chunk_size)
373                 if max_chunk_size and max_chunk_size < 0 then max_chunk_size = nil end
374
375                 if not max_chunk_size then
376                     ngx_log(ngx_ERR, "Buffer size not specified, bailing")
377                     break
378                 end
379             until not str
380
381         elseif not content_length then
382             -- We have no length but don't wish to stream.
383             -- HTTP 1.0 with no length will close connection, so read to the end.
384             co_yield(sock:receive("*a"))
385
386         elseif not max_chunk_size then
387             -- We have a length and potentially keep-alive, but want everything.
388             co_yield(sock:receive(content_length))
389
390         else
391             -- We have a length and potentially a keep-alive, and wish to stream
392             -- the response.
393             local received = 0
394             repeat
395                 local length = max_chunk_size
396                 if received + length > content_length then
397                     length = content_length - received
398                 end
399
400                 if length > 0 then
401                     local str, err = sock:receive(length)
402                     if not str then
403                         max_chunk_size = tonumber(co_yield(nil, err) or default_chunk_size)
404                     end
405                     received = received + length
406
407                     max_chunk_size = tonumber(co_yield(str) or default_chunk_size)
408                     if max_chunk_size and max_chunk_size < 0 then max_chunk_size = nil end
409
410                     if not max_chunk_size then
411                         ngx_log(ngx_ERR, "Buffer size not specified, bailing")
412                         break
413                     end
414                 end
415
416             until length == 0
417         end
418     end)
419 end
420
421
422 local function _no_body_reader()
423     return nil
424 end
425
426
427 local function _read_body(res)
428     local reader = res.body_reader
429
430     if not reader then
431         -- Most likely HEAD or 304 etc.
432         return nil, "no body to be read"
433     end
434
435     local chunks = {}
436     local c = 1
437
438     local chunk, err
439     repeat
440         chunk, err = reader()
441
442         if err then
443             return nil, err, tbl_concat(chunks) -- Return any data so far.
444         end
445         if chunk then
446             chunks[c] = chunk
447             c = c + 1
448         end
449     until not chunk
450
451     return tbl_concat(chunks)
452 end
453
454
455 local function _trailer_reader(sock)
456     return co_wrap(function()
457         co_yield(_receive_headers(sock))
458     end)
459 end
460
461
462 local function _read_trailers(res)
463     local reader = res.trailer_reader
464     if not reader then
465         return nil, "no trailers"
466     end
467
468     local trailers = reader()
469     setmetatable(res.headers, { __index = trailers })
470 end
471
472
473 local function _send_body(sock, body)
474     if type(body) == 'function' then
475         repeat
476             local chunk, err, partial = body()
477
478             if chunk then
479                 local ok,err = sock:send(chunk)
480
481                 if not ok then
482                     return nil, err
483                 end
484             elseif err ~= nil then
485                 return nil, err, partial
486             end
487
488         until chunk == nil
489     elseif body ~= nil then
490         local bytes, err = sock:send(body)
491
492         if not bytes then
493             return nil, err
494         end
495     end
496     return true, nil
497 end
498
499
500 local function _handle_continue(sock, body)
501     local status, version, reason, err = _receive_status(sock)
502     if not status then
503         return nil, nil, err
504     end
505
506     -- Only send body if we receive a 100 Continue
507     if status == 100 then
508         local ok, err = sock:receive("*l") -- Read carriage return
509         if not ok then
510             return nil, nil, err
511         end
512         _send_body(sock, body)
513     end
514     return status, version, err
515 end
516
517
518 function _M.send_request(self, params)
519     -- Apply defaults
520     setmetatable(params, { __index = DEFAULT_PARAMS })
521
522     local sock = self.sock
523     local body = params.body
524     local headers = http_headers.new()
525
526     local params_headers = params.headers
527     if params_headers then
528         -- We assign one by one so that the metatable can handle case insensitivity
529         -- for us. You can blame the spec for this inefficiency.
530         for k,v in pairs(params_headers) do
531             headers[k] = v
532         end
533     end
534
535     -- Ensure minimal headers are set
536     if type(body) == 'string' and not headers["Content-Length"] then
537         headers["Content-Length"] = #body
538     end
539     if not headers["Host"] then
540         if (str_sub(self.host, 1, 5) == "unix:") then
541             return nil, "Unable to generate a useful Host header for a unix domain socket. Please provide one."
542         end
543         -- If we have a port (i.e. not connected to a unix domain socket), and this
544         -- port is non-standard, append it to the Host heaer.
545         if self.port then
546             if self.ssl and self.port ~= 443 then
547                 headers["Host"] = self.host .. ":" .. self.port
548             elseif not self.ssl and self.port ~= 80 then
549                 headers["Host"] = self.host .. ":" .. self.port
550             else
551                 headers["Host"] = self.host
552             end
553         else
554             headers["Host"] = self.host
555         end
556     end
557     if not headers["User-Agent"] then
558         headers["User-Agent"] = _M._USER_AGENT
559     end
560     if params.version == 1.0 and not headers["Connection"] then
561         headers["Connection"] = "Keep-Alive"
562     end
563
564     params.headers = headers
565
566     -- Format and send request
567     local req = _format_request(params)
568     ngx_log(ngx_DEBUG, "\n", req)
569     local bytes, err = sock:send(req)
570
571     if not bytes then
572         return nil, err
573     end
574
575     -- Send the request body, unless we expect: continue, in which case
576     -- we handle this as part of reading the response.
577     if headers["Expect"] ~= "100-continue" then
578         local ok, err, partial = _send_body(sock, body)
579         if not ok then
580             return nil, err, partial
581         end
582     end
583
584     return true
585 end
586
587
588 function _M.read_response(self, params)
589     local sock = self.sock
590
591     local status, version, reason, err
592
593     -- If we expect: continue, we need to handle this, sending the body if allowed.
594     -- If we don't get 100 back, then status is the actual status.
595     if params.headers["Expect"] == "100-continue" then
596         local _status, _version, _err = _handle_continue(sock, params.body)
597         if not _status then
598             return nil, _err
599         elseif _status ~= 100 then
600             status, version, err = _status, _version, _err
601         end
602     end
603
604     -- Just read the status as normal.
605     if not status then
606         status, version, reason, err = _receive_status(sock)
607         if not status then
608             return nil, err
609         end
610     end
611
612
613     local res_headers, err = _receive_headers(sock)
614     if not res_headers then
615         return nil, err
616     end
617
618     -- keepalive is true by default. Determine if this is correct or not.
619     local ok, connection = pcall(str_lower, res_headers["Connection"])
620     if ok then
621         if  (version == 1.1 and connection == "close") or
622             (version == 1.0 and connection ~= "keep-alive") then
623             self.keepalive = false
624         end
625     else
626         -- no connection header
627         if version == 1.0 then
628             self.keepalive = false
629         end
630     end
631
632     local body_reader = _no_body_reader
633     local trailer_reader, err = nil, nil
634     local has_body = false
635
636     -- Receive the body_reader
637     if _should_receive_body(params.method, status) then
638         local ok, encoding = pcall(str_lower, res_headers["Transfer-Encoding"])
639         if ok and version == 1.1 and encoding == "chunked" then
640             body_reader, err = _chunked_body_reader(sock)
641             has_body = true
642         else
643
644             local ok, length = pcall(tonumber, res_headers["Content-Length"])
645             if ok then
646                 body_reader, err = _body_reader(sock, length)
647                 has_body = true
648             end
649         end
650     end
651
652     if res_headers["Trailer"] then
653         trailer_reader, err = _trailer_reader(sock)
654     end
655
656     if err then
657         return nil, err
658     else
659         return {
660             status = status,
661             reason = reason,
662             headers = res_headers,
663             has_body = has_body,
664             body_reader = body_reader,
665             read_body = _read_body,
666             trailer_reader = trailer_reader,
667             read_trailers = _read_trailers,
668         }
669     end
670 end
671
672
673 function _M.request(self, params)
674     local res, err = self:send_request(params)
675     if not res then
676         return res, err
677     else
678         return self:read_response(params)
679     end
680 end
681
682
683 function _M.request_pipeline(self, requests)
684     for i, params in ipairs(requests) do
685         if params.headers and params.headers["Expect"] == "100-continue" then
686             return nil, "Cannot pipeline request specifying Expect: 100-continue"
687         end
688
689         local res, err = self:send_request(params)
690         if not res then
691             return res, err
692         end
693     end
694
695     local responses = {}
696     for i, params in ipairs(requests) do
697         responses[i] = setmetatable({
698             params = params,
699             response_read = false,
700         }, {
701             -- Read each actual response lazily, at the point the user tries
702             -- to access any of the fields.
703             __index = function(t, k)
704                 local res, err
705                 if t.response_read == false then
706                     res, err = _M.read_response(self, t.params)
707                     t.response_read = true
708
709                     if not res then
710                         ngx_log(ngx_ERR, err)
711                     else
712                         for rk, rv in pairs(res) do
713                             t[rk] = rv
714                         end
715                     end
716                 end
717                 return rawget(t, k)
718             end,
719         })
720     end
721     return responses
722 end
723
724
725 function _M.request_uri(self, uri, params)
726     if not params then params = {} end
727
728     local parsed_uri, err = self:parse_uri(uri)
729     if not parsed_uri then
730         return nil, err
731     end
732
733     local scheme, host, port, path = unpack(parsed_uri)
734     if not params.path then params.path = path end
735
736     local c, err = self:connect(host, port)
737     if not c then
738         return nil, err
739     end
740
741     if scheme == "https" then
742         local verify = true
743         if params.ssl_verify == false then
744             verify = false
745         end
746         local ok, err = self:ssl_handshake(nil, host, verify)
747         if not ok then
748             return nil, err
749         end
750     end
751
752     local res, err = self:request(params)
753     if not res then
754         return nil, err
755     end
756
757     local body, err = res:read_body()
758     if not body then
759         return nil, err
760     end
761
762     res.body = body
763
764     local ok, err = self:set_keepalive()
765     if not ok then
766         ngx_log(ngx_ERR, err)
767     end
768
769     return res, nil
770 end
771
772
773 function _M.get_client_body_reader(self, chunksize, sock)
774     local chunksize = chunksize or 65536
775     if not sock then
776         local ok, err
777         ok, sock, err = pcall(ngx_req_socket)
778
779         if not ok then
780             return nil, sock -- pcall err
781         end
782
783         if not sock then
784             if err == "no body" then
785                 return nil
786             else
787                 return nil, err
788             end
789         end
790     end
791
792     local headers = ngx_req_get_headers()
793     local length = headers.content_length
794     local encoding = headers.transfer_encoding
795     if length then
796         return _body_reader(sock, tonumber(length), chunksize)
797     elseif encoding and str_lower(encoding) == 'chunked' then
798         -- Not yet supported by ngx_lua but should just work...
799         return _chunked_body_reader(sock, chunksize)
800     else
801        return nil
802     end
803 end
804
805
806 function _M.proxy_request(self, chunksize)
807     return self:request{
808         method = ngx_req_get_method(),
809         path = ngx_re_gsub(ngx_var.uri, "\\s", "%20", "jo") .. ngx_var.is_args .. (ngx_var.query_string or ""),
810         body = self:get_client_body_reader(chunksize),
811         headers = ngx_req_get_headers(),
812     }
813 end
814
815
816 function _M.proxy_response(self, response, chunksize)
817     if not response then
818         ngx_log(ngx_ERR, "no response provided")
819         return
820     end
821
822     ngx.status = response.status
823
824     -- Filter out hop-by-hop headeres
825     for k,v in pairs(response.headers) do
826         if not HOP_BY_HOP_HEADERS[str_lower(k)] then
827             ngx.header[k] = v
828         end
829     end
830
831     local reader = response.body_reader
832     repeat
833         local chunk, err = reader(chunksize)
834         if err then
835             ngx_log(ngx_ERR, err)
836             break
837         end
838
839         if chunk then
840             local res, err = ngx.print(chunk)
841             if not res then
842                 ngx_log(ngx_ERR, err)
843                 break
844             end
845         end
846     until not chunk
847 end
848
849
850 return _M