python使用rpc框架gRPC的方法

  

使用Python编写gRPC服务可以快速构建高效、可扩展的分布式应用程序。本文将详细介绍如何使用Python实现gRPC服务。

1. 安装gRPC

gRPC依赖于protobuf3,先安装protobuf3:

pip install protobuf

然后安装gRPC:

pip install grpcio grpcio-tools

2. 定义protobuf文件

protobuf是gRPC的序列化协议,可用于定义数据结构和服务。我们需要先定义protobuf文件。

syntax = "proto3";

message HelloRequest {
  string name = 1;
}

message HelloReply {
  string message = 1;
}

service Greeter {
  rpc SayHello (HelloRequest) returns (HelloReply) {}
}

我们定义了一个服务Greeter,其中包含了一个方法SayHello。

3. 使用protobuf文件生成代码

通过protobuf文件,我们可以使用grpcio-tools生成Python代码。

python -m grpc_tools.protoc -I. --python_out=. --grpc_python_out=. hello.proto

上述命令根据定义的protobuf文件生成了hello_pb2.py和hello_pb2_grpc.py两个文件。

4. 实现gRPC服务

我们需要自己实现服务端和客户端代码。

服务端代码:

import grpc
import time

from concurrent import futures
import hello_pb2
import hello_pb2_grpc

class Greeter(hello_pb2_grpc.GreeterServicer):

    def SayHello(self, request, context):
        message = f'Hello {request.name}'
        return hello_pb2.HelloReply(message=message)

def serve():
    server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
    hello_pb2_grpc.add_GreeterServicer_to_server(Greeter(), server)
    server.add_insecure_port('[::]:50051')
    server.start()
    print('Server started!')
    try:
        while True:
            time.sleep(86400)
    except KeyboardInterrupt:
        server.stop(0)

if __name__ == '__main__':
    serve()

客户端代码:

import grpc
import hello_pb2
import hello_pb2_grpc

def run():
    with grpc.insecure_channel('localhost:50051') as channel:
        stub = hello_pb2_grpc.GreeterStub(channel)
        response = stub.SayHello(hello_pb2.HelloRequest(name='World'))
    print(f"Greeter client received: {response.message}")

if __name__ == '__main__':
    run()

5. 运行gRPC服务

我们需要先启动gRPC服务端,然后再启动gRPC客户端。

# 启动gRPC服务端
python server.py
# 启动gRPC客户端
python client.py

客户端输出结果如下:

Greeter client received: Hello World

6. 实现长连接

gRPC支持长连接,服务端可以在客户端断开连接的时候自动重连。我们需要在客户端加入keepalive参数。

import grpc
import hello_pb2
import hello_pb2_grpc

def run():
    with grpc.insecure_channel('localhost:50051', options=[('grpc.keepalive_timeout_ms', 10000)]) as channel:
        stub = hello_pb2_grpc.GreeterStub(channel)
        response = stub.SayHello(hello_pb2.HelloRequest(name='World'))
    print(f"Greeter client received: {response.message}")

示例说明

一、Python使用gRPC框架发送POST请求

import grpc
import requests
from concurrent import futures

import hello_pb2
import hello_pb2_grpc

class Greeter(hello_pb2_grpc.GreeterServicer):

    def SayHello(self, request, context):
        url = 'https://reqbin.com/echo/post/json'
        payload = request.SerializeToString()

        headers = {
          'Content-Type': 'application/x-protobuf',
          'Accept': 'application/x-protobuf'
        }

        response = requests.post(url, data=payload, headers=headers)

        message = response.content.decode('utf-8')

        return hello_pb2.HelloReply(message=message)


def serve():
    server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
    hello_pb2_grpc.add_GreeterServicer_to_server(Greeter(), server)
    server.add_insecure_port('[::]:50051')
    server.start()
    print('Server started!')
    try:
        while True:
            time.sleep(86400)
    except KeyboardInterrupt:
        server.stop(0)

if __name__ == '__main__':
    serve()

这里我们定义了一个SayHello方法,在该方法中我们发送了一个POST请求,将protobuf消息作为请求体发送,返回了json格式的响应。

二、Python使用gRPC框架调用TensorFlow Serving提供的服务

import grpc

from tensorflow_serving.apis import predict_pb2
from tensorflow_serving.apis import prediction_service_pb2_grpc


def run():
    channel = grpc.insecure_channel('localhost:8500')
    stub = prediction_service_pb2_grpc.PredictionServiceStub(channel)

    request = predict_pb2.PredictRequest()
    request.model_spec.name = 'my_model'
    request.model_spec.signature_name = 'predict'

    request.inputs['x'].CopyFrom(tensor.to_proto(X))

    result = stub.Predict(request)

    y_pred = tensor_proto_to_ndarray(result.outputs['y_pred'])

if __name__ == '__main__':
    run()

这里我们定义了一个run方法,先创建了一个grpc的channel,然后通过channel创建了一个PredictionServiceStub。通过PredictRequest将需要预测的数据通过gRPC发送给TensorFlow Serving并获得预测结果。

相关文章