前言
最近正好有个需求,就是从不同的数据库以及表里拉出数据,经过一定的处理放到ES里供查询,最好还能放个到parquet里,这样可以支持更复杂的SQL。之前StreamingPro是只能配置一个数据源的,所以做了些改造,方便配置多个数据源,以及多个写出。
如果是数据库,则可以这么写:
将配置文件保存一下,然后就可以启动了:
最近正好有个需求,就是从不同的数据库以及表里拉出数据,经过一定的处理放到ES里供查询,最好还能放个到parquet里,这样可以支持更复杂的SQL。之前StreamingPro是只能配置一个数据源的,所以做了些改造,方便配置多个数据源,以及多个写出。
最新的下载地址: https://pan.baidu.com/s/1eRO5Wga 依然的,比较大,因为现在他还能支持Thrift JDBC /Rest SQL: 使用StreamingPro 快速构建Spark SQL on CarbonData。
输入配置
{
"name": "batch.sources",
"params": [
{
"path": "file:///tmp/sample.csv",
"format": "com.databricks.spark.csv",
"outputTable": "test",
"header": "true"
},
{
"path": "file:///tmp/sample.csv",
"format": "com.databricks.spark.csv",
"outputTable": "test2",
"header": "true"
}
]
},
以前用的是 batch.source, 如果你有多个输入源,则需要使用batch.sources 组件。每个源需要配置一个outputTable,也就是说这个源取个名字,方便后面使用。
如果是数据库,则可以这么写:
{
"name": "batch.sources",
"params": [
{
url:"jdbc:mysql://localhost/test?user=fred&password=secret",
"dbtable":"table1",
"driver":"com.mysql...",
"path": "-",
"format": "jdbc",
"outputTable": "test",
},
{
"path": "-",
"format": "com.databricks.spark.csv",
"outputTable": "test2",
"header": "true"
}
]
},
输出
{
"name": "batch.outputs",
"params": [
{
"format": "json",
"path": "file:///tmp/kk2",
"inputTableName": "finalOutputTable"
},
{
"format": "parquet",
"path": "file:///tmp/kk3",
"inputTableName": "finalOutputTable"
}
]
}
我这里同时输出为json以及parquet格式。
一个简单但是涉及点比较多的例子
{
"convert-multi-csv-to-json": {
"desc": "测试",
"strategy": "spark",
"algorithm": [],
"ref": [],
"compositor": [
{
"name": "batch.sources",
"params": [
{
"path": "file:///tmp/sample.csv",
"format": "com.databricks.spark.csv",
"outputTable": "test",
"header": "true"
},
{
"path": "file:///tmp/sample.csv",
"format": "com.databricks.spark.csv",
"outputTable": "test2",
"header": "true"
}
]
},
{
"name": "batch.sql",
"params": [
{
"sql": "select city as tp from test limit 100",
"outputTableName": "sqlTable"
}
]
},
{
"name": "batch.script",
"params": [
{
"inputTableName": "sqlTable",
"outputTableName": "scriptTable",
"useDocMap": true
},
{
"-": "val count = doc(\"tp\").toString.length;Map(\"count\"->count)"
}
]
},
{
"name": "batch.sql",
"params": [
{
"sql": "select scriptTable.tp,scriptTable.count,test2.city,test2.name from scriptTable,test2 limit 100",
"outputTableName": "finalOutputTable"
}
]
},
{
"name": "batch.outputs",
"params": [
{
"format": "json",
"path": "file:///tmp/kk2",
"inputTableName": "finalOutputTable"
},
{
"format": "parquet",
"path": "file:///tmp/kk3",
"inputTableName": "finalOutputTable"
}
]
}
],
"configParams": {
}
}
}
在 batch.sql 里你可以引用任何一个源的表,或者之前已经在batch.sql里申明的outputTable, 同理batch.script。 而在batch.outputs里,你则可以将任何一张表写入到MySQL,ES,HDFS等文件存储系统中。
将配置文件保存一下,然后就可以启动了:
SHome=/Users/allwefantasy/streamingpro
./bin/spark-submit --class streaming.core.StreamingApp \
--master local[2] \
--name test \
$SHome/streamingpro-0.4.8-SNAPSHOT-online-1.6.1.jar \
-streaming.name test \
-streaming.platform spark \
-streaming.job.file.path file://$SHome/batch.json