gRPC的定义:
主要流程:
添加pom依赖, 包含依赖包和生成基础类的插件。
4.0.0 vip.sunjin GrpcServer 1.0-SNAPSHOT UTF-8 UTF-8 1.8 1.36.1 3.15.6 com.google.protobuf protobuf-java ${protobuf.version} io.grpc grpc-netty ${grpc.version} io.grpc grpc-protobuf ${grpc.version} io.grpc grpc-stub ${grpc.version} kr.motd.maven os-maven-plugin 1.6.2 org.springframework.boot spring-boot-maven-plugin org.xolstice.maven.plugins protobuf-maven-plugin 0.6.1 com.google.protobuf:protoc:3.12.0:exe:${os.detected.classifier} grpc-java io.grpc:protoc-gen-grpc-java:1.36.0:exe:${os.detected.classifier} compile compile-custom
创建一个文件夹src/main/proto/
创建一个helloworld.proto文件
syntax = "proto3";option java_multiple_files = true;
option java_package = "vip.sunjin.examples.helloworld";
option java_outer_classname = "HelloWorldProto";package helloworld;// The greeting service definition.
service Greeter {// Sends a greetingrpc SayHello (HelloRequest) returns (HelloReply) {}
}// The request message containing the user's name.
message HelloRequest {string name = 1;
}// The response message containing the greetings
message HelloReply {string message = 1;
}
使用maven编译项目 生成基础类
生成后的类文件如下:
targetclassesvipsunjinexampleshelloworldGreeterGrpc.javaHelloReply.javaHelloReplyOrBuilder.javaHelloRequest.javaHelloRequestOrBuilder.javaHelloWorldClient.javaHelloWorldProto.javaHelloWorldServer.java
GreeterGrpc封装基本的GRPC功能,后续的客户端和服务端都从这个类引申出来。
服务端只需要指定一个端口号,然后暴露一个服务。
/*** Server that manages startup/shutdown of a {@code Greeter} server.*/
public class HelloWorldServer {private static final Logger logger = Logger.getLogger(HelloWorldServer.class.getName());private Server server;private void start() throws IOException {/* The port on which the server should run */int port = 50051;server = ServerBuilder.forPort(port).addService(new GreeterImpl()).build().start();logger.info("Server started, listening on " + port);Runtime.getRuntime().addShutdownHook(new Thread() {@Overridepublic void run() {// Use stderr here since the logger may have been reset by its JVM shutdown hook.System.err.println("*** shutting down gRPC server since JVM is shutting down");try {HelloWorldServer.this.stop();} catch (InterruptedException e) {e.printStackTrace(System.err);}System.err.println("*** server shut down");}});}private void stop() throws InterruptedException {if (server != null) {server.shutdown().awaitTermination(30, TimeUnit.SECONDS);}}/*** Await termination on the main thread since the grpc library uses daemon threads.*/private void blockUntilShutdown() throws InterruptedException {if (server != null) {server.awaitTermination();}}/*** Main launches the server from the command line.*/public static void main(String[] args) throws IOException, InterruptedException {final HelloWorldServer server = new HelloWorldServer();server.start();server.blockUntilShutdown();}static class GreeterImpl extends GreeterGrpc.GreeterImplBase {@Overridepublic void sayHello(HelloRequest req, StreamObserver responseObserver) {HelloReply reply = HelloReply.newBuilder().setMessage("Hello " + req.getName()).build();responseObserver.onNext(reply);responseObserver.onCompleted();}}
}
客户端需要指定调用服务的地址和端口号并且通过调用桩代码调用服务端的服务。
客户端和服务端是直连的。
public class HelloWorldClient {private static final Logger logger = Logger.getLogger(HelloWorldClient.class.getName());private final GreeterGrpc.GreeterBlockingStub blockingStub;/** Construct client for accessing HelloWorld server using the existing channel. */public HelloWorldClient(Channel channel) {// 'channel' here is a Channel, not a ManagedChannel, so it is not this code's responsibility to// shut it down.// Passing Channels to code makes code easier to test and makes it easier to reuse Channels.blockingStub = GreeterGrpc.newBlockingStub(channel);}/** Say hello to server. */public void greet(String name) {logger.info("Will try to greet " + name + " ...");HelloRequest request = HelloRequest.newBuilder().setName(name).build();HelloReply response;try {response = blockingStub.sayHello(request);} catch (StatusRuntimeException e) {logger.log(Level.WARNING, "RPC failed: {0}", e.getStatus());return;}logger.info("Greeting Reply: " + response.getMessage());}/*** Greet server. If provided, the first element of {@code args} is the name to use in the* greeting. The second argument is the target server.*/public static void main(String[] args) throws Exception {String user = "neil";// Access a service running on the local machine on port 50051String target = "localhost:50051";// Create a communication channel to the server, known as a Channel. Channels are thread-safe// and reusable. It is common to create channels at the beginning of your application and reuse// them until the application shuts down.ManagedChannel channel = ManagedChannelBuilder.forTarget(target)// Channels are secure by default (via SSL/TLS). For the example we disable TLS to avoid// needing certificates..usePlaintext().build();try {HelloWorldClient client = new HelloWorldClient(channel);client.greet(user);} finally {// ManagedChannels use resources like threads and TCP connections. To prevent leaking these// resources the channel should be shut down when it will no longer be used. If it may be used// again leave it running.channel.shutdownNow().awaitTermination(5, TimeUnit.SECONDS);}}
}
先启动服务端代码 HelloWorldServer
然后执行客户端代码 HelloWorldClient
执行结果如下:
2月 18, 2023 9:44:17 上午 vip.sunjin.grpcclient.HelloWorldClient greet
信息: Will try to greet neil ...
2月 18, 2023 9:44:18 上午 vip.sunjin.grpcclient.HelloWorldClient greet
信息: Greeting Reply: I'm Grpc Server , Hello neil
一般业务场景下,我们都是使用grpc的simple-rpc模式,也就是每次客户端发起请求,服务端会返回一个响应结果的模式。
但是grpc除了这种一来一往的请求模式外,还有流式模式。
服务端流模式是说客户端发起一次请求后,服务端在接受到请求后,可以以流的方式,使用同一连接,不断的向客户端写回响应结果,客户端则可以源源不断的接受到服务端写回的数据。
下面我们通过简单例子,来说明如何使用,服务端端流。
MetricsService.proto
syntax = "proto3";option java_multiple_files = true;
option java_package = "vip.sunjin.examples.helloworld";
option java_outer_classname = "MetricsServiceProto";message Metric {int64 metric = 2;
}message Average {double val = 1;
}service MetricsService {rpc collectServerStream (Metric) returns (stream Average);
}
然后使用maven编译项目 生成基础类
服务实现类
public class MetricsServiceImpl extends MetricsServiceGrpc.MetricsServiceImplBase {private static final Logger logger = Logger.getLogger(MetricsServiceImpl.class.getName());/*** 服务端流* @param request* @param responseObserver*/@Overridepublic void collectServerStream(Metric request, StreamObserver responseObserver) {logger.info("received request : " + request.getMetric());for(int i = 0; i < 10; i++){responseObserver.onNext(Average.newBuilder().setVal(new Random(1000).nextDouble()).build());logger.info("send to client");try {Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();}}responseObserver.onCompleted();}}
服务端Server启动类
public class MetricsServer {private static final Logger logger = Logger.getLogger(MetricsServer.class.getName());public static void main(String[] args) throws IOException, InterruptedException {int port = 50051;
// //启动服务MetricsServiceImpl metricsService = new MetricsServiceImpl();Server server = ServerBuilder.forPort(port).addService(metricsService).build();server.start();logger.info("Server started, listening on " + port);server.awaitTermination();}
}
通过异步Stub 调用服务
public class MetricsClient {private static final Logger logger = Logger.getLogger(MetricsClient.class.getName());public static void main(String[] args) throws InterruptedException {int port = 50051;
// //获取客户端桩对象ManagedChannel channel = ManagedChannelBuilder.forTarget("localhost:" + port).usePlaintext().build();MetricsServiceGrpc.MetricsServiceStub stub = MetricsServiceGrpc.newStub(channel);//发起rpc请求,设置StreamObserver用于监听服务器返回结果stub.collectServerStream(Metric.newBuilder().setMetric(1L).build(), new StreamObserver() {@Overridepublic void onNext(Average value) {System.out.println(Thread.currentThread().getName() + "Average: " + value.getVal());}@Overridepublic void onError(Throwable t) {System.out.println("error:" + t.getLocalizedMessage());}@Overridepublic void onCompleted() {System.out.println("onCompleted:");}});channel.shutdown().awaitTermination(50, TimeUnit.SECONDS);}
}
代码最后要有等待并且关闭通道的操作。
先启动服务端,再启动客户端后,可以看到StreamObserver的onNext方法会源源不断的接受到服务端返回的数据。
客户端流模式是说客户端发起请求与服务端建立链接后,可以使用同一连接,不断的向服务端传送数据,等客户端把全部数据都传送完毕后,服务端才返回一个请求结果。
这里修改service的定义,其他不变。 MetricsService.proto
service MetricsService {rpc collectClientStream (stream Metric) returns (Average);
}
如上rpc方法的入参类型前添加stream标识 是客户端流,然后服务端实现代码如下:
public class MetricsServiceImpl extends MetricsServiceGrpc.MetricsServiceImplBase {private static final Logger logger = Logger.getLogger(MetricsServiceImpl.class.getName());/*** 客户端流** @param responseObserver* @return*/@Overridepublic StreamObserver collectClientStream(StreamObserver responseObserver) {return new StreamObserver() {private long sum = 0;private long count = 0;@Overridepublic void onNext(Metric value) {logger.info("value: " + value);sum += value.getMetric();count++;}@Overridepublic void onError(Throwable t) {logger.info("severError:" + t.getLocalizedMessage());responseObserver.onError(t);}@Overridepublic void onCompleted() {responseObserver.onNext(Average.newBuilder().setVal(sum / count).build());logger.info("serverComplete: ");responseObserver.onCompleted();}};}
}
如上代码,服务端使用流式对象的onNext方法不断接受客户端发来的数据,然后等客户端发送结束后,使用onCompleted方法,把响应结果写回客户端。
服务端启动类MetricsServer不需要修改
客户端调用服务需要使用异步的Stub.
public class MetricsClient2 {private static final Logger logger = Logger.getLogger(MetricsServer.class.getName());public static void main(String[] args) throws InterruptedException {int port = 50051;//1.创建客户端桩ManagedChannel channel = ManagedChannelBuilder.forAddress("localhost", port).usePlaintext().build();MetricsServiceGrpc.MetricsServiceStub stub = MetricsServiceGrpc.newStub(channel);//2.发起请求,并设置结果回调监听StreamObserver collect = stub.collectClientStream(new StreamObserver() {@Overridepublic void onNext(Average value) {logger.info(Thread.currentThread().getName() + "Average: " + value.getVal());}@Overridepublic void onError(Throwable t) {logger.info("error:" + t.getLocalizedMessage());}@Overridepublic void onCompleted() {logger.info("onCompleted:");}});//3.使用同一个链接,不断向服务端传送数据Stream.of(1L, 2L, 3L, 4L,5L).map(l -> Metric.newBuilder().setMetric(l).build()).forEach(metric -> {collect.onNext(metric);logger.info("send to server: " + metric.getMetric());});Thread.sleep(3000);collect.onCompleted();channel.shutdown().awaitTermination(50, TimeUnit.SECONDS);}
}
先启动服务端,再启动客户端后,可以看到代码3会把数据1,2,3,4,5通过同一个链接发送到服务端, 然后等服务端接收完毕数据后,会计算接受到的数据的平均值,然后把平均值写回客户端。 然后代码2设置的监听器的onNext方法就会被回调,然后打印出服务端返回的平均值3。
双向流意味着客户端向服务端发起请求后,客户端可以源源不断向服务端写入数据的同时,服务端可以源源不断向客户端写入数据。
这里修改service的定义,其他不变。 重新生成基础代码。 MetricsService.proto
service MetricsService {rpc collectTwoWayStream (stream Metric) returns (stream Average);
}
如上rpc方法的入参类型前添加stream标识, 返回参数前也添加stream标识 就是双向流,然后服务端实现代码如下:
双向流的代码和客户端流基本一样,只是双向流可以同时支持双向的持续写入。
将服务实现类进行修改用来测试双向流。
public void onNext(Metric value) {logger.info("value: " + value);sum += value.getMetric();count++;responseObserver.onNext(Average.newBuilder().setVal(sum * 1.0/ count).build());
}
如上代码,服务端使用流式对象的onNext方法不断接受客户端发来的数据, 然后 不断的调用参数中的流对象把响应结果持续的写回客户端。 实现了双向流式调用。
客户端代码主要是把onCompleted之前的线程等待时间加长,以便等待服务端持续的返回。
Thread.sleep(10000);
collect.onCompleted();
先启动服务端,再启动客户端后,可以看到代码3会把数据1,2,3,4,5通过同一个链接发送到服务端, 然后等服务端接收数据后,会实时的计算接受到的数据的平均值,然后把平均值写回客户端。 然后代码2设置的监听器的onNext方法就会被回调,然后打印出服务端返回的平均值。
上一篇:nps内网穿透工具