3 # Copyright (C) 2019 Bell Canada.
4 # Modifications Copyright © 2018-2019 AT&T Intellectual Property.
6 # Licensed under the Apache License, Version 2.0 (the "License");
7 # you may not use this file except in compliance with the License.
8 # You may obtain a copy of the License at
10 # http://www.apache.org/licenses/LICENSE-2.0
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS,
14 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 # See the License for the specific language governing permissions and
16 # limitations under the License.
def _unary_unary_rpc_terminator(code, details):
    """Build a unary-unary RPC handler that always aborts the call.

    Args:
        code: Status code passed to ``context.abort``.
        details: Detail message passed to ``context.abort``.

    Returns:
        The handler produced by ``grpc.unary_unary_rpc_method_handler``
        wrapping a behavior that aborts every request it receives.
    """

    def _abort_call(_request, servicer_context):
        # The request payload is irrelevant: every call is rejected with the
        # status captured from the enclosing scope.
        servicer_context.abort(code, details)

    aborting_handler = grpc.unary_unary_rpc_method_handler(_abort_call)
    return aborting_handler
class RequestHeaderValidatorInterceptor(grpc.ServerInterceptor):
    """Server interceptor that rejects calls lacking a required header.

    A call is passed on to the next handler only when the configured
    ``(header, value)`` pair is present in the call's invocation metadata.
    Otherwise a pre-built unary-unary handler is returned that aborts the
    call with the configured status code and details.
    """

    def __init__(self, header, value, code, details):
        """Configure the required header and the rejection status.

        Args:
            header: Metadata key that must be present on the call.
            value: Metadata value that must accompany ``header``.
            code: Status code used to abort calls that fail validation.
            details: Detail message used to abort calls that fail validation.
        """
        # Both attributes are read by intercept_service, so they must be
        # stored here.
        self._header = header
        self._value = value
        # Built once and reused for every rejected call.
        self._terminator = _unary_unary_rpc_terminator(code, details)

    def intercept_service(self, continuation, handler_call_details):
        """Validate the required header and dispatch or reject the call.

        Args:
            continuation: Callable that takes ``handler_call_details`` and
                returns the handler of the next stage in the chain.
            handler_call_details: Object describing the incoming call; its
                ``invocation_metadata`` is searched for the required pair.

        Returns:
            The result of ``continuation(handler_call_details)`` when the
            required pair is present, otherwise the aborting handler.
        """
        # Treat absent metadata (None) as empty so the call is rejected
        # rather than raising TypeError on the membership test.
        metadata = handler_call_details.invocation_metadata or ()
        if (self._header, self._value) in metadata:
            return continuation(handler_call_details)
        return self._terminator