Netty手写RPC框架

简介: 创建Request类,继承Message,klass是调用的Class目标,name,parameterType,argument分别是方法名称,参数类型,参数

首发于Enaium的个人博客


协议就用上篇文章的协议

public class Message implements Serializable {
   
    private final long order;

    public Message(long order) {
   
        this.order = order;
    }

    public long getOrder() {
   
        return order;
    }
}

只不过Message加了个Order熟悉,

创建Request类,继承Message,klass是调用的Class目标,name,parameterType,argument分别是方法名称,参数类型,参数

public class Request extends Message {
   
    private final String klass;
    private final String name;
    private final Class<?>[] parameterType;
    private final Object[] argument;

    public Request(long order, String klass, String name, Class<?>[] parameterType, Object[] argument) {
   
        super(order);
        this.klass = klass;
        this.name = name;
        this.parameterType = parameterType;
        this.argument = argument;
    }

    public String getKlass() {
   
        return klass;
    }

    public String getName() {
   
        return name;
    }

    public Class<?>[] getParameterType() {
   
        return parameterType;
    }

    public Object[] getArgument() {
   
        return argument;
    }
}

创建Response类继承Message,result调用的结果,throwable调用的异常

public class Response extends Message {
   
    private final Object result;
    private final Throwable throwable;

    public Response(long order, Object result, Throwable throwable) {
   
        super(order);
        this.result = result;
        this.throwable = throwable;
    }

    public Object getResult() {
   
        return result;
    }

    public Throwable getThrowable() {
   
        return throwable;
    }
}

创建一个PRCHandler类,来处理请求,用反射调用即可

public class PRCHandler extends SimpleChannelInboundHandler<Request> {
   
    @Override
    public void channelRead0(ChannelHandlerContext channelHandlerContext, Request request) {
   
        try {
   
            Class<?> aClass = Class.forName(request.getKlass());
            Object o = aClass.getConstructor().newInstance();
            Object invoke = aClass.getMethod(request.getName(), request.getParameterType()).invoke(o, request.getArgument());
            channelHandlerContext.channel().writeAndFlush(new Response(request.getOrder(), invoke, null));
        } catch (ClassNotFoundException | InvocationTargetException | InstantiationException | IllegalAccessException | NoSuchMethodException e) {
   
            e.printStackTrace();
            channelHandlerContext.channel().writeAndFlush(new Response(request.getOrder(), null, e.getCause()));
        }
    }
}

接着启动服务器,服务器就这样写好了

public class RPCServer {
   

    private static final LoggingHandler LOGGING_HANDLER = new LoggingHandler(LogLevel.INFO);
    private static final MessageCodec MESSAGE_CODEC = new MessageCodec();
    private static final PRCHandler PRC_HANDLER = new PRCHandler();

    public static void main(String[] args) {
   
        NioEventLoopGroup boss = new NioEventLoopGroup();
        NioEventLoopGroup worker = new NioEventLoopGroup();
        try {
   
            Channel localhost = new ServerBootstrap().group(boss, worker).channel(NioServerSocketChannel.class).childHandler(new ChannelInitializer<SocketChannel>() {
   
                @Override
                public void initChannel(SocketChannel channel) {
   
                    channel.pipeline().addLast(LOGGING_HANDLER);
                    channel.pipeline().addLast(MESSAGE_CODEC);
                    channel.pipeline().addLast(PRC_HANDLER);
                }
            }).bind("localhost", 3828).sync().channel();
            System.out.println("Runnable...");
            localhost.closeFuture().sync();
        } catch (InterruptedException e) {
   
            e.printStackTrace();
        } finally {
   
            boss.shutdownGracefully();
            worker.shutdownGracefully();
        }
    }
}

现在在test里测试一下,写好客户端连接,Hanlder先不用太关注

public class Main {
   

    private static final LoggingHandler LOGGING_HANDLER = new LoggingHandler(LogLevel.INFO);
    private static final MessageCodec MESSAGE_CODEC = new MessageCodec();
    private static final Handler HANDLER = new Handler();

    private static Channel channel;

    public static void main(String[] args) {
   
        NioEventLoopGroup nioEventLoopGroup = new NioEventLoopGroup();
        try {
   
            channel = new Bootstrap().group(nioEventLoopGroup).channel(NioSocketChannel.class).handler(new ChannelInitializer<SocketChannel>() {
   
                @Override
                public void initChannel(SocketChannel channel) {
   
                    channel.pipeline().addLast(LOGGING_HANDLER);
                    channel.pipeline().addLast(MESSAGE_CODEC);
                    channel.pipeline().addLast(HANDLER);
                }
            }).connect("localhost", 3828).sync().channel();
        } catch (InterruptedException e) {
   
            e.printStackTrace();
        }

        Runtime.getRuntime().addShutdownHook(new Thread(nioEventLoopGroup::shutdownGracefully));
    }
}

创建一个Call注解,klass是目标类,name是目标类的方法

@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
public @interface Call {
   
    String klass();

    String name();
}

现在创建一个目标类

public class Target {
   
    public String render() {
   
        return "RENDER HELLO WORLD!";
    }
}

创建个一个Service接口

public interface Service {
   
    @Call(klass = "cn.enaium.Target", name = "render")
    String render();
}

接着使用动态代理

@SuppressWarnings("unchecked")
private static <T> T getService(Class<T> klass) {
   

}

没有Call注解的返回null

Object o = Proxy.newProxyInstance(klass.getClassLoader(), new Class<?>[]{
   klass}, (proxy, method, args) -> {
   
    if (!method.isAnnotationPresent(Call.class)) {
   
        return null;
    }
}

使用Promise来获取结果

Object o = Proxy.newProxyInstance(klass.getClassLoader(), new Class<?>[]{
   klass}, (proxy, method, args) -> {
   
    if (!method.isAnnotationPresent(Call.class)) {
   
        return null;
    }
    Promise<Object> promise = new DefaultPromise<>(channel.eventLoop());
    Call annotation = method.getAnnotation(Call.class);
    long increment = Util.increment();
    channel.writeAndFlush(new Request(increment, annotation.klass(), annotation.name(), method.getParameterTypes(), args));
    Main.HANDLER.getPromiseMap().put(increment, promise);
    promise.await();
    if (promise.cause() != null) {
   
        return new RuntimeException(promise.cause());
    }
    return promise.getNow();
});
@SuppressWarnings("unchecked")
private static <T> T getService(Class<T> klass) {
   
    Object o = Proxy.newProxyInstance(klass.getClassLoader(), new Class<?>[]{
   klass}, (proxy, method, args) -> {
   
        if (!method.isAnnotationPresent(Call.class)) {
   
            return null;
        }
        Promise<Object> promise = new DefaultPromise<>(channel.eventLoop());
        Call annotation = method.getAnnotation(Call.class);
        long increment = Util.increment();
        channel.writeAndFlush(new Request(increment, annotation.klass(), annotation.name(), method.getParameterTypes(), args));
        Main.HANDLER.getPromiseMap().put(increment, promise);
        promise.await();
        if (promise.cause() != null) {
   
            return new RuntimeException(promise.cause());
        }
        return promise.getNow();
    });
    return (T) o;
}

序号自增

public class Util {
   
    private static final AtomicLong atomicLong = new AtomicLong();

    public static long increment() {
   
        return atomicLong.incrementAndGet();
    }
}

Handler来处理响应,根据请求的order获取返回值

public class Handler extends SimpleChannelInboundHandler<Response> {
   

    private final Map<Long, Promise<Object>> promiseMap = new HashMap<>();

    @Override
    public void channelRead0(ChannelHandlerContext channelHandlerContext, Response response) throws Exception {
   

        if (null == promiseMap.get(response.getOrder())) {
   
            return;
        }

        Promise<Object> promise = promiseMap.remove(response.getOrder());

        if (response.getResult() != null) {
   
            promise.setSuccess(response.getResult());
        } else {
   
            promise.setFailure(response.getThrowable());
        }
    }

    public Map<Long, Promise<Object>> getPromiseMap() {
   
        return promiseMap;
    }
}

现在来运行服务器和客户端测试一下

源码

视频

目录
相关文章
|
5月前
|
NoSQL 前端开发 Java
Lettuce的特性和内部实现问题之Lettuce基于Netty框架实现的问题如何解决
Lettuce的特性和内部实现问题之Lettuce基于Netty框架实现的问题如何解决
106 0
|
4月前
|
编解码 分布式计算 网络协议
Netty高性能网络框架(一)
Netty高性能网络框架(一)
|
2月前
|
自然语言处理 负载均衡 API
gRPC 一种现代、开源、高性能的远程过程调用 (RPC) 可以在任何地方运行的框架
gRPC 是一种现代开源高性能远程过程调用(RPC)框架,支持多种编程语言,可在任何环境中运行。它通过高效的连接方式,支持负载平衡、跟踪、健康检查和身份验证,适用于微服务架构、移动设备和浏览器客户端连接后端服务等场景。gRPC 使用 Protocol Buffers 作为接口定义语言,支持四种服务方法:一元 RPC、服务器流式处理、客户端流式处理和双向流式处理。
|
3月前
|
NoSQL Java Redis
redis的基本命令,并用netty操作redis(不使用springboot或者spring框架)就单纯的用netty搞。
这篇文章介绍了Redis的基本命令,并展示了如何使用Netty框架直接与Redis服务器进行通信,包括设置Netty客户端、编写处理程序以及初始化Channel的完整示例代码。
91 1
redis的基本命令,并用netty操作redis(不使用springboot或者spring框架)就单纯的用netty搞。
|
5月前
|
Dubbo 网络协议 Java
RPC框架:一文带你搞懂RPC
这篇文章全面介绍了RPC(远程过程调用)的概念、原理和应用场景,解释了RPC如何工作以及为什么在分布式系统中广泛使用,并探讨了几种常用的RPC框架如Thrift、gRPC、Dubbo和Spring Cloud,同时详细阐述了RPC调用流程和实现透明化远程服务调用的关键技术,包括动态代理和消息的编码解码过程。
RPC框架:一文带你搞懂RPC
|
4月前
|
设计模式 缓存 算法
Netty框架的重要性
Netty框架的重要性
|
4月前
|
XML 负载均衡 监控
分布式-dubbo-简易版的RPC框架
分布式-dubbo-简易版的RPC框架
|
5月前
|
前端开发 Java Spring
springboot 整合 netty框架, 实现 心跳检测,自动重连
springboot 整合 netty框架, 实现 心跳检测,自动重连
|
7月前
|
存储 缓存 Linux
【实战指南】嵌入式RPC框架设计实践:六大核心类构建高效RPC框架
在先前的文章基础上,本文讨论如何通过分层封装提升一个针对嵌入式Linux的RPC框架的易用性。设计包括自动服务注册、高性能通信、泛型序列化和简洁API。框架分为6个关键类:BindingHub、SharedRingBuffer、Parcel、Binder、IBinder和BindInterface。BindingHub负责服务注册,SharedRingBuffer实现高效数据传输,Parcel处理序列化,而Binder和IBinder分别用于服务端和客户端交互。BindInterface提供简单的初始化接口,简化应用集成。测试案例展示了客户端和服务端的交互,验证了RPC功能的有效性。
474 9
|
5月前
|
编解码 NoSQL Redis
(十一)Netty实战篇:基于Netty框架打造一款高性能的IM即时通讯程序
关于Netty网络框架的内容,前面已经讲了两个章节,但总归来说难以真正掌握,毕竟只是对其中一个个组件进行讲解,很难让诸位将其串起来形成一条线,所以本章中则会结合实战案例,对Netty进行更深层次的学习与掌握,实战案例也并不难,一个非常朴素的IM聊天程序。
121 3