PyODPS 介绍及使用方法
1. PyODPS 通过该 SDK,可以更高效地开发 MaxCompute 任务、进行数据分析和管理 MaxCompute 资源。
2. 安装 PyODPS
PyODPS 兼容 Python 2(2.6 以上版本)和 Python 3 版本。可以通过以下命令安装 PyODPS:
pip install pyodps
安装完成后,可以通过以下代码初始化 MaxCompute 的入口:
import os
from odps import ODPS
# 确保 ALIBABA_CLOUD_ACCESS_KEY_ID 环境变量设置为用户 Access Key ID,
# ALIBABA_CLOUD_ACCESS_KEY_SECRET 环境变量设置为用户 Access Key Secret,
o = ODPS(
os.getenv('ALIBABA_CLOUD_ACCESS_KEY_ID'),
os.getenv('ALIBABA_CLOUD_ACCESS_KEY_SECRET'),
project='your-default-project',
endpoint='your-end-point',
)
```。
#### 3. 使用 PyODPS 进行数据处理
PyODPS 提供了 DataFrame 框架,支持更方便地方式来查询和操作 MaxCompute 数据。以下是一些基本操作的示例:
##### 3.1 创建表
```python
table = o.create_table('my_new_table', 'num bigint, num2 double', if_not_exists=True)
或者使用更简单的方式:
table = o.create_table('my_new_table', ('num bigint, num2 double', 'pt string'), if_not_exists=True)
```。
##### 3.2 写入数据
可以通过 `open_writer` 方法写入数据:
```python
with t.open_writer(partition='pt=test') as writer:
records = [[111, 'aaa', True], [222, 'bbb', False], [333, 'ccc', True], [444, '中文', False]]
writer.write(records)
也可以使用 write_table
方法:
records = [[111, 'aaa', True], [222, 'bbb', False], [333, 'ccc', True], [444, '中文', False]]
o.write_table('test_table', records, partition='pt=test', create_partition=True)
```。
##### 3.3 查询表数据
可以通过调用 `head` 方法获取表数据:
```python
t = o.get_table('dual')
for record in t.head(3):
print(record[0])
print(record['c_double_a'])
print(record[0: 3])
print(record[0, 2, 3])
print(record['c_int_a', 'c_double_a'])
或者通过 open_reader
方法读取数据:
with t.open_reader(partition='pt=test') as reader:
count = reader.count
for record in reader[5:10]:
# 处理记录
```。
##### 3.4 数据上传下载
MaxCompute Tunnel 是 MaxCompute 的数据通道,用户可以通过 Tunnel 向 MaxCompute 中上传或者下载数据:
```python
from odps.tunnel import TableTunnel
table = o.get_table('my_table')
tunnel = TableTunnel(odps)
upload_session = tunnel.create_upload_session(table.name, partition_spec='pt=test')
with upload_session.open_record_writer(0) as writer:
record = table.new_record()
record[0] = 'test1'
record[1] = 'id1'
writer.write(record)
record = table.new_record(['test2', 'id2'])
writer.write(record)
upload_session.commit([0])
```。
#### 4. 在 DataWorks 中使用 PyODPS
DataWorks 提供了 PyODPS 节点,可以在该节点中直接使用 Python 代码编写 MaxCompute 作业,并进行作业的周期性调度。以下是一些关键点:
- **ODPS 入口**:DataWorks 的 PyODPS 节点中会包含一个全局变量 `odps` 或 `o`,即 ODPS 入口,无需手动定义。
- **执行 SQL**:可以在 PyODPS 节点中执行 SQL,例如:
```python
print(odps.exist_table('PyODPS_iris'))
- 设置运行参数:可以通过设置
hints
参数来设置运行时的参数,例如:o.execute_sql('select * from PyODPS_iris', hints={ 'odps.sql.mapper.split.size': 16})
- 读取运行结果:运行 SQL 的实例能够直接执行
open_reader
的操作,例如:
```python
with o.execute_sql('select * from dual').open_reader() as reader:
```。for record in reader: # 处理每一个 record