文档
安装
- 下载源码包
go get github.com/aliyun/aliyun-tablestore-go-sdk/tunnel
-
安装依赖
- 可以在tunnel目录下使用dep安装依赖
dep ensure -v
- 也可以直接使用go get安装依赖包:
go get -u go.uber.org/zap
go get github.com/cenkalti/backoff
go get github.com/golang/protobuf/proto
go get github.com/satori/go.uuid
go get github.com/stretchr/testify/assert
go get github.com/smartystreets/goconvey/convey
go get github.com/golang/mock/gomock
go get gopkg.in/natefinch/lumberjack.v2
快速开始
- 初始化Tunnel client:
// endpoint是表格存储实例endpoint,如https://instance.cn-hangzhou.ots.aliyun.com
// instance为实例名称
// accessKeyId和accessKeySecret分别为访问表格存储服务的AccessKey的Id和Secret
tunnelClient := tunnel.NewTunnelClient(endpoint, instance,
accessKeyId, accessKeySecret)
- 创建新Tunnel:
req := &tunnel.CreateTunnelRequest{
TableName: "testTable",
TunnelName: "testTunnel",
Type: tunnel.TunnelTypeBaseStream, //全量加增量类型Tunnel
}
resp, err := tunnelClient.CreateTunnel(req)
if err != nil {
log.Fatal("create test tunnel failed", err)
}
log.Println("tunnel id is", resp.TunnelId)
- 获取已有Tunnel信息:
req := &tunnel.DescribeTunnelRequest{
TableName: "testTable",
TunnelName: "testTunnel",
}
resp, err := tunnelClient.DescribeTunnel(req)
if err != nil {
log.Fatal("create test tunnel failed", err)
}
log.Println("tunnel id is", resp.Tunnel.TunnelId)
- 注册callback,开始数据消费:
//用户定义消费callback函数
func exampleConsumeFunction(ctx *tunnel.ChannelContext, records []*tunnel.Record) error {
fmt.Println("user-defined information", ctx.CustomValue)
for _, rec := range records {
fmt.Println("tunnel record detail:", rec.String())
}
fmt.Println("a round of records consumption finished")
return nil
}
//配置callback到SimpleProcessFactory,配置消费端TunnelWorkerConfig
workConfig := &tunnel.TunnelWorkerConfig{
ProcessorFactory: &tunnel.SimpleProcessFactory{
CustomValue: "user custom interface{} value",
ProcessFunc: exampleConsumeFunction,
},
}
//使用TunnelDaemon持续消费指定tunnel
daemon := tunnel.NewTunnelDaemon(tunnelClient, tunnelId, workConfig)
log.Fatal(daemon.Run())
- 删除Tunnel
req := &tunnel.DeleteTunnelRequest {
TableName: "testTable",
TunnelName: "testTunnel",
}
_, err := tunnelClient.DeleteTunnel(req)
if err != nil {
log.Fatal("delete test tunnel failed", err)
}
配置项
- tunnel client配置
初始化tunnel client时可以通过NewTunnelClientWithConfig接口自定义客户端配置,使用不指定config初始化接口或者config为nil时会使用DefaultTunnelConfig:
var DefaultTunnelConfig = &TunnelConfig{
//最大指数退避重试时间
MaxRetryElapsedTime: 45 * time.Second,
//HTTP请求超时时间
RequestTimeout: 30 * time.Second,
//http.DefaultTransport
Transport: http.DefaultTransport,
}
- 数据消费worker配置
TunnelWorkerConfig中包含了数据消费worker需要的配置,其中ProcessorFactory为必填项,其余字段不填将使用默认值,通常使用默认值即可:
type TunnelWorkerConfig struct {
//worker同Tunnel服务的心跳超时时间,通常使用默认值即可
HeartbeatTimeout time.Duration
//worker发送心跳的频率,通常使用默认值即可
HeartbeatInterval time.Duration
//tunnel下消费连接建立接口,通常使用默认值即可
ChannelDialer ChannelDialer
//消费连接上具体处理器产生接口,通常使用callback函数初始化SimpleProcessFactory即可
ProcessorFactory ChannelProcessorFactory
//zap日志配置,默认值为DefaultLogConfig
LogConfig *zap.Config
//zap日志轮转配置,默认值为DefaultSyncer
LogWriteSyncer zapcore.WriteSyncer
}
其中的ProcessorFactory为用户注册消费callback函数以及其他信息的接口,建议使用SDK中自带SimpleProcessorFactory实现:
type SimpleProcessFactory struct {
//用户自定义信息,会传递到ProcessFunc和ShutdownFunc中的ChannelContext参数中
CustomValue interface{}
//Worker记录checkpoint的间隔,CpInterval<=0时会使用DefaultCheckpointInterval
CpInterval time.Duration
//worker数据处理的同步调用callback,ProcessFunc返回error时worker会用本批数据退避重试ProcessFunc
ProcessFunc func(channelCtx *ChannelContext, records []*Record) error
//worker退出时的同步调用callback
ShutdownFunc func(channelCtx *ChannelContext)
//日志配置,Logger为nil时会使用DefaultLogConfig初始化logger
Logger *zap.Logger
}
- 日志配置
默认日志配置:
//DefaultLogConfig是TunnelWorkerConfig和SimpleProcessFactory使用的默认日志配置
var DefaultLogConfig = zap.Config{
Level: zap.NewAtomicLevelAt(zap.InfoLevel),
Development: false,
Sampling: &zap.SamplingConfig{
Initial: 100,
Thereafter: 100,
},
Encoding: "json",
EncoderConfig: zapcore.EncoderConfig{
TimeKey: "ts",
LevelKey: "level",
NameKey: "logger",
CallerKey: "caller",
MessageKey: "msg",
StacktraceKey: "stacktrace",
LineEnding: zapcore.DefaultLineEnding,
EncodeLevel: zapcore.LowercaseLevelEncoder,
EncodeTime: zapcore.ISO8601TimeEncoder,
EncodeDuration: zapcore.SecondsDurationEncoder,
EncodeCaller: zapcore.ShortCallerEncoder,
},
}
日志轮转配置:
//DefaultSyncer是TunnelWorkerConfig和SimpleProcessFactory使用的默认日志轮转配置
var DefaultSyncer = zapcore.AddSync(&lumberjack.Logger{
//日志文件路径
Filename: "tunnelClient.log",
//最大日志文件大小
MaxSize: 512, //MB
//压缩轮转的日志文件数
MaxBackups: 5,
//轮转日志文件保留的最大天数
MaxAge: 30, //days
//是否压缩轮转日志文件
Compress: true,
})