日志服务接入方式之log producer library

本文涉及的产品
对象存储 OSS,20GB 3个月
对象存储 OSS,内容安全 1000次 1年
对象存储 OSS,恶意文件检测 1000次 1年
简介: Producer Library解决的问题: 客户端日志不落盘:既数据生产后直接通过网络发往服务端。 客户端高并发写入:例如一秒钟会有百次以上写操作。 客户端计算与IO逻辑分离:打日志不影响计算耗时。

日志服务(原SLS)团队提供LogHub Producer Library方便客户端接入日志,Producer Library和Consumer Library是对LogHub功能的包装,降低数据收集与消费的门槛。

Producer Library解决的问题:

  1. 客户端日志不落盘:既数据产生后直接通过网络发往服务端。
  2. 客户端高并发写入:例如一秒钟会有百次以上写操作。
  3. 客户端计算与IO逻辑分离:打日志不影响计算耗时。

在以上场景中,Producer Library会简化你程序开发的代价,帮助你批量聚合写请求,通过异步的方式发往LogHub服务端。在整个过程中,用户可以配置批量聚合的参数,服务端异常处理的逻辑等。

0c5e22da184eec0f93979cec8ff159394b1143e0

以上各种接入方式的对比:

接入方式 优点/缺点 针对场景
日志落盘+Logtail 日志收集与打日志解耦,无需修改代码 常用场景
Syslog + Logtail 性能较好(80MB/S),日志不落盘,需支持syslog协议 Syslog场景
SDK直发 不落盘,直接发往服务端,需要处理好网络IO与程序IO之间的切换 日志不落盘
Producer Library 不落盘,异步合并发送服务端,吞吐量较好 日志不落盘,客户端QPS高

(目前Producer Library只支持Java 版本,其他语言待开发)

LogHub Producer Library功能

  1. 提供异步的发送接口,线程安全。
  2. 可以添加多个project的配置。
  3. 用于发送的网络IO线程数量可以配置。
  4. merge成的包的日志数量以及大小都可以配置。
  5. 内存使用可控,当内存使用达到用户配置的阈值时,producer的send接口会阻塞,直到有空闲的内存可用。

使用方法

producer使用分为以下几个步骤:

1: maven工程中添加依赖:

<dependency>
    <groupId>com.aliyun.openservices</groupId>
    <artifactId>log-loghub-producer</artifactId>
    <version>0.1.4</version>
</dependency>

2:程序中配置ProducerConfig,其中各个参数说明如下。

public class ProducerConfig 
{
    //被缓存起来的日志的发送超时时间,如果缓存超时,则会被立即发送,单位是毫秒
    public int packageTimeoutInMS = 3000;
    //每个缓存的日志包中包含日志数量的最大值,不能超过4096
    public int logsCountPerPackage = 4096;
    //每个缓存的日志包的大小的上限,不能超过5MB,单位是字节
    public int logsBytesPerPackage = 5 * 1024 * 1024;
    //单个producer实例可以使用的内存的上限,单位是字节
    public int memPoolSizeInByte = 1000 * 1024 * 1024;
    //IO线程池最大线程数量,主要用于发送数据到日志服务
    public int maxIOThreadSizeInPool = 50;
    //当使用指定shardhash的方式发送日志时,这个参数需要被设置,否则不需要关心。后端merge线程会将映射到同一个shard的数据merge在一起,而shard关联的是一个hash区间,
    //producer在处理时会将用户传入的hash映射成shard关联hash区间的最小值。每一个shard关联的hash区间,producer会定时从loghub拉取,该参数的含义是每隔shardHashUpdateIntervalInMS毫秒,
    //更新一次shard的hash区间。
    public int shardHashUpdateIntervalInMS = 10 * 60 * 1000;
    //如果发送失败,重试的次数,如果超过该值,就会将异常作为callback的参数,交由用户处理。
    public int retryTimes = 3;
}

3:继承ILogCallback,callback主要用于日志发送结果的处理,结果包括发送成功和发生异常。用户也可以选择不处理,这样就不需要继承ILogCallback。

4:创建producer实例,调用send接口发数据。

下面是一个完整的示例。

示例

main:

public class ProducerSample {
    private final static int ThreadsCount = 25;

    public static String RandomString(int length) {
        String str = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789";
        Random random = new Random();
        StringBuffer buf = new StringBuffer();
        for (int i = 0; i < length; i++) {
            int num = random.nextInt(62);
            buf.append(str.charAt(num));
        }
        return buf.toString();
    }

    public static void main(String args[]) throws InterruptedException {
        ProducerConfig producerConfig = new ProducerConfig();
        // 使用默认producer配置
        final LogProducer producer = new LogProducer(producerConfig);
        // 添加多个project配置
        producer.setProjectConfig(new ProjectConfig("your project 1",
                "endpoint", "your accesskey id", "your accesskey"));
        producer.setProjectConfig(new ProjectConfig("your project 2",
                "endpoint", "your accesskey id", "your accesskey",
                "your sts token"));
        // 更新project 1的配置
        producer.setProjectConfig(new ProjectConfig("your project 1",
                "endpoint", "your new accesskey id", "your new accesskey"));
        // 删除project 2的配置
        producer.removeProjectConfig("your project 2");
        // 生成日志集合,用于测试
        final Vector<Vector<LogItem>> logGroups = new Vector<Vector<LogItem>>();
        for (int i = 0; i < 100000; ++i) {
            Vector<LogItem> tmpLogGroup = new Vector<LogItem>();
            LogItem logItem = new LogItem((int) (new Date().getTime() / 1000));
            logItem.PushBack("level", "info" + System.currentTimeMillis());
            logItem.PushBack("message", "test producer send perf "
                    + RandomString(50));
            logItem.PushBack("method", "SenderToServer " + RandomString(10));
            tmpLogGroup.add(logItem);
            logGroups.add(tmpLogGroup);
        }
        // 并发调用send发送日志
        Thread[] threads = new Thread[ThreadsCount];
        for (int i = 0; i < ThreadsCount; ++i) {
            threads[i] = new Thread(null, new Runnable() {
                Random random = new Random();

                public void run() {
                    int j = 0, rand = random.nextInt(99999);
                    while (++j < Integer.MAX_VALUE) {
                        producer.send("project 1", "logstore 1", "topic",
                                "source ip", logGroups.get(rand),
                                new CallbackSample("project 1", "logstore 1", "topic", "source ip", null, logGroups.get(rand), producer));
                    }
                }
            }, i + "");
            threads[i].start();
        }
        //等待发送线程退出
        Thread.sleep(1 * 60 * 60 * 1000);
        //主动刷新缓存起来的还没有被发送的日志
        producer.flush();
        //关闭后台io线程,close会将调用时刻内存中缓存的数据发送出去
        producer.close();
    }
}

callback:

public class CallbackSample extends ILogCallback {
    //保存要发送的数据,当时发生异常时,进行重试
    public String project;
    public String logstore;
    public String topic;
    public String shardHash;
    public String source;
    public Vector<LogItem> items;
    public LogProducer producer;
    public int retryTimes = 0;
    public CallbackSample(String project, String logstore, String topic,
            String shardHash, String source, Vector<LogItem> items, LogProducer producer) {
        super();
        this.project = project;
        this.logstore = logstore;
        this.topic = topic;
        this.shardHash = shardHash;
        this.source = source;
        this.items = items;
        this.producer = producer;
    }

    public void onCompletion(PutLogsResponse response, LogException e) {
        if (e != null) {
            // 打印异常
            System.out.println(e.GetErrorCode() + ", " + e.GetErrorMessage() + ", " + e.GetRequestId());
            //最多重试三次
            if(retryTimes++ < 3)
            {
                producer.send(project, logstore, topic, source, shardHash, items, this);
            }
        }
        else{
            System.out.println("send success, request id: " + response.GetRequestId());
        }
    }

}
相关实践学习
日志服务之使用Nginx模式采集日志
本文介绍如何通过日志服务控制台创建Nginx模式的Logtail配置快速采集Nginx日志并进行多维度分析。
目录
相关文章
|
2月前
|
XML 安全 Java
【日志框架整合】Slf4j、Log4j、Log4j2、Logback配置模板
本文介绍了Java日志框架的基本概念和使用方法,重点讨论了SLF4J、Log4j、Logback和Log4j2之间的关系及其性能对比。SLF4J作为一个日志抽象层,允许开发者使用统一的日志接口,而Log4j、Logback和Log4j2则是具体的日志实现框架。Log4j2在性能上优于Logback,推荐在新项目中使用。文章还详细说明了如何在Spring Boot项目中配置Log4j2和Logback,以及如何使用Lombok简化日志记录。最后,提供了一些日志配置的最佳实践,包括滚动日志、统一日志格式和提高日志性能的方法。
496 30
【日志框架整合】Slf4j、Log4j、Log4j2、Logback配置模板
|
26天前
|
监控 安全 Apache
什么是Apache日志?为什么Apache日志分析很重要?
Apache是全球广泛使用的Web服务器软件,支持超过30%的活跃网站。它通过接收和处理HTTP请求,与后端服务器通信,返回响应并记录日志,确保网页请求的快速准确处理。Apache日志分为访问日志和错误日志,对提升用户体验、保障安全及优化性能至关重要。EventLog Analyzer等工具可有效管理和分析这些日志,增强Web服务的安全性和可靠性。
|
3月前
|
XML JSON Java
Logback 与 log4j2 性能对比:谁才是日志框架的性能王者?
【10月更文挑战第5天】在Java开发中,日志框架是不可或缺的工具,它们帮助我们记录系统运行时的信息、警告和错误,对于开发人员来说至关重要。在众多日志框架中,Logback和log4j2以其卓越的性能和丰富的功能脱颖而出,成为开发者们的首选。本文将深入探讨Logback与log4j2在性能方面的对比,通过详细的分析和实例,帮助大家理解两者之间的性能差异,以便在实际项目中做出更明智的选择。
368 3
|
4天前
|
SQL 关系型数据库 MySQL
MySQL事务日志-Undo Log工作原理分析
事务的持久性是交由Redo Log来保证,原子性则是交由Undo Log来保证。如果事务中的SQL执行到一半出现错误,需要把前面已经执行过的SQL撤销以达到原子性的目的,这个过程也叫做"回滚",所以Undo Log也叫回滚日志。
MySQL事务日志-Undo Log工作原理分析
|
1月前
|
存储 监控 安全
什么是事件日志管理系统?事件日志管理系统有哪些用处?
事件日志管理系统是IT安全的重要工具,用于集中收集、分析和解释来自组织IT基础设施各组件的事件日志,如防火墙、路由器、交换机等,帮助提升网络安全、实现主动威胁检测和促进合规性。系统支持多种日志类型,包括Windows事件日志、Syslog日志和应用程序日志,通过实时监测、告警及可视化分析,为企业提供强大的安全保障。然而,实施过程中也面临数据量大、日志管理和分析复杂等挑战。EventLog Analyzer作为一款高效工具,不仅提供实时监测与告警、可视化分析和报告功能,还支持多种合规性报告,帮助企业克服挑战,提升网络安全水平。
|
3月前
|
存储 缓存 关系型数据库
MySQL事务日志-Redo Log工作原理分析
事务的隔离性和原子性分别通过锁和事务日志实现,而持久性则依赖于事务日志中的`Redo Log`。在MySQL中,`Redo Log`确保已提交事务的数据能持久保存,即使系统崩溃也能通过重做日志恢复数据。其工作原理是记录数据在内存中的更改,待事务提交时写入磁盘。此外,`Redo Log`采用简单的物理日志格式和高效的顺序IO,确保快速提交。通过不同的落盘策略,可在性能和安全性之间做出权衡。
1754 14
MySQL事务日志-Redo Log工作原理分析
|
2月前
|
存储 监控 安全
什么是日志管理,如何进行日志管理?
日志管理是对IT系统生成的日志数据进行收集、存储、分析和处理的实践,对维护系统健康、确保安全及获取运营智能至关重要。本文介绍了日志管理的基本概念、常见挑战、工具的主要功能及选择解决方案的方法,强调了定义管理目标、日志收集与分析、警报和报告、持续改进等关键步骤,以及如何应对数据量大、安全问题、警报疲劳等挑战,最终实现日志数据的有效管理和利用。
156 0
|
3月前
|
Python
log日志学习
【10月更文挑战第9天】 python处理log打印模块log的使用和介绍
50 0
|
3月前
|
数据可视化
Tensorboard可视化学习笔记(一):如何可视化通过网页查看log日志
关于如何使用TensorBoard进行数据可视化的教程,包括TensorBoard的安装、配置环境变量、将数据写入TensorBoard、启动TensorBoard以及如何通过网页查看日志文件。
308 0
|
3月前
|
存储 分布式计算 NoSQL
大数据-136 - ClickHouse 集群 表引擎详解1 - 日志、Log、Memory、Merge
大数据-136 - ClickHouse 集群 表引擎详解1 - 日志、Log、Memory、Merge
79 0

相关产品

  • 日志服务