如何基于Netty实现简单的RPC 框架
1. 项目模块与依赖
common 模块依赖
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>myRPC</artifactId>
<groupId>com.sgg</groupId>
<version>1.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>common</artifactId>
<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
</properties>
<dependencies>
<!--netty依赖 -->
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
</dependency>
<!--json依赖 -->
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.80</version>
</dependency>
<!--lombok依赖 -->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.22</version>
</dependency>
</dependencies>
</project>
rpc-client模块依赖
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>myRPC</artifactId>
<groupId>com.sgg</groupId>
<version>1.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>rpc-client</artifactId>
<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
</properties>
<dependencies>
<dependency>
<groupId>com.sgg</groupId>
<artifactId>common</artifactId>
<version>1.0-SNAPSHOT</version>
</dependency>
</dependencies>
</project>
rpc-server 模块依赖
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>myRPC</artifactId>
<groupId>com.sgg</groupId>
<version>1.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>rpc-server</artifactId>
<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
</properties>
<dependencies>
<dependency>
<groupId>com.sgg</groupId>
<artifactId>common</artifactId>
<version>1.0-SNAPSHOT</version>
</dependency>
<!--spring相关依赖 -->
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-context</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-autoconfigure</artifactId>
</dependency>
</dependencies>
</project>
myRPC
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.sgg</groupId>
<artifactId>myRPC</artifactId>
<packaging>pom</packaging>
<version>1.0-SNAPSHOT</version>
<modules>
<module>common</module>
<module>rpc-client</module>
<module>rpc-server</module>
</modules>
<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
</properties>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.2.4.RELEASE</version>
</parent>
</project>
2. common 通用模块
2.1 RpcRequest
package com.sgg.pojo;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
/**
* @author sz
* @DATE 2022/5/6 21:54
*/
@Data
@AllArgsConstructor
@NoArgsConstructor
public class RpcRequest {
/**
* 全限定类名
*/
private String className;
/**
* 方法名
*/
private String methodName;
/**
* 参数类型
*/
private Class<?>[] parameterTypes;
/**
* 实参
*/
private Object[] paramters;
}
2.2 RpcResponse
package com.sgg.pojo;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
/**
* @author sz
* @DATE 2022/5/6 21:54
*/
@Data
@AllArgsConstructor
@NoArgsConstructor
public class RpcResponse {
//返回状态码
private Integer code;
//返回结果
private String result;
//错误信息
private String error;
}
2.3 User
package com.sgg.pojo;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
/**
* @author sz
* @DATE 2022/5/6 21:55
*/
@Data
@AllArgsConstructor
@NoArgsConstructor
public class User {
private Integer id;
private String name;
}
2.4 UserService
package com.sgg.service;
import com.sgg.pojo.User;
public interface UserService {
User getUserById(Integer id);
}
3. rpc-server 服务端模块
3.1 MyServiceRpc
package com.sgg.anno;
import java.lang.annotation.*;
@Target({ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface MyServiceRpc {
}
3.2 MyServerHandler
package com.sgg.handler;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.sgg.anno.MyServiceRpc;
import com.sgg.pojo.RpcRequest;
import com.sgg.pojo.RpcResponse;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import org.springframework.beans.BeansException;
import org.springframework.cglib.reflect.FastClass;
import org.springframework.cglib.reflect.FastMethod;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.stereotype.Component;
import java.lang.reflect.InvocationTargetException;
import java.util.*;
/**
* @author sz
* @DATE 2022/5/6 22:16
*/
@Component
public class MyServerHandler extends SimpleChannelInboundHandler<String> implements ApplicationContextAware {
private static ApplicationContext app;
private static HashMap<String, Object> cache = new HashMap<>();
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
app = applicationContext;
//拿到容器中所有标注了@MyServiceRpc 注解的 bean
Map<String, Object> beansWithAnnotation = app.getBeansWithAnnotation(MyServiceRpc.class);
//拿到bean实现的接口的全限定类名
Set<Map.Entry<String, Object>> entries = beansWithAnnotation.entrySet();
entries.stream().forEach(ent->{
Class<?>[] interfaces = ent.getValue().getClass().getInterfaces();
if (null!=interfaces && interfaces.length != 0){
Arrays.stream(interfaces).forEach(inter->{
cache.put(inter.getName(),ent.getValue());
});
}
});
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
System.out.println("客户端连接 : "+ctx.channel().remoteAddress().toString().substring(1));
}
@Override
protected void channelRead0(ChannelHandlerContext channelHandlerContext, String json) throws Exception {
//封装结果
RpcResponse rpcResponse = new RpcResponse();
Object result = null;
try {
//将json字符串转换为RpcRequest 对象
RpcRequest rpcRequest = JSONObject.parseObject(json, RpcRequest.class);
//拿到需要调用的类
String className = rpcRequest.getClassName();
Object bean = cache.get(className);
//需要调用的方法名
String methodName = rpcRequest.getMethodName();
//方法参数类型
Class<?>[] parameterTypes = rpcRequest.getParameterTypes();
//方法实参
Object[] paramters = rpcRequest.getParamters();
//反射调用方法
FastClass fastClass = FastClass.create(bean.getClass());
FastMethod fastClassMethod = fastClass.getMethod(methodName, parameterTypes);
result = fastClassMethod.invoke(bean, paramters);
rpcResponse.setCode(200);
rpcResponse.setResult((String) result);
} catch (Exception e) {
e.printStackTrace();
rpcResponse.setCode(400);
rpcResponse.setError(e.getMessage());
}
//将结果用json字符串写回去
channelHandlerContext.writeAndFlush(JSON.toJSONString(rpcResponse));
}
}
3.3 ServerProvider
package com.sgg.provider;
import com.sgg.handler.MyServerHandler;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.ServerChannel;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import org.springframework.stereotype.Service;
import java.io.Closeable;
import java.io.IOException;
import java.net.ServerSocket;
/**
* @author sz
* @DATE 2022/5/6 22:07
*/
@Service
public class ServerProvider implements Closeable {
private NioEventLoopGroup boss ;
private NioEventLoopGroup work ;
public void start(String ip,Integer port) {
//创建两个线程组
boss = new NioEventLoopGroup(1);
//默认线程数 = CPU数 * 2
work = new NioEventLoopGroup();
//创建启动组手
try {
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(boss, work)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
//解析字符串
ChannelPipeline pipeline = socketChannel.pipeline();
pipeline.addLast(new StringEncoder());
pipeline.addLast(new StringDecoder());
//内容处理
pipeline.addLast(new MyServerHandler());
}
});
ChannelFuture channelFuture = serverBootstrap.bind(ip, port).sync();
System.out.println(">>>>>>>服务器启动成功<<<<<<<<");
channelFuture.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
if (null!=boss){
boss.shutdownGracefully();
}
if (null!=boss){
work.shutdownGracefully();
}
}
}
@Override
public void close() throws IOException {
System.out.println("容器关闭我被调用了");
if (null!=boss){
boss.shutdownGracefully();
}
if (null!=boss){
work.shutdownGracefully();
}
}
}
3.4 UserServiceImpl
package com.sgg.service.impl;
import com.sgg.anno.MyServiceRpc;
import com.sgg.pojo.User;
import com.sgg.service.UserService;
import org.springframework.stereotype.Service;
import java.util.HashMap;
/**
* @author sz
* @DATE 2022/5/6 22:18
*/
@MyServiceRpc
@Service
public class UserServiceImpl implements UserService {
private static HashMap<Integer,User> map = new HashMap();
static {
map.put(1,new User(1,"张三"));
map.put(2,new User(2,"李四"));
}
@Override
public User getUserById(Integer id) {
return map.get(id);
}
}
3.5 ServerApp
package com.sgg;
import com.sgg.provider.ServerProvider;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
/**
* @author sz
* @DATE 2022/5/6 22:04
*/
@SpringBootApplication
public class ServerApp implements CommandLineRunner {
@Autowired
private ServerProvider serverProvider;
public static void main(String[] args) {
SpringApplication.run(ServerApp.class,args);
}
@Override
public void run(String... args) throws Exception {
new Thread(()->{
serverProvider.start("127.0.0.1",9999);
}).start();
}
}
4. rpc-client 客户端模块
4.1 RpcClient
package com.sgg.client;
import com.sgg.handler.MyClientHandler;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
/**
* @author sz
* @DATE 2022/5/6 22:54
*/
@Data
@AllArgsConstructor
@NoArgsConstructor
public class RpcClient {
private String ip;
private Integer port;
public RpcClient(String ip, Integer port) {
this.ip = ip;
this.port = port;
init();
}
private NioEventLoopGroup eventLoopGroup;
private Channel channel;
private MyClientHandler myClientHandler = new MyClientHandler();
private ExecutorService executorService = Executors.newCachedThreadPool();
public Object sendMess(String message) throws ExecutionException, InterruptedException {
myClientHandler.setRequestMsg(message);
Future submit = executorService.submit(myClientHandler);
return submit.get();
}
public void init() {
//创建线程组
eventLoopGroup = new NioEventLoopGroup();
//创建启动组手
Bootstrap bootstrap = new Bootstrap();
//分组
try {
bootstrap.group(eventLoopGroup)
.channel(NioSocketChannel.class)
.option(ChannelOption.SO_KEEPALIVE, Boolean.TRUE)
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 5000)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
ChannelPipeline pipeline = socketChannel.pipeline();
pipeline.addLast(new StringEncoder());
pipeline.addLast(new StringDecoder());
//业务
pipeline.addLast(myClientHandler);
}
});
channel = bootstrap.connect(ip, port).sync().channel();
} catch (Exception e) {
e.printStackTrace();
if (null != channel) {
channel.close();
}
if (null != eventLoopGroup) {
eventLoopGroup.shutdownGracefully();
}
}
}
public void close() {
if (null != channel) {
channel.close();
}
if (null != eventLoopGroup) {
eventLoopGroup.shutdownGracefully();
}
}
}
4.2 MyClientHandler
package com.sgg.handler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import java.util.concurrent.Callable;
/**
* @author sz
* @DATE 2022/5/6 23:04
*/
public class MyClientHandler extends SimpleChannelInboundHandler<String> implements Callable {
private String requestMsg;
private String responseMsg;
private ChannelHandlerContext context;
public void setRequestMsg(String str) {
this.requestMsg = str;
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
this.context = ctx;
}
@Override
protected synchronized void channelRead0(ChannelHandlerContext channelHandlerContext, String str) throws Exception {
this.responseMsg = str;
//唤醒
notify();
}
@Override
public synchronized Object call() throws Exception {
this.context.writeAndFlush(requestMsg);
//线程等待 拿到响应数据
wait();
return responseMsg;
}
}
4.3 RpcProxy
package com.sgg.proxy;
import com.alibaba.fastjson.JSON;
import com.sgg.client.RpcClient;
import com.sgg.pojo.RpcRequest;
import com.sgg.pojo.RpcResponse;
import com.sgg.pojo.User;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
/**
* @author sz
* @DATE 2022/5/6 22:46
*/
public class RpcProxy {
public static Object createProxy(Class target) {
return Proxy.newProxyInstance(Thread.currentThread().getContextClassLoader(),
new Class[]{target},
(Object proxy, Method method, Object[] args) -> {
RpcRequest rpcRequest = new RpcRequest();
//设置类名
rpcRequest.setClassName(method.getDeclaringClass().getName());
//设置方法名
rpcRequest.setMethodName(method.getName());
//设置方法参数类型
rpcRequest.setParameterTypes(method.getParameterTypes());
//设置方法实际参数
rpcRequest.setParamters(args);
//发送信息,拿到返回值
RpcClient rpcClient = new RpcClient("127.0.0.1", 9999);
String mess = (String) rpcClient.sendMess(JSON.toJSONString(rpcRequest));
//转换为rpcResponse
RpcResponse rpcResponse = JSON.parseObject(mess, RpcResponse.class);
//拿到返回结果
if (200==rpcResponse.getCode()){
return JSON.parseObject(rpcResponse.getResult(), User.class);
}
return null;
}
);
}
}
4.4 ClientApp
package com.sgg;
import com.sgg.pojo.User;
import com.sgg.proxy.RpcProxy;
import com.sgg.service.UserService;
/**
* @author sz
* @DATE 2022/5/6 22:44
*/
public class ClientApp {
public static void main(String[] args) {
UserService proxy = (UserService) RpcProxy.createProxy(UserService.class);
User userById = proxy.getUserById(2);
System.out.println("userById = " + userById);
}
}