Java 实现grpc
官方地址:https://grpc.io/docs/what-is-grpc/introduction/在gRPC中,客户机应用程序可以直接调用不同机器上的服务器应用程序上的方法,就像它是本地对象一样,使您更容易创建分布式应用程序和服务。与许多RPC系统一样,gRPC基于定义服务的思想,指定可以远程调用的方法及其参数和返回类型。在服务器端,服务器实现这个接口,并运行gRPC服务器来处理客户端调用。
1.grpc简介
官方地址:https://grpc.io/docs/what-is-grpc/introduction/
在gRPC中,客户机应用程序可以直接调用不同机器上的服务器应用程序上的方法,就像它是本地对象一样,使您更容易创建分布式应用程序和服务。与许多RPC系统一样,gRPC基于定义服务的思想,指定可以远程调用的方法及其参数和返回类型。在服务器端,服务器实现这个接口,并运行gRPC服务器来处理客户端调用。在客户端,客户端有一个存根(在某些语言中称为客户端),它提供与服务器相同的方法
gRPC客户端和服务器可以在各种环境中运行并相互通信——从Google内部的服务器到您自己的桌面——并且可以用任何gRPC支持的语言编写。因此,例如,您可以轻松地用Java创建gRPC服务器,用Go、Python或Ruby创建客户端。
2.Protocol Buffers
官方地址:https://protobuf.dev/overview/
默认情况下,gRPC使用Protocol Buffers,这是Google用于序列化结构化数据的成熟开源机制(尽管它可以与JSON等其他数据格式一起使用)
Protocol Buffers是一种语言无关、平台无关的可扩展机制,用于序列化结构化数据。
它类似于JSON,只是更小更快,并且生成本地语言绑定。您只需定义一次数据的结构化方式,然后就可以使用特殊生成的源代码轻松地将结构化数据写入和读取到各种数据流,并使用各种语言。
协议缓冲区是定义语言(在.proto文件中创建)、proto编译器为与数据交互而生成的代码、特定于语言的运行时库以及写入文件(或通过网络连接发送)的数据的序列化格式的组合
3.创建maven项目grpc-demo
3.1 编写maven 配置文件
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>org.example</groupId>
<artifactId>grpc-demo</artifactId>
<version>1.0-SNAPSHOT</version>
<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<!--grpc版本 -->
<grpc.version>1.60.2</grpc.version>
</properties>
<dependencies>
<!-- grpc 需要的依赖-->
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-netty-shaded</artifactId>
<version>${grpc.version}</version>
</dependency>
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-protobuf</artifactId>
<version>${grpc.version}</version>
</dependency>
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-stub</artifactId>
<version>${grpc.version}</version>
</dependency>
</dependencies>
<build>
<extensions>
<extension>
<groupId>kr.motd.maven</groupId>
<artifactId>os-maven-plugin</artifactId>
<version>1.5.0.Final</version>
</extension>
</extensions>
<plugins>
<plugin>
<groupId>org.xolstice.maven.plugins</groupId>
<artifactId>protobuf-maven-plugin</artifactId>
<version>0.5.0</version>
<configuration>
<protocArtifact>com.google.protobuf:protoc:3.3.0:exe:${os.detected.classifier}</protocArtifact>
<pluginId>grpc-java</pluginId>
<pluginArtifact>io.grpc:protoc-gen-grpc-java:${grpc.version}:exe:${os.detected.classifier}</pluginArtifact>
</configuration>
<executions>
<execution>
<goals>
<!--它将{@code .proto}文件 生成java源代码 -->
<goal>compile</goal>
<!--它将{@code .proto}文件 生成grpc java源代码 -->
<goal>compile-custom</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<!-- 解决jar 包冲突、重复依赖、无效引用 -->
<artifactId>maven-enforcer-plugin</artifactId>
<version>1.4.1</version>
<executions>
<execution>
<id>enforce</id>
<goals>
<goal>enforce</goal>
</goals>
<configuration>
<rules>
<requireUpperBoundDeps/>
</rules>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
3.2 在/src/main/目录下创建 helloworld.proto文件
官方地址: https://gitcode.com/grpc/grpc-java/blob/master/examples/src/main/proto/helloworld.proto
// Copyright 2015 The gRPC Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
syntax = "proto3";
option java_multiple_files = true;
option java_package = "io.grpc.examples.helloworld";
option java_outer_classname = "HelloWorldProto";
option objc_class_prefix = "HLW";
package helloworld;
// The greeting service definition.
service Greeter {
// Sends a greeting
rpc SayHello (HelloRequest) returns (HelloReply) {}
}
// The request message containing the user's name.
message HelloRequest {
string name = 1;
}
// The response message containing the greetings
message HelloReply {
string message = 1;
}
3.3 生成代码
idea选中项目pom.xml-右键->run maven -> Plugins->protobuf-maven-plugin->protobuf:compile
或者进入到项目目录直接执行mvn命令:
#生成protobuf代码
mvn protobuf:compile
#生成grpc代码
mvn protobuf:compile-custom
生成成功的代码如截图,将代码拷贝到项目对应的位置:
代码拷贝到项目截图:
4.代码编写
4.1 HelloWordServer服务端编写
package io.grpc.examples.helloworld2;
import io.grpc.Grpc;
import io.grpc.InsecureServerCredentials;
import io.grpc.Server;
import io.grpc.examples.helloworld.GreeterGrpc;
import io.grpc.examples.helloworld.HelloReply;
import io.grpc.examples.helloworld.HelloRequest;
import io.grpc.stub.StreamObserver;
import java.io.IOException;
import java.util.Objects;
import java.util.Random;
import java.util.concurrent.TimeUnit;
/**
* @Author tanyong
* @Version HelloWordServer v1.0.0 2024/4/2 11:21 $$
*/
public class HelloWordServer {
private static int port = 50052;
/**
* grpc服务实例
*/
private Server server;
/**
* 启动 grpc服务
*
* @throws IOException
*/
public void start() throws IOException {
server = Grpc.newServerBuilderForPort(port, InsecureServerCredentials.create())
// 添加业务处理类
.addService(new GreeterImpl(port))
.build()
.start();
// 注册了一个JVM关闭钩子(Shutdown Hook),当Java虚拟机(JVM)即将关闭时(无论是正常退出还是非正常退出,如接收到操作系统中断信号)当JVM关闭时,所有已注册的关闭钩子都将被依次调用
Runtime.getRuntime().addShutdownHook(new Thread() {
@Override
public void run() {
// Use stderr here since the logger may have been reset by its JVM shutdown hook.
System.err.println("*** shutting down gRPC server since JVM is shutting down");
try {
// 优雅地停止gRPC服务器实例
HelloWordServer.this.stop();
} catch (InterruptedException e) {
e.printStackTrace(System.err);
}
System.err.println("*** server shut down");
}
});
}
/**
* 停止grpc 实例
*
* @throws InterruptedException
*/
public void stop() throws InterruptedException {
if (Objects.nonNull(server)) {
// 发起服务器的关闭流程,不再接受新的连接和请求,但允许现有连接继续完成请求处理
server.shutdown()
// 给予服务器最长30秒的时间去完成所有待处理的工作,超过这个时间限制,程序将继续执行后续逻辑,即使服务器还有任务未完成
// 这样设计有助于在应用退出时确保资源得到释放,同时也能防止因某些原因导致的长时间无法关闭的问题。
.awaitTermination(30, TimeUnit.SECONDS);
}
}
/**
* 确保主线程或者其他调用者线程会在服务器完全关闭之前保持等待状态。
* 在主线程上等待终止,因为grpc库使用守护线程
*
* @throws InterruptedException
*/
public void blockUntilShutdown() throws InterruptedException {
if (Objects.nonNull(server)) {
server.awaitTermination();
}
}
/**
* 业务处理类
*/
public static class GreeterImpl extends GreeterGrpc
.GreeterImplBase {
private final int port;
public GreeterImpl(int port) {
this.port = port;
}
/**
* @param request
* @param responseObserver 这是gRPC提供的响应观察者对象,用于向客户端发送响应。服务端通过调用其方法将响应数据发送给客户端。
*/
@Override
public void sayHello(HelloRequest request, StreamObserver<HelloReply> responseObserver) {
HelloReply reply = HelloReply.newBuilder().setMessage("Hello " + request.getName() + this.port).build();
try {
Thread.sleep(500 + new Random().nextInt(1000));
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
// 向客户端发送响应数据,即将创建好的 reply 对象推送给客户端。
responseObserver.onNext(reply);
// 表示响应已经结束,没有更多的数据要发送给客户端。
responseObserver.onCompleted();
}
}
public static void main(String[] args) throws IOException, InterruptedException {
final HelloWordServer helloWordServer = new HelloWordServer();
helloWordServer.start();
helloWordServer.blockUntilShutdown();
}
}
4.2 自定负载均衡解析器编写
HelloWordConstants.java常量:
public interface HelloWordConstants {
String SCHEME = "example";
String SERVICE_NAME = "lb.example.grpc.io";
}
负载均衡解析器LoadBalanceNameResolver.java
package io.grpc.examples.helloworld2;
import io.grpc.EquivalentAddressGroup;
import io.grpc.NameResolver;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
/**
* @Author tanyong
* @Version LoadBalanceNameResolver v1.0.0 2024/4/2 16:59 $$
*/
public class LoadBalanceNameResolver extends NameResolver {
private Listener2 listener;
private List<InetSocketAddress> socketAddressList;
private String serviceName;
private final Map<String, List<InetSocketAddress>> addrStore = new HashMap<>();
public LoadBalanceNameResolver(List<InetSocketAddress> socketAddressList, String serviceName) {
this.socketAddressList = socketAddressList;
this.serviceName = serviceName;
addrStore.put(serviceName, socketAddressList);
}
@Override
public void start(Listener2 listener) {
this.listener = listener;
this.resolve();
}
@Override
public void refresh() {
this.resolve();
}
/**
* 返回用于对服务器连接进行身份验证的权限。它必须来自受信任的来源,因为如果权限被篡改,rpc可能会被发送给攻击者,这可能会泄露敏感的用户数据。
* 实现必须在不阻塞的情况下生成它,通常是在线生成,并且必须保持它不变。从同一工厂使用相同参数创建的namesolvers必须返回相同的权限。
* 自:
*
* @return
*/
@Override
public String getServiceAuthority() {
return "ok";
}
@Override
public void shutdown() {
}
/**
* 解析服务器地址
*/
private void resolve() {
List<InetSocketAddress> addresses = addrStore.get(serviceName);
List<EquivalentAddressGroup> equivalentAddressGroup = addresses.stream().map(this::toSocketAddress)
.map(Arrays::asList)
.map(this::addrToEquivalentAddressGroup)
.collect(Collectors.toList());
ResolutionResult resolutionResult = ResolutionResult.newBuilder()
.setAddresses(equivalentAddressGroup)
.build();
// 处理已解析地址和属性的更新
this.listener.onResult(resolutionResult);
}
private SocketAddress toSocketAddress(InetSocketAddress address) {
return new InetSocketAddress(address.getHostName(), address.getPort());
}
private EquivalentAddressGroup addrToEquivalentAddressGroup(List<SocketAddress> addrList) {
return new EquivalentAddressGroup(addrList);
}
}
负载均衡解析器提供者LoadBalanceNameResolverProvider.java
package io.grpc.examples.helloworld2;
import io.grpc.NameResolver;
import io.grpc.NameResolverProvider;
import java.net.InetSocketAddress;
import java.net.URI;
import java.util.List;
/**
* @Author tanyong
* @Version LoadBalanceNameResolverProvider v1.0.0 2024/4/3 9:56 $$
*/
public class LoadBalanceNameResolverProvider extends NameResolverProvider {
private final List<InetSocketAddress> socketAddressList;
public LoadBalanceNameResolverProvider(List<InetSocketAddress> socketAddressList) {
this.socketAddressList = socketAddressList;
}
@Override
protected boolean isAvailable() {
return true;
}
@Override
protected int priority() {
return 5;
}
@Override
public NameResolver newNameResolver(URI targetUri, NameResolver.Args args) {
return new LoadBalanceNameResolver(socketAddressList, HelloWordConstants.SERVICE_NAME);
}
@Override
public String getDefaultScheme() {
return HelloWordConstants.SCHEME;
}
}
4.3 客户端HelloWordClient.java
package io.grpc.examples.helloworld2;
import io.grpc.*;
import io.grpc.examples.helloworld.GreeterGrpc;
import io.grpc.examples.helloworld.HelloReply;
import io.grpc.examples.helloworld.HelloRequest;
import java.net.InetSocketAddress;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.TimeUnit;
/**
* @Author tanyong
* @Version HellowordClient v1.0.0 2024/4/2 16:33 $$
*/
public class HelloWordClient {
/**
* 客户端对服务Greeter进行同步rpc调用 blockingStub
*/
private final GreeterGrpc.GreeterBlockingStub blockingStub;
public HelloWordClient(Channel channel) {
// 创建一个新的阻塞式Stub,支持对服务的一元和流输出调用
blockingStub = GreeterGrpc.newBlockingStub(channel);
}
public void greet(String name) {
HelloRequest request = HelloRequest.newBuilder().setName(name).build();
HelloReply response;
try {
// 设置此次RPC调用的响应超时时间为1秒
response = blockingStub.withDeadlineAfter(1, TimeUnit.SECONDS).sayHello(request);
System.out.println(response.getMessage());
} catch (Exception e) {
e.printStackTrace();
}
}
public static void main(String[] args) throws InterruptedException {
// 服务段连接地址
List<InetSocketAddress> addressList = Arrays.asList(new InetSocketAddress("127.0.0.1", 50052),
new InetSocketAddress("127.0.0.1", 50053));
// 注册 NameResolverProvider
NameResolverRegistry.getDefaultRegistry().register(new LoadBalanceNameResolverProvider(addressList));
// 符合namesolver的有效URI
String target = String.format("%s:///%s", HelloWordConstants.SCHEME, HelloWordConstants.SERVICE_NAME);
// 创建channel
ManagedChannel channel = ManagedChannelBuilder.forTarget(target)
.defaultLoadBalancingPolicy("round_robin")
// 使用明文连接到服务器。默认情况下,将使用安全连接机制,如TLS。
// 应仅用于测试或API的使用或交换的数据不敏感的API。
.usePlaintext()
.disableRetry()
.build();
try {
HelloWordClient client = new HelloWordClient(channel);
long current = System.currentTimeMillis();
for (int i = 0; i < 10; i++) {
Thread.sleep(5000);
client.greet("测试");
}
System.out.println(System.currentTimeMillis() - current);
} catch (Exception e) {
e.printStackTrace();
} finally {
// 关闭channel
channel.shutdownNow().awaitTermination(5, TimeUnit.SECONDS);
}
}
}
开放原子开发者工作坊旨在鼓励更多人参与开源活动,与志同道合的开发者们相互交流开发经验、分享开发心得、获取前沿技术趋势。工作坊有多种形式的开发者活动,如meetup、训练营等,主打技术交流,干货满满,真诚地邀请各位开发者共同参与!
更多推荐
所有评论(0)