.proto
rpc methodDoubleStream(stream HelloRequest) returns (stream HelloReply) {}
服务端
public StreamObserver<Helloworld.HelloRequest> methodDoubleStream(StreamObserver<Helloworld.HelloReply> responseObserver){ return new StreamObserver<Helloworld.HelloRequest>() { @Override public void onNext(Helloworld.HelloRequest request) { System.out.println("收到了请求\n"); Helloworld.HelloReply result = Helloworld.HelloReply.newBuilder().setMessage("hello alice").build(); responseObserver.onNext(result); } @Override public void onError(Throwable throwable) { } @Override public void onCompleted() { responseObserver.onCompleted(); } }; }
客户端
/** * 双向流 */ @Test public void contextLoad4(){ Channel channel = ManagedChannelBuilder .forAddress("127.0.0.1", 9098)//服务端 .usePlaintext(true)//usePlaintext的意思是使用明文不加密(应该可以加密) .build(); //异步存根 GreeterGrpc.GreeterStub greeterStub = GreeterGrpc.newStub(channel); StreamObserver<Helloworld.HelloReply> responseObserver = new StreamObserver<Helloworld.HelloReply>() { @Override public void onNext(Helloworld.HelloReply helloReply) { System.out.println("返回了结果\n"); } @Override public void onError(Throwable throwable) { } @Override public void onCompleted() { } }; Helloworld.HelloRequest request = Helloworld.HelloRequest.newBuilder().setName("hello world").build(); StreamObserver<Helloworld.HelloRequest> result = greeterStub.methodDoubleStream(responseObserver); result.onNext(request); result.onNext(request); result.onNext(request); result.onCompleted(); try { Thread.sleep(6000); } catch (InterruptedException e) { e.printStackTrace(); } }
响应结果