Spark对接Lindorm Phoenix5.x轻客户端

本文涉及的产品
云数据库 Tair(兼容Redis),内存型 2GB
Redis 开源版,标准版 2GB
推荐场景:
搭建游戏排行榜
云数据库 MongoDB,独享型 2核8GB
推荐场景:
构建全方位客户视图
简介: Lindorm兼容Phoenix提供的是Phoenix 5.x轻客户端,在Spark官网上对接Phoenix的例子大多是Phoenix 4.x重客户端,因此本文给出Spark对接Phoenix 5.x轻客户端的例子,方便大家参考。

1. 背景

Lindorm兼容Phoenix提供的是Phoenix 5.x轻客户端,在Spark官网上对接Phoenix的例子大多是Phoenix 4.x重客户端,因此本文给出Spark对接Phoenix 5.x轻客户端的例子,方便大家参考。

2. Spark对接Phoenix 5.x轻客户端

2.1 从Spark官网下载Spark安装包

Spark官网下载Spark安装包,版本自行选择,本文以Spark-2.4.3版本为例。下载后解压,假定目录为spark-2.4.3

2.2 从阿里云仓库下载Phoenix5.x轻客户端

阿里云仓库下载Phoenix5.x轻客户端ali-phoenix-shaded-thin-client-5.2.5-HBase-2.x.jar, 放置于spark-2.4.3下的jars目录下。

2.3 生成log4j.properties

cd spark-2.4.3/conf
cp log4j.properties.template log4j.properties

2.3 启动spark-shell

./bin/spark-shell 

2.4 粘贴运行代码

2.4.1 Phoenix Statement方式访问

  1. 在spark-shell上输入:paste可以输入多行文本
:paste
  1. 修改下面代码中的url, user, password为自己的实例集群信息,然后全部粘贴于spark-shell中
import java.sql.{DriverManager, SQLException}
import java.util.Properties
val driver = "org.apache.phoenix.queryserver.client.Driver"
    val url= "jdbc:phoenix:thin:url=http://ld-bpxxxxxxxxxxxxxxxxxx-proxy-phoenix-pub.lindorm.rds.aliyuncs.com:8765;serialization=PROTOBUF"
    val info = new Properties()
    info.put("user", "xxxx") //表示用户名是root
    info.put("password", "xxxx") //表示密码是hadoop
    try {
      Class.forName(driver)
    } catch {
      case e: ClassNotFoundException => e.printStackTrace
    }
    val conn = DriverManager.getConnection(url, info)
    val stmt = conn.createStatement
    try {
      stmt.execute("drop table if exists test")
      stmt.execute("create table test(c1 integer primary key, c2 integer)")
      stmt.execute("upsert into test(c1,c2) values(1,1)")
      stmt.execute("upsert into test(c1,c2) values(2,2)")
      val rs = stmt.executeQuery("select * from test limit 1000")
      while (rs.next()) {
        println(rs.getString(1) + " | " +
          rs.getString(2) )
      }
      stmt.execute("drop table if exists test")
    } catch {
      case e: SQLException => e.printStackTrace()
    } finally {
      if (null != stmt) {
        stmt.close()
      }
      if (null != conn) {
        conn.close()
      }
    }
  1. 输入Ctrl+D 结束文本输入,即可看到运行结果, 会显示类似如下信息:
// Exiting paste mode, now interpreting.

1 | 1
2 | 2

2.4.2 DataFrame方式访问

DataFrame方式只能进行读写,建表操作和删表操作需要使用Phoenix Statement方式。

2.4.2.1 DataFrame方式读

输入:paste粘贴以下文本,然后输入Ctrl+D后开始运行。记得修改url,user,password信息。

import org.apache.spark.sql.SQLContext
val sqlContext = new SQLContext(sc)

val jdbcDF = sqlContext.read.format("jdbc").options(Map("url" -> "jdbc:phoenix:thin:url=http://ld-bpxxxxxxxxxx-proxy-phoenix-pub.lindorm.rds.aliyuncs.com:8765;serialization=PROTOBUF", "driver" -> "org.apache.phoenix.queryserver.client.Driver", "dbtable" -> "TEST","fetchsize" -> "10000", "user" -> "xxxx", "password" -> "xxxx")).load()
jdbcDF.show()

2.4.2.1 DataFrame方式写

输入:paste粘贴以下文本,然后输入Ctrl+D后开始运行。记得修改url,user,password信息。

import java.util.Properties
import org.apache.spark.sql.{SQLContext, Row}
import org.apache.spark.sql.types.{IntegerType,StructField, StructType}
val sqlContext = new SQLContext(sc)
val testRDD = sc.parallelize(Array("3 3","4 4")).map(_.split(" "))
//创建schema
val schema = StructType(List(StructField("c1", IntegerType, true),StructField("c2", IntegerType, true)))
//创建Row对象,每个Row对象都是rowRDD中的一行
val rowRDD = testRDD.map(p => Row(p(0).toInt,p(1).toInt))
//建立起Row对象和模式之间的对应关系,也就是把数据和模式对应起来
val testDataFrame = sqlContext.createDataFrame(rowRDD, schema)
//下面创建一个prop变量用来保存JDBC连接参数
val prop = new Properties()
prop.put("user", "xxxx") //表示用户名是root
prop.put("password", "xxxx") //表示密码是hadoop
prop.put("driver","org.apache.phoenix.queryserver.client.Driver") 
//下面就可以连接数据库,采用append模式,表示追加记录到数据库spark的student表中
testDataFrame.write.mode("append").jdbc("jdbc:phoenix:thin:url=http://ld-xxxxxxxxxxxxx-proxy-phoenix-pub.lindorm.rds.aliyuncs.com:8765;serialization=PROTOBUF", "test", prop)

3. Maven工程示例

下面以一个maven工程为例介绍spark对接phoenix轻客户端的一些基本操作

3.1 建立maven工程

建立名叫demo的maven工程,pom文件内容如下:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.example</groupId>
    <artifactId>demo</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <spark.version>2.4.3</spark.version>
        <scala.version>2.11</scala.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_${scala.version}</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>com.google.protobuf</groupId>
            <artifactId>protobuf-java</artifactId>
            <version>3.5.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_${scala.version}</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>com.aliyun.phoenix</groupId>
            <artifactId>ali-phoenix-queryserver-client</artifactId>
            <version>5.2.1-HBase-2.x</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <version>3.2.2</version>
                <executions>
                    <execution>
                        <id>scala-compile-first</id>
                        <phase>process-resources</phase>
                        <goals>
                            <goal>compile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>

3.2 scala文件示例

PhoenixTest1.scala, 记得修改url,user,password信息。

import org.apache.spark.SparkConf
import org.apache.spark.sql.types.{IntegerType, StructField, StructType}
import org.apache.spark.sql.{Row, SQLContext, SparkSession}

import java.sql.{DriverManager, SQLException}
import java.util.Properties

object PhoenixTest1 {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
    conf.setMaster("local")
    conf.setAppName("spark on phoenix")
    val sparkSession = SparkSession
      .builder()
      .config(conf)
      .getOrCreate()
    val sc = sparkSession.sparkContext

    print("======= start ==========")
    val driver = "org.apache.phoenix.queryserver.client.Driver"
    val url= "jdbc:phoenix:thin:url=http://ld-bpxxxxxxxxxxx-proxy-phoenix-pub.lindorm.rds.aliyuncs.com:8765;serialization=PROTOBUF"
    val info = new Properties()
    info.put("user", "xxxx") 
    info.put("password", "xxxx") 
    try {
      Class.forName(driver)
    } catch {
      case e: ClassNotFoundException => e.printStackTrace
    }

   //statement操作方式, 可以做所有phoenix ddl和dml操作
    val conn = DriverManager.getConnection(url, info)
    val stmt = conn.createStatement
    try {
      stmt.execute("drop table if exists test")
      stmt.execute("create table test(c1 integer primary key, c2 integer)")
      stmt.execute("upsert into test(c1,c2) values(1,1)")
      stmt.execute("upsert into test(c1,c2) values(2,2)")
      val rs = stmt.executeQuery("select * from test limit 1000")
      while (rs.next()) {
        println(rs.getString(1) + " | " +
          rs.getString(2) )
      }
    } catch {
      case e: SQLException => e.printStackTrace()
    } finally {
      if (null != stmt) {
        stmt.close()
      }
      if (null != conn) {
        conn.close()
      }
    }

    //DataFrame写入
    //生成记录
    val sqlContext = new SQLContext(sc)
    val testRDD = sc.parallelize(Array("3 3","4 4")).map(_.split(" "))
    val schema = StructType(List(StructField("c1", IntegerType, true),StructField("c2", IntegerType, true)))
    val rowRDD = testRDD.map(p => Row(p(0).toInt,p(1).toInt))
    val testDataFrame = sqlContext.createDataFrame(rowRDD, schema)
    testDataFrame.show()
    
    // 写入记录
    testDataFrame
      .write
      .mode("append")
      .jdbc("jdbc:phoenix:thin:url=http://ld-bpxxxxxxxxxx-proxy-phoenix-pub.lindorm.rds.aliyuncs.com:8765;serialization=PROTOBUF", "test", info)

    //DataFrame读取, option的两种写法
    val df1 = sqlContext
      .read.
      format("jdbc")
      .options(Map(
        "url" -> url,
        "driver" -> driver,
        "dbtable" -> "TEST",
        "fetchsize" -> "10000",
        "user" -> "root",
        "password" -> "root"))
      .load()
    df1.show()

    val jdbcDF2 = sqlContext.read.format("jdbc")
      .option("url", url)
      .option("driver", driver)
      .option("dbtable", "test")
      .option("fetchsize", "10000")
      .option("user", "xxxx")
      .option("password", "xxxx")
      .load()
    jdbcDF2.show()

    // 将SQL表写入parquet文件
    df1.select("*").write.format("parquet").save("file:///Volumes/wukong/data/work/spark/data/test.parquet")

    // 从parquet文件中加载SQL表
    val df2 = sqlContext.read.load("file:///Volumes/wukong/data/work/spark/data/test.parquet")
    df2.show()
  }
}

3.3 java文件示例

StatementTest.java, 记得修改url,user,password信息。

import org.apache.phoenix.queryserver.client.Driver;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;
import java.util.Properties;

public class StatementTest {
  public static void main(String[] args) {
    Connection pconn = null;
    Statement stmt = null;
    try {
      Class.forName(Driver.class.getName());
      String url = "jdbc:phoenix:thin:url=http://ld-bpxxxxxxxxxx-proxy-phoenix-pub.lindorm.rds.aliyuncs.com:8765;serialization=PROTOBUF";
      Properties props = new Properties();
      props.put("user", "xxxx");
      props.put("password", "xxxx");
      pconn = DriverManager.getConnection(url, props);
      pconn.setAutoCommit(true);
      stmt = pconn.createStatement();
      stmt.execute("drop table if exists test");
      stmt.execute("create table test(c1 integer primary key, c2 integer)");
      stmt.execute("upsert into test(c1,c2) values(1,1)");
      stmt.execute("upsert into test(c1,c2) values(2,2)");
      ResultSet rs = stmt.executeQuery("select * from test limit 1000");
      while (rs.next()) {
        System.out.println(rs.getString(1) + " | " +
            rs.getString(2));
      }
      stmt.execute("drop table if exists test");
    } catch (Throwable e) {
      e.printStackTrace();
    } finally {
      try {
        if (pconn != null) {
          pconn.close();
        }
      } catch (Throwable e) {
        e.printStackTrace();
      }
    }
  }
}

3.4 打包

mvn clean package -DskipTests

target下生成demo-1.0-SNAPSHOT.jar

3.5 提交到本地运行

./bin/spark-submit  --master local --verbose --class PhoenixTest1 /Volumes/wukong/data/work/spark/demo/target/demo-1.0-SNAPSHOT.jar 
相关实践学习
如何快速连接云数据库RDS MySQL
本场景介绍如何通过阿里云数据管理服务DMS快速连接云数据库RDS MySQL,然后进行数据表的CRUD操作。
全面了解阿里云能为你做什么
阿里云在全球各地部署高效节能的绿色数据中心,利用清洁计算为万物互联的新世界提供源源不断的能源动力,目前开服的区域包括中国(华北、华东、华南、香港)、新加坡、美国(美东、美西)、欧洲、中东、澳大利亚、日本。目前阿里云的产品涵盖弹性计算、数据库、存储与CDN、分析与搜索、云通信、网络、管理与监控、应用服务、互联网中间件、移动服务、视频服务等。通过本课程,来了解阿里云能够为你的业务带来哪些帮助 &nbsp; &nbsp; 相关的阿里云产品:云服务器ECS 云服务器 ECS(Elastic Compute Service)是一种弹性可伸缩的计算服务,助您降低 IT 成本,提升运维效率,使您更专注于核心业务创新。产品详情: https://www.aliyun.com/product/ecs
目录
相关文章
|
7月前
|
分布式计算 大数据 数据处理
MaxCompute操作报错合集之spark客户端执行时,报错,该怎么办
MaxCompute是阿里云提供的大规模离线数据处理服务,用于大数据分析、挖掘和报表生成等场景。在使用MaxCompute进行数据处理时,可能会遇到各种操作报错。以下是一些常见的MaxCompute操作报错及其可能的原因与解决措施的合集。
|
存储 SQL 分布式计算
DataWorks_数据开发_EMR Spark节点_计算Pi和对接MaxCompute案例
DataWorks_数据开发_EMR Spark节点 1)计算Pi; 2)对接MaxCompute。
628 0
DataWorks_数据开发_EMR Spark节点_计算Pi和对接MaxCompute案例
|
缓存
spark2.1.0之源码分析——RPC客户端TransportClient详解
版权声明:本文为博主原创文章,未经博主允许不得转载。 https://blog.csdn.net/beliefer/article/details/82143001 提示:阅读本文前最好先阅读: 《Spark2.
1691 0
|
缓存 分布式计算 前端开发
spark2.1.0之源码分析——RPC客户端工厂TransportClientFactory
版权声明:本文为博主原创文章,未经博主允许不得转载。 https://blog.csdn.net/beliefer/article/details/80981101 提示:阅读本文前最好先阅读《Spark2.1.0之内置RPC框架》和《spark2.1.0之源码分析——RPC配置TransportConf》。
1553 0
|
19天前
|
SQL 存储 运维
从建模到运维:联犀如何完美融入时序数据库 TDengine 实现物联网数据流畅管理
本篇文章是“2024,我想和 TDengine 谈谈”征文活动的三等奖作品。文章从一个具体的业务场景出发,分析了企业在面对海量时序数据时的挑战,并提出了利用 TDengine 高效处理和存储数据的方法,帮助企业解决在数据采集、存储、分析等方面的痛点。通过这篇文章,作者不仅展示了自己对数据处理技术的理解,还进一步阐释了时序数据库在行业中的潜力与应用价值,为读者提供了很多实际的操作思路和技术选型的参考。
31 1
|
7月前
|
存储 SQL 多模数据库
多模数据库Lindorm再升级:对接Dataphin,打通数据治理“最后一公里”
Lindorm通过与Dataphin的深度整合,进一步解决了数据集成和数据治理的问题,为企业提供更加高效和更具性价比的方案。
多模数据库Lindorm再升级:对接Dataphin,打通数据治理“最后一公里”
|
6月前
|
安全 数据管理
DataphinV4.1大升级:支持Lindorm开启高性价比数据治理,迎来“公共云半托管”云上自助新模式
DataphinV4.1大升级:支持Lindorm开启高性价比数据治理,迎来“公共云半托管”云上自助新模式
|
7月前
|
数据采集 安全 API
DataphinV4.1大升级: 支持Lindorm开启高性价比数据治理,迎来“公共云半托管”云上自助新模式
Dataphin 是阿里巴巴旗下的一个智能数据建设与治理平台,旨在帮助企业构建高效、可靠、安全的数据资产。在V4.1版本升级中,Dataphin 引入了Lindorm等多项新功能,并开启公共云半托管模式,优化代码搜索,为用户提供更加高效、灵活、安全的数据管理和运营环境,提升用户体验,促进企业数据资产的建设和价值挖掘。
1619 3
DataphinV4.1大升级: 支持Lindorm开启高性价比数据治理,迎来“公共云半托管”云上自助新模式
|
6月前
|
SQL 分布式计算 BI
实时计算 Flink版产品使用问题之基于宽表数据展示实时报表,该如何实现
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。