第1关:Phoenix初识
package com.educoder.bigData.sparksql4; import java.sql.Connection; import java.sql.DriverManager; import java.sql.SQLException; public class Case1 { static { try { Class.forName("org.apache.phoenix.jdbc.PhoenixDriver"); } catch (ClassNotFoundException e) { // TODO Auto-generated catch block e.printStackTrace(); } } public static void createEducoderTable1() { Connection connection = null; try { connection = DriverManager.getConnection("jdbc:phoenix:127.0.0.1:2181"); connection.createStatement().execute( "CREATE TABLE EDUCODER_TABLE1 (ID BIGINT not null primary key, info.BEGINTIME VARCHAR, ENDTIME VARCHAR, SALARY INTEGER, CITY VARCHAR)DATA_BLOCK_ENCODING='DIFF',VERSIONS=3,BLOCKSIZE='32000',MAX_FILESIZE=10000000"); connection.commit(); } catch (Exception e) { // TODO Auto-generated catch block e.printStackTrace(); } finally { if(connection != null) { try { connection.close(); } catch (SQLException e) { // TODO Auto-generated catch block e.printStackTrace(); } } } } public static void createEducoderTable2() { Connection connection = null; try { connection = DriverManager.getConnection("jdbc:phoenix:127.0.0.1:2181"); connection.createStatement().execute( "CREATE TABLE \"educoder_table2\" (\"id\" BIGINT not null primary key, \"info\".\"begintime\" VARCHAR, \"endtime\" VARCHAR, \"salary\" INTEGER, \"city\" VARCHAR)COMPRESSION='GZ',VERSIONS=5,BLOCKSIZE='65536',MAX_FILESIZE=20000000"); connection.commit(); } catch (Exception e) { // TODO Auto-generated catch block e.printStackTrace(); } finally { if(connection != null) { try { connection.close(); } catch (SQLException e) { // TODO Auto-generated catch block e.printStackTrace(); } } } } }
第2关 Phoenix 查询和更新
package com.educoder.bigData.sparksql4; import java.sql.Connection; import java.sql.DriverManager; import java.sql.ResultSet; import java.sql.ResultSetMetaData; import java.sql.SQLException; public class Test2 extends InitHdb2{ public static void main(String[] args) throws SQLException { Connection connection = null; try { connection = DriverManager.getConnection("jdbc:phoenix:127.0.0.1:2181"); connection.createStatement().executeUpdate("UPSERT INTO EDUCODER_TABLE1 VALUES('20190615','20190915', 1, 20, '上海') ON DUPLICATE KEY UPDATE SALARY = SALARY + 20"); connection.createStatement().executeUpdate("UPSERT INTO EDUCODER_TABLE1 VALUES('20190618','20190918', 2, 10, '北京') ON DUPLICATE KEY UPDATE SALARY = SALARY + 20"); connection.commit(); queryTable(connection,"EDUCODER_TABLE1"); connection.createStatement().executeUpdate("UPSERT INTO \"educoder_table2\" SELECT * FROM EDUCODER_TABLE1 "); connection.commit(); queryTable(connection,"educoder_table2"); } catch (Exception e) { // TODO Auto-generated catch block e.printStackTrace(); } finally { if(connection != null) { try { connection.close(); } catch (SQLException e) { // TODO Auto-generated catch block e.printStackTrace(); } } } } private static void queryTable(Connection connection, String tableName) throws SQLException { ResultSet executeQuery = connection.createStatement().executeQuery("select * from \""+tableName+"\""); ResultSetMetaData metaData = executeQuery.getMetaData(); int n = 0; while (executeQuery.next()) { System.out.println(tableName + "第" + ++n + "条数据:"); for (int j = 0; j < metaData.getColumnCount(); j++) { String col_name = metaData.getColumnName(j + 1); Object value = executeQuery.getObject(col_name); System.out.print(col_name + ":" + value); if(j != metaData.getColumnCount() -1) { System.out.print(","); } else { System.out.println(); } } } } }
第3关 Phoenix 二级索引
package com.educoder.bigData.sparksql4; import java.sql.Connection; import java.sql.DriverManager; import java.sql.ResultSet; import java.sql.ResultSetMetaData; import java.sql.SQLException; public class Test3 extends InitHdb3{ public static void main(String[] args) throws SQLException { Connection connection = null; /********* Begin *********/ String sql = "select * from my_index limit 10"; connection = DriverManager.getConnection("jdbc:phoenix:127.0.0.1:2181"); connection.createStatement().executeUpdate("CREATE INDEX my_index ON EDUCODER_TABLE1 (salary) "); /********* End *********/ queryTable(sql, connection); } public static void queryTable(String sql,Connection connection) throws SQLException { ResultSet executeQuery = connection.createStatement().executeQuery(sql); ResultSetMetaData metaData = executeQuery.getMetaData(); System.out.println("索引表数据:"); while (executeQuery.next()) { for (int j = 0; j < metaData.getColumnCount(); j++) { String col_name = metaData.getColumnName(j + 1); Object value = executeQuery.getObject(col_name); System.out.print(col_name + ":" + value); if(j != metaData.getColumnCount() -1) { System.out.print(","); } else { System.out.println(); } } } } }
第4关 Phoenix Spark操作
package com.educoder.bigData.sparksql4; import java.util.Arrays; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Row; import org.apache.spark.sql.SaveMode; import org.apache.spark.sql.SparkSession; public class Case4 { public static void case4(TableVo1 vo) { SparkSession spark = SparkSession.builder().appName("test1").master("local").getOrCreate(); JavaSparkContext context = JavaSparkContext.fromSparkContext(spark.sparkContext()); JavaRDD<TableVo1> parallelize = context.parallelize(Arrays.asList(vo)); Dataset<Row> df = spark.createDataFrame(parallelize, TableVo1.class); df.write().mode(SaveMode.Overwrite).format("org.apache.phoenix.spark").option("zkUrl", "127.0.0.1:2181").option("table", "OUTPUT_TABLE").save(); spark.read().option("table", "OUTPUT_TABLE").option("zkUrl", "127.0.0.1:2181") .format("org.apache.phoenix.spark").load().show(); } }