【DSW Gallery】特征平台

简介: 特征平台是专门用来存储,共享,管理机器学习模型特征的存储库。特征平台可以方便的向多人、多团队共享特征,提供安全,高效且统一的存储,保证离线在线的一致性。

直接使用

请打开基于特征平台,并点击右上角 “ 在DSW中打开” 。

image.png


pip install https://feature-store-py.oss-cn-beijing.aliyuncs.com/package/feature_store_py-0.1.6-py3-none-any.whl


import unittest
import sys
import os
from os.path import dirname, join, abspath
from feature_store_py.fs_client import FeatureStoreClient
from feature_store_py.fs_project import FeatureStoreProject
from feature_store_py.fs_datasource import UrlDataSource, MaxComputeDataSource, DatahubDataSource, HologresDataSource, SparkDataSource, LabelInput, TrainingSetOutput
from feature_store_py.fs_type import FSTYPE
from feature_store_py.fs_schema import Schema, Field
from feature_store_py.fs_feature_view import FeatureView
from feature_store_py.fs_features import FeatureSelector
from feature_store_py.fs_config import EASDeployConfig, LabelInputConfig, PartitionConfig, FeatureViewConfig, TrainSetOutputConfig
import logging
logger = logging.getLogger("foo")
logger.addHandler(logging.StreamHandler(stream=sys.stdout))

数据集介绍

我们使用的是开源电影数据集,数据集官网:http://moviedata.csuldw.com

其中主要使用的是 movie 数据,user 数据,rating 数据。这三份数据可以对应推荐流程中的物料表,用户表,label 表。

我们将展示如果使用 feature store 方便的将三份数据的特征整合在一起离线训练模型,并且完成后续上线服务。

Project

我们可以通过 project 可以创建多个项目空间,每个项目空间是独立的。project 里会配置基本的信息,每个 project 会对应一个 offlinestore 和 onlinestore。

运行 notebook 需要 feature store server 端配合运行,购买完 pairec 配置中心实例后,在配置中心可以看到服务接口地址 (host) 和 token.

host = ""
token = ""
fs = FeatureStoreClient(host, token)
cur_project_name = "fs_movie_7"
offline_datasource_id = 38
online_datasource_id = 1
project = fs.get_project(cur_project_name)
if project is None:
  project = fs.create_project(cur_project_name, offline_datasource_id, online_datasource_id)

获取对应的 project

project = fs.get_project(cur_project_name)

打印该project的信息

project.print_summary()

FeatureEntity

FeatureEntity 描述了一组相关的特征集合。多个 FeatureView 可以关联一个 FeatureEntity。 每个Entity 都会有一个 Entity JoinId , 通过 JoinId 可以关联多个 FeatureView 特征。每一个 FeatureView 都有一个主键(索引键)来获取其下面的特征数据,但是这里的索引键可以和 JoinId 定义的名称不一样。 这里我们创建 movie, user, rating 三个 Entity。

cur_entity_name_movie = "movie_data"
join_id = 'movie_id'
entity_id = None
entity_id = project.get_entity(cur_entity_name_movie)
if entity_id is None:
  entity_id = project.create_entity(name = cur_entity_name_movie, join_id=join_id)
print("entity_id = ", entity_id)

获取对应的entity

获取对应的entity

打印该entity的信息

feature_entity.print_summary()
cur_entity_name_user = "user_data"
join_id = 'user_md5'
entity_id = None
entity_id = project.get_entity(cur_entity_name_user)
if entity_id is None:
  entity_id = project.create_entity(name = cur_entity_name_user, join_id=join_id)
print("entity_id = ", entity_id)
cur_entity_name_ratings = "rating_data"
join_id = 'rating_id'
entity_id = None
entity_id = project.get_entity(cur_entity_name_ratings)
if entity_id is None:
  entity_id = project.create_entity(name = cur_entity_name_ratings, join_id=join_id)
print("entity_id = ", entity_id)

FeatureView

FeatureStore是一个特征管理平台,当外部的数据进入到 FS 中, 需要通过 FeatureView。 FeatureView 指定了数据从哪里来(DataSource), 数据进入FS 需要哪些转换(特征工程/Transformation), 特征 schema (特征名称+类型),数据需要放到哪里(OnlineStore/OfflineStore)、特征meta(主键、事件时间、分区键, FeatureEntity, ttl )。

FeatureView 会分为两种类型, BatchFeatureView 和 StreamFeatureView 。 BatchFeatureView 可以把离线数据注入到 FS 中, StreamFeatureView 支持实时特征的写入。 BatchFeatureView 会把数据管理到 OfflineStore 里, 然后可以选择同步到 OnlineStore 里。StreamFeatureView 会把数据写入到 OnlineStore 里,然后同步到 OfflineStore 里, 但实际上我们会把同样的数据同时写入到里面。

BatchFeatureView

DataSource 中的特征数据写入 FS 中,有两种情况。

1. 数据直接写入

UrlDataSource 写入到 maxcompute 的 offlinestore, 那么定义的 FeatureView 的 schema 需要手动创建。

path = 'https://feature-store-test.oss-cn-beijing.aliyuncs.com/dataset/moviedata_all/movies.csv'
delimiter = ','
omit_header = True
ds = UrlDataSource(path, delimiter, omit_header)
print(ds)

schema 定义了字段的名称和类型。

movie_schema = Schema(
    Field(name='movie_id', type=FSTYPE.STRING),
    Field(name='name', type=FSTYPE.STRING),
    Field(name='alias', type=FSTYPE.STRING),
    Field(name='actores', type=FSTYPE.STRING),
    Field(name='cover', type=FSTYPE.STRING),
    Field(name='directors', type=FSTYPE.STRING),
    Field(name='double_score', type=FSTYPE.STRING),
    Field(name='double_votes', type=FSTYPE.STRING),
    Field(name='genres', type=FSTYPE.STRING),
    Field(name='imdb_id', type=FSTYPE.STRING),
    Field(name='languages', type=FSTYPE.STRING),
    Field(name='mins', type=FSTYPE.STRING),
    Field(name='official_site', type=FSTYPE.STRING),
    Field(name='regions', type=FSTYPE.STRING),
    Field(name='release_data', type=FSTYPE.STRING),
    Field(name='slug', type=FSTYPE.STRING),
    Field(name='story', type=FSTYPE.STRING),
    Field(name='tags', type=FSTYPE.STRING),
    Field(name='year', type=FSTYPE.STRING),
    Field(name='actor_ids', type=FSTYPE.STRING),
    Field(name='director_ids', type=FSTYPE.STRING),
    Field(name='dt', type=FSTYPE.STRING)
)
print(movie_schema)

新建 batch_feature_view

feature_view_movie_name = "feature_view_movie"
batch_feature_view = project.get_feature_view(feature_view_movie_name)
if batch_feature_view is None:
  batch_feature_view = project.create_batch_feature_view(name=feature_view_movie_name, owner='yancheng', schema=movie_schema, online = True, entity= cur_entity_name_movie, primary_key='movie_id', partitions=['dt'], ttl=-1)
batch_feature_view = project.get_feature_view(feature_view_movie_name)
batch_feature_view.print_summary()

数据写入 mc 表

cur_task = batch_feature_view.write_table(ds, partitions={'dt':'20220830'})
cur_task.wait()

查看当前 task 的信息

print(cur_task.task_summary)

数据同步到 onlinestore 中

cur_task = batch_feature_view.publish_table({'dt':'20220830'})
cur_task.wait()
print(cur_task.task_summary)

获取对应的FeatureView

batch_feature_view = project.get_feature_view(feature_view_movie_name)

打印该FeatureView的信息

batch_feature_view.print_summary()

我们按此顺序,依次导入 users 表, ratings 表。

users_path = 'https://feature-store-test.oss-cn-beijing.aliyuncs.com/dataset/moviedata_all/users.csv'
ds = UrlDataSource(users_path, delimiter, omit_header)
print(ds)
user_schema = Schema(
  Field(name='user_md5', type=FSTYPE.STRING),
  Field(name='user_nickname', type=FSTYPE.STRING),
  Field(name='ds', type=FSTYPE.STRING)
)
print(user_schema)
feature_view_user_name = "feature_view_users_1"
batch_feature_view = project.get_feature_view(feature_view_user_name)
if batch_feature_view is None:
  batch_feature_view = project.create_batch_feature_view(name=feature_view_user_name, owner='yancheng', schema=user_schema, online = True, entity= cur_entity_name_user, primary_key='user_md5',ttl=-1, partitions=['ds'])
write_table_task = batch_feature_view.write_table(ds, {'ds':'20220830'})
write_table_task.wait()
print(write_table_task.task_summary)
cur_task = batch_feature_view.publish_table({'ds':'20220830'})
cur_task.wait()
print(cur_task.task_summary)
batch_feature_view = project.get_feature_view(feature_view_user_name)
batch_feature_view.print_summary()
ratings_path = 'https://feature-store-test.oss-cn-beijing.aliyuncs.com/dataset/moviedata_all/ratings.csv'
ds = UrlDataSource(ratings_path, delimiter, omit_header)
print(ds)
ratings_schema = Schema(
  Field(name='rating_id', type=FSTYPE.STRING),
  Field(name='user_md5', type=FSTYPE.STRING),
  Field(name='movie_id', type=FSTYPE.STRING),
  Field(name='rating', type=FSTYPE.STRING),
  Field(name='rating_time', type=FSTYPE.STRING),
  Field(name='dt', type=FSTYPE.STRING)
)
feature_view_rating_name = "feature_view_ratings"
batch_feature_view = project.get_feature_view(feature_view_rating_name)
if batch_feature_view is None:
  batch_feature_view = project.create_batch_feature_view(name=feature_view_rating_name, owner='yancheng', schema=ratings_schema, online = True, entity= cur_entity_name_ratings, primary_key='rating_id', event_time='rating_time', partitions=['dt'])
cur_task = batch_feature_view.write_table(ds, {'dt':'20220831'})
cur_task.wait()
print(cur_task.task_summary)
batch_feature_view = project.get_feature_view(feature_view_rating_name)
batch_feature_view.print_summary()

Offlinestore

离线特征数据存储的数据仓库,在我们系统中是 MaxCompute 或者是 DS 上的 HDFS,但是通过 spark 进行数据写入。 通过 offlinestore, 我们可以生成 training set 数据,也就是样本,用于模型训练。再一个可以生成 batch predition 数据, 用于批量预测。

Onlinestore

在线预测时,需要低延迟的获取特征数据, onlinestore 提供在线特征数据的存储。我们目前优先支持 hologres 或者 redis。

在线特征的获取

我们可以从 FeatureView 的角度获取在线特征

feature_view_movie_name = "feature_view_movie"
batch_feature_view = project.get_feature_view(feature_view_movie_name)
ret_features_1 = batch_feature_view.get_online_features(join_ids={'movie_id':['26357307']}, features=['name', 'actores', 'regions'])
print("ret_features = ", ret_features_1)
feature_view_movie_name = "feature_view_movie"
batch_feature_view = project.get_feature_view(feature_view_movie_name)
ret_features_2 = batch_feature_view.get_online_features(join_ids={'movie_id':['30444960']}, features=['name', 'actores', 'regions'])
print("ret_features = ", ret_features_2)

FeatureSelector

当我们从 offlinestore 或者 onlinestore 获取特征时,需要明确的指出该获取哪些特征。可以从 FeatureView 的角度来选择特征.

feature_view_name = 'feature_view_movie'
# 选择部分特征
feature_selector = FeatureSelector(feature_view_name, ['site_id', 'site_category'])
#选择全部特征
feature_selector = FeatureSelector(feature_view_name, '*')
# 支持别名
feature_selector = FeatureSelector(
    feature_view='user1',
    features = ['f1','f2', 'f3'],
    alias={"f1":"f1_1"} # 字段别名,最终会产出 f1_1 的字段名称 
)

TrainingSet

当我们要训练模型的时候,首先要构造样本表。样本表是由 label 数据和 特征数据组成。在与 FS 交互时, label 数据需要由客户提供,需要定义要获取的特征名称,然后根据主键进行 point-in-time join( 存在 event_time的情况下)

label_ds = MaxComputeDataSource(data_source_id=offline_datasource_id, table='fs_movie_6_feature_view_ratings_offline')
output_ds = MaxComputeDataSource(data_source_id=offline_datasource_id)
label_input = LabelInput(label_ds, event_time='rating_time')
train_set_output = TrainingSetOutput(output_ds)
feature_view_movie_name = "feature_view_movie"
feature_movie_selector = FeatureSelector(feature_view_movie_name, ['name', 'actores', 'regions','tags'])
feature_view_user_name = 'feature_view_users_1'
feature_user_selector = FeatureSelector(feature_view_user_name, ['user_nickname'])
train_set = project.create_training_set(label_input, train_set_output, [feature_movie_selector, feature_user_selector])
print("train_set = ", train_set)

Model

从 offlinestore 的角度讲,我们最终是训练出模型,变成服务进行业务的预测。 那么训练的样本可以从上面的 TrainingSet 获得, 然后就是模型训练,最终会部署成服务。

model_name = "fs_model_movie_rating_3"
owner = 'yancheng'
deploy_config = EASDeployConfig(ak_id= '',region='',config='')
cur_model = project.get_model(model_name)
if cur_model is None:
  cur_model = project.create_model(model_name, owner, train_set, deploy_config)
print("cur_model_train_set_table_name = ", cur_model.train_set_table_name)

导出样本表

实际训练的时候,我们需要导出样本表

指定 label 表以及各个 feature view 的分区, event_time

label_partitions = PartitionConfig(name = 'dt', value = '20220831')
label_input_config = LabelInputConfig(partition_config=label_partitions, event_time='1999-01-00 00:00:00')
movie_partitions = PartitionConfig(name = 'dt', value = '20220830')
feature_view_movie_config = FeatureViewConfig(name = 'feature_view_movie', partition_config=movie_partitions)
user_partitions = PartitionConfig(name = 'ds', value = '20220830')
feature_view_user_config = FeatureViewConfig(name = 'feature_view_users_1', partition_config=user_partitions)
feature_view_config_list = [feature_view_movie_config, feature_view_user_config]
train_set_partitions = PartitionConfig(name = 'dt', value = '20220831')
train_set_output_config = TrainSetOutputConfig(partition_config=train_set_partitions)

根据指定的条件,导出样本表

task = cur_model.export_train_set(label_input_config, feature_view_config_list, train_set_output_config)
task.wait()
print("task = ", task.task_summary)
相关实践学习
使用PAI+LLaMA Factory微调Qwen2-VL模型,搭建文旅领域知识问答机器人
使用PAI和LLaMA Factory框架,基于全参方法微调 Qwen2-VL模型,使其能够进行文旅领域知识问答,同时通过人工测试验证了微调的效果。
机器学习概览及常见算法
机器学习(Machine Learning, ML)是人工智能的核心,专门研究计算机怎样模拟或实现人类的学习行为,以获取新的知识或技能,重新组织已有的知识结构使之不断改善自身的性能,它是使计算机具有智能的根本途径,其应用遍及人工智能的各个领域。 本课程将带你入门机器学习,掌握机器学习的概念和常用的算法。
相关文章
|
消息中间件 程序员 API
Flink中时间和窗口
Flink中时间和窗口
490 0
|
分布式计算 自然语言处理 DataWorks
高效使用 PyODPS 最佳实践
以更清晰的认知 PyODPS,DataWorks PyODPS 节点以及 PyODPS 何时在计算集群运行,开发者如何利用 PyODPS 更高效地进行数据开发。
18584 3
高效使用 PyODPS 最佳实践
|
3月前
|
机器学习/深度学习 自然语言处理 算法
小红书:通过商品标签API自动生成内容标签,优化社区推荐算法
小红书通过商品标签API自动生成内容标签,提升推荐系统精准度与用户体验。流程包括API集成、标签生成算法与推荐优化,实现高效率、智能化内容匹配,助力社交电商发展。
195 0
|
存储 算法 C++
【C++】哈希桶
哈希桶是哈希表中的基本存储单元,用于存放通过哈希函数映射后的数据元素。当不同元素映射至同一桶时,产生哈希冲突,常用拉链法或开放寻址法解决。哈希桶支持高效的数据插入、删除与查找操作,时间复杂度通常为O(1),但在最坏情况下可退化为O(n)。
323 6
|
API 语音技术
基于Asterisk和TTS/ASR语音识别的配置示例
本文介绍了如何在Asterisk服务器上配置TTS(文本转语音)和ASR(自动语音识别)引擎,包括安装Asterisk、选择并配置TTS和ASR引擎、编辑Asterisk配置文件以实现语音识别和合成的功能,以及测试配置的有效性。具体步骤涉及下载安装包、编辑配置文件、设置API密钥等。
826 1
|
SQL 存储 Unix
Flink SQL 在快手实践问题之设置 Window Offset 以调整窗口划分如何解决
Flink SQL 在快手实践问题之设置 Window Offset 以调整窗口划分如何解决
236 2
|
9月前
|
人工智能 自然语言处理 IDE
Trae 开发工具与使用技巧
V哥推荐字节推出的AI原生IDE——Trae,这款工具大幅提升程序员开发效率。Trae定位为“AI协同编程”伙伴,支持零基础用户通过对话完成项目开发。其核心功能包括Builder模式自动生成代码、智能问答辅助开发、上下文引用与多模态开发等。对比Cursor和Windsurf,Trae在中文支持、全自动项目管理和免费模型使用上更具优势。新手可通过3步快速上手:启动Builder模式、一键运行调试、迭代优化。立即体验Trae,开启AI时代编程新篇章!
2366 2
|
数据安全/隐私保护
CTF — 压缩包密码爆破
CTF — 压缩包密码爆破
1504 0
|
Linux 开发工具 C语言
mac/linux中vim永久显示行号、开启语法高亮
步骤1:   cp /usr/share/vim/vimrc ~/.vimrc   先复制一份vim配置模板到个人目录下   注:redhat 改成 cp /etc/vimrc ~/.vimrc 步骤2:   vi ~/.vimrc   进入insert模式,在最后加二行   syntax on   set nu! 保存收工。
2008 0
|
机器学习/深度学习 算法 搜索推荐
多任务学习模型之DBMTL介绍与实现
本文介绍的是阿里在2019年发表的多任务学习算法。该模型显示地建模目标间的贝叶斯网络因果关系,整合建模了特征和多个目标之间的复杂因果关系网络,省去了一般MTL模型中较强的独立假设。由于不对目标分布做任何特定假设,使得它能够比较自然地推广到任意形式的目标上。

热门文章

最新文章