2 # Licensed to the Apache Software Foundation (ASF) under one or more
3 # contributor license agreements. See the NOTICE file distributed with
4 # this work for additional information regarding copyright ownership.
5 # The ASF licenses this file to You under the Apache License, Version 2.0
6 # (the "License"); you may not use this file except in compliance with
7 # the License. You may obtain a copy of the License at
9 # http://www.apache.org/licenses/LICENSE-2.0
11 # Unless required by applicable law or agreed to in writing, software
12 # distributed under the License is distributed on an "AS IS" BASIS,
13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
18 ``ctx`` proxy client implementation.
# Name of the environment variable that holds the socket URL. Clients read it to locate the
# socket when no explicit ``--socket-url`` command line argument is given.
CTX_SOCKET_URL = 'CTX_SOCKET_URL'
32 class _RequestError(RuntimeError):
34 def __init__(self, ex_message, ex_type, ex_traceback):
35 super(_RequestError, self).__init__(self, '{0}: {1}'.format(ex_type, ex_message))
36 self.ex_type = ex_type
37 self.ex_message = ex_message
38 self.ex_traceback = ex_traceback
def _http_request(socket_url, request, method, timeout):
    """
    Send ``request`` as a JSON body to ``socket_url`` and return the decoded JSON response.

    :param socket_url: URL the request is sent to
    :param request: JSON-serializable object used as the request body
    :param method: HTTP method name to use for the request
    :param timeout: timeout handed to the opener's ``open`` call
    :return: the response body, parsed as JSON
    :raises RuntimeError: if the response status code is not 200
    """
    opener = urllib2.build_opener(urllib2.HTTPHandler)
    http_request = urllib2.Request(socket_url, data=json.dumps(request))
    # Override the method reported by the request object so the caller-supplied one is used.
    http_request.get_method = lambda: method
    response = opener.open(http_request, timeout=timeout)
    try:
        if response.code != 200:
            raise RuntimeError('Request failed: {0}'.format(response))
        return json.loads(response.read())
    finally:
        # Always release the underlying connection, on success and on failure alike.
        response.close()
def _client_request(socket_url, args, timeout, method='POST'):
    """
    Send ``args`` to the socket URL and interpret the typed response.

    :param socket_url: URL the request is sent to
    :param args: arguments, sent under the ``args`` key of the request body
    :param timeout: timeout forwarded to the HTTP request
    :param method: HTTP method name (``'POST'`` by default)
    :return: the ``payload`` entry of the response
    :raises _RequestError: if the response type is ``error``
    :raises SystemExit: if the response type is ``stop_operation``
    """
    reply = _http_request(
        socket_url=socket_url,
        request={'args': args},
        method=method,
        timeout=timeout)
    payload = reply.get('payload')
    reply_type = reply.get('type')

    if reply_type == 'error':
        # Keyword arguments keep the payload lookups in type/message/traceback order.
        raise _RequestError(
            ex_type=payload['type'],
            ex_message=payload['message'],
            ex_traceback=payload['traceback'])

    if reply_type == 'stop_operation':
        raise SystemExit(payload['message'])

    return payload
def _parse_args(args):
    """
    Parse the client's command line arguments.

    :param args: list of argument strings handed to ``argparse``
    :return: the parsed ``argparse`` namespace
    :raises RuntimeError: if no socket URL was supplied, neither through the
     ``CTX_SOCKET_URL`` environment variable nor through ``--socket-url``
    """
    # (flags, options) pairs, registered in declaration order.
    argument_specs = (
        (('-t', '--timeout'), {'type': int, 'default': 30}),
        (('--socket-url',), {'default': os.environ.get(CTX_SOCKET_URL)}),
        (('--json-arg-prefix',), {'default': '@'}),
        (('-j', '--json-output'), {'action': 'store_true'}),
        (('args',), {'nargs': '*'}),
    )
    parser = argparse.ArgumentParser()
    for flags, options in argument_specs:
        parser.add_argument(*flags, **options)

    parsed = parser.parse_args(args=args)
    if parsed.socket_url:
        return parsed
    raise RuntimeError('Missing CTX_SOCKET_URL environment variable '
                       'or socket_url command line argument. (ctx is supposed to be executed '
                       'within an operation context)')
87 def _process_args(json_prefix, args):
90 if arg.startswith(json_prefix):
91 arg = json.loads(arg[1:])
92 processed_args.append(arg)
97 args = _parse_args(args)
98 response = _client_request(
100 args=_process_args(args.json_arg_prefix, args.args),
101 timeout=args.timeout)
103 response = json.dumps(response)
108 response = str(response)
109 except UnicodeEncodeError:
110 response = unicode(response).encode('utf8')
111 sys.stdout.write(response)
113 if __name__ == '__main__':