LangChain脚本如何调度及提效?

简介: 本文介绍了通过任务调度系统SchedulerX管理LangChain脚本的方法。LangChain是开源的大模型开发框架,支持快速构建AI应用,而SchedulerX可托管AI任务,提供脚本版本管理、定时调度、资源优化等功能。文章重点讲解了脚本管理和调度、Prompt管理、资源利用率提升、限流控制、失败重试、依赖编排及企业级可观测性等内容。同时展望了AI任务调度的未来需求,如模型Failover、Tokens限流等,并提供了相关参考链接。

概述

LangChain[1]是开源领域最流行的大模型编程开发框架,支持通过python/js语言快速构建AI应用。Dify[2]是开源的图形化大模型应用开发平台,可以通过可视化的画布拖拖拽拽快速构建AI agent/工作流。


通过任务调度系统托管AI任务,可以进行脚本版本管理、定时调度、提升资源利用率、限流控制、可运维、可观测。

由于篇幅有限,本文章主要介绍通过任务调度SchedulerX[3]进行LangChain脚本的管理和调度,Dify工作流调度将在下一篇介绍。


脚本管理及调度

AI任务有许多业务场景,需要定时调度,比如:

  • 风险监控:每分钟扫描风险数据,通过大模型分析是否有风险事件,并发出报警。
  • 数据分析:每天拉取金融数据,通过大模型进行数据分析,给出投资者建议。
  • 内容生成:每天帮我做工作总结,写日报。

LangChain任务基本上都是python脚本,可以使用SchedulerX的脚本任务[4]托管脚本,并进行定时配置

任务调度SchedulerX还支持脚本的历史版本,方便进行历史版本的对比和回滚:


Prompt管理

Prompt(提示词)对于AI任务来说非常重要,为了得到好的效果,可能需要经常修改Prompt,将Prompt写在脚本中会非常麻烦。我们可以通过SchedulerX的任务参数来管理Prompt,在LangChain脚本中通过SchedulerX提供的系统参数(#{schedulerx.jobParameters})动态获取任务参数,来代替Prompt或者PromptTemplate参数。


定时调度获取Prompt

Prompt写法

1. 通过SchedulerX控制台编写脚本

from langchain_community.llms import Tongyi
from langchain.prompts import PromptTemplate
from langchain.chains import LLMChain

llm = Tongyi(model="qwen-plus")
question = "#{schedulerx.jobParameters}"
print("question:" + question)

results = llm.invoke(question)
print(results)

2. 配置任务参数

PromptTemplate写法

1. 通过SchedulerX控制台编写脚本

from langchain_community.llms import Tongyi
from langchain.prompts import PromptTemplate
from langchain.chains import LLMChain

llm = Tongyi(model="qwen-plus")
prompt = PromptTemplate(template="请帮我解答这个问题:{question}")

chain = LLMChain(llm=llm, prompt=prompt)
question = "#{schedulerx.jobParameters}"
print("question:" + question)

results = chain.invoke(question)
print(results)

2. 配置任务参数


API调度动态传递Prompt

SchedulerX也支持通过控制台手动运行或者API调度,动态设置新的Prompt,以上面PromptTemplate写法为例,通过控制台手动运行任务,动态传递任务参数,该任务参数会覆盖任务配置中的静态任务参数。

提升资源利用率

SchedulerX执行脚本,当前支持两种模式(未来会支持更多的运行时):

  • 脚本任务:在ECS上部署schedulerx-agent,每次执行fork一个子进程执行脚本,适合任务数比较多、调度频繁、资源消耗少的场景。
  • K8s任务[5]:在K8s上部署schedulerx-agent,每次执行弹一个Pod执行脚本,适合任务数不多、调度不频繁、资源消耗大的场景。

两种运行时适合不同的场景,结合起来使用,可以提升资源利用率。

如上图所示,通过ECS执行脚本以及通过K8s执行脚本,主要区别总结下表:

限流控制

业务场景:比如有一堆离线任务,每天0点之后执行,处理上一天的数据,核心任务必须在早上9点上班前全部跑完。业务同学可能会把任务的调度时间都设置成同一时刻,比如每天00:30执行。


当大量任务同时调度的时候,会把ECS资源打满。虽然用K8s跑脚本可以解决一部分问题,但是突增的流量一样会把下游(比如数据库)打满。所以针对这种突增流量的场景,最佳解决方案是使用限流。通过限流控制解决定时调度不均特别是突发流量的场景,其实也是一种提升资源利用率的解决方案。

如上图所示,任务调度SchedulerX支持应用级别的限流控制:

1. 每个应用会有2个队列,一个是优先级排队队列,可以把任务按照优先级在队列中排队,保证核心任务优先跑完。任务的优先级仅在自己的应用下生效,不会和其他应用产生冲突。


2. 另一个是并发数队列,控制这个应用的并发数,不同应用的并发数彼此不受干扰。


3. 当并发队列中某个任务运行完成,有空闲槽位后,会从排队队列头部取出任务,放到并发队列中,开始执行任务。


失败自动重试

当前大模型调用不是很稳定,大家平时和大模型聊天,可能会经常遇到token限流了,或者是后端服务异常了。这个时候我们只要过一会重新尝试下就好了。


任务调度SchedulerX自带任务失败自动重试功能,可以通过控制台动态配置,经过我们验证,使用失败重试功能,LangChain脚本因为后端大模型限流或者服务不可用导致的失败率大大降低,成功率可以提升至少一个9。


依赖编排

SchedulerX提供可视化任务编排能力,如果你的LangChain脚本有依赖关系,可以进行任务编排。甚至是不同任务类型的任务,都可以进行编排。

如上图所示:

1. 先通过Shell脚本,去大数据平台拉取数据。

2. 通过Java代码实现,做商家数据和用户数据清洗。

3. 通过LangChain实现,把清洗好的数据用大模型做数据分析。

4. 最后,再通过Python脚本生成报表。


企业级可观测

任务调度SchedulerX默认集成了各种云产品,提供企业级可观测能力,包括但不限于如下功能。


调度大盘

调度大盘可以看到任务执行的总体情况,支持按照命名空间和应用过滤筛选。


监控报警

任务如果执行失败了,需要快速响应处理,否则容易产生故障。SchedulerX支持应用级别报警,也能精细到每个任务级别,如下图所示是任务级别报警配置。

  • 联系人管理:支持联系人和联系人组管理,支持同步云监控联系人。
  • 报警方式:失败报警、超时报警、成功通知。
  • 报警渠道:邮件、webhook、短信、电话。


日志服务

当任务执行失败了,需要查看任务运行的日志分析问题。只要接入schedulerx-agent运行脚本,默认就集成了日志服务,可以看到脚本运行的所有标准输出和异常。

1. 配置如下脚本

2. 任务参数配置如下

3. 运行一次,查看日志

未来展望

在AI时代,AI任务调度面临着新的机遇和挑战,我们总结了一些常见的需求如下:

  • AI任务管理:可以通过任务调度配置prompt模版、模型类型、输出格式等参数,通过控制台可以动态调整。

  • 模型Failover:通过任务调度系统托管各种模型,如果某个模型调用失败,可以自动重试其他的模型,进一步提升任务执行的成功率。
  • Tokens限流:每个任务返回消耗的tokens,任务调度系统能做到token级别的限流,防止触发下游大模型的API限流。
  • AI任务批处理:AI任务执行时间比较长,特别是推理型模型时间更加长,通过任务调度系统进行任务拆分及分布式处理,加快任务执行速度。
  • AI可观测:可以看到每个任务的执行耗时、消耗的tokens、输入和输出。如果是工作流,可以看到每个node级别的耗时、tokens消耗、输入和输出。

如果您有AI任务调度方面的其他诉求,欢迎联系我们,钉钉群号:23103656

参考链接:

[1]https://python.langchain.com/docs/introduction/

[2]https://dify.ai/zh

[3]https://www.aliyun.com/ntms/middleware/schedulerx

[4]https://help.aliyun.com/zh/mse/user-guide/script-task

[5]https://help.aliyun.com/zh/mse/user-guide/k8s-task


来源  |  阿里云开发者公众号

作者  |  学仁

相关实践学习
AnalyticDB PostgreSQL 企业智能数据中台:一站式管理数据服务资产
企业在数据仓库之上可构建丰富的数据服务用以支持数据应用及业务场景;ADB PG推出全新企业智能数据平台,用以帮助用户一站式的管理企业数据服务资产,包括创建, 管理,探索, 监控等; 助力企业在现有平台之上快速构建起数据服务资产体系
相关文章
|
监控 Kubernetes Go
日志采集效能跃迁:iLogtail 到 LoongCollector 的全面升级
LoongCollector 在日志场景中实现了全面的重磅升级,从功能、性能、稳定性等各个方面均进行了深度优化和提升,本文我们将对 LoongCollector 的升级进行详细介绍。
824 87
|
存储 人工智能 安全
MCP 规范新版本特性全景解析与落地实践
MCP Specification 在 2025-03-26 发布了最新的版本,本文对主要的改动进行详细介绍和解释
3692 145
|
机器学习/深度学习 设计模式 人工智能
深度解析Agent实现,定制自己的Manus
文章结合了理论分析与实践案例,旨在帮助读者系统地认识AI Agent的核心要素、设计模式以及未来发展方向。
2913 103
深度解析Agent实现,定制自己的Manus
|
消息中间件 运维 监控
加一个JVM参数,让系统可用率从95%提高到99.995%
本文针对一个高并发(10W+ QPS)、低延迟(毫秒级返回)的系统因内存索引切换导致的不稳定问题,深入分析并优化了JVM参数配置。通过定位问题根源为GC压力大,尝试了多种优化手段:调整MaxTenuringThreshold、InitialTenuringThreshold、AlwaysTenure等参数让索引尽早晋升到老年代;探索PretenureSizeThreshold和G1HeapRegionSize实现索引直接分配到老年代;加速索引复制过程以及升级至JDK11使用ZGC。
807 82
加一个JVM参数,让系统可用率从95%提高到99.995%
|
机器学习/深度学习 存储 人工智能
浅入浅出——生成式 AI
团队做 AI 助理,而我之前除了使用一些 AI 类产品,并没有大模型相关的积累。故先补齐一些基本概念,避免和团队同学沟通起来一头雾水。这篇文章是学习李宏毅老师《生成式 AI 导论》的学习笔记。
1054 27
浅入浅出——生成式 AI
|
开发框架 人工智能 Java
破茧成蝶:阿里云应用服务器让传统 J2EE 应用无缝升级 AI 原生时代
本文详细介绍了阿里云应用服务器如何助力传统J2EE应用实现智能化升级。文章分为三部分:第一部分阐述了传统J2EE应用在智能化转型中的痛点,如协议鸿沟、资源冲突和观测失明;第二部分展示了阿里云应用服务器的解决方案,包括兼容传统EJB容器与微服务架构、支持大模型即插即用及全景可观测性;第三部分则通过具体步骤说明如何基于EDAS开启J2EE应用的智能化进程,确保十年代码无需重写,轻松实现智能化跃迁。
881 42
|
存储 SQL 大数据
从 o11y 2.0 说起,大数据 Pipeline 的「多快好省」之道
SLS 是阿里云可观测家族的核心产品之一,提供全托管的可观测数据服务。本文以 o11y 2.0 为引子,整理了可观测数据 Pipeline 的演进和一些思考。
618 34
|
人工智能 安全 应用服务中间件
阿里巴巴 MCP 分布式落地实践:快速转换 HSF 到 MCP server
本文分享了阿里巴巴内部将大规模HSF服务快速转换为MCP Server的实践经验,通过Higress网关实现MCP协议卸载,无需修改代码即可接入MCP生态。文章分析了MCP生态面临的挑战,如协议快速迭代和SDK不稳定性,并详细介绍了操作步骤及组件功能。强调MCP虽非终极解决方案,但作为AI业务工程化的起点具有重要意义。最后总结指出,MCP只是AI原生应用发展的第一步,未来还有更多可能性值得探索。
1614 49
|
11月前
|
人工智能 安全 API
Agent 工程师绕不开的必修课:API 网关 vs API 管理
本文探讨了“API管理”与“API网关”的起源、发展及差异,二者分别服务于API生命周期的不同阶段。API网关从流量网关演进至AI网关,承担运行时请求控制;API管理则从接口文档化发展到商业化平台,关注全生命周期治理。两者在实际应用中协同工作,通过分层架构和策略联动实现高效运营。未来,随着大模型应用的兴起,AI网关和MCP Server管理将成为新趋势,推动API技术迈入智能化和服务化的新阶段。
Agent 工程师绕不开的必修课:API 网关 vs API 管理