python调用spark示例

简介: python调用spark示例
# coding=utf-8importsysprint('sys.executable--',sys.executable)
importsklearnprint("Sklearn verion is {}".format(sklearn.__version__))
# reload(sys)# sys.setdefaultencoding("utf8")importpandasaspdimportnumpyasnpfromsklearn.linear_modelimportLinearRegressionfromdatetimeimportdate,datetime,timedeltafrompysparkimportSparkConffrompysparkimportSparkContextfrompyspark.sqlimportHiveContext,SQLContextfrompyspark.storagelevelimportStorageLevelfromsklearn.externalsimportjoblibfromsklearn.clusterimportKMeansimportpandasaspdif__name__=="__main__":
db_name=sys.argv[1] #0对应文件名data_date=sys.argv[2]
inc_day=data_date.replace('-','')
k=5iteration=1000print('db_name:{0}      data_date:{1}   inc_day:{2}'.format( db_name, data_date,inc_day))
#测试读取hive数据conf=SparkConf().setAppName("family")
sc=SparkContext(conf=conf)
hc=HiveContext(sc)
sqlContext=SQLContext(sc)
#查询a="""SELECT distinct      create_date                                as     `时间`,     depot_code                                 as     `场地编码`,     ascs_sorting_code                          as     `ascs分拣线code`,     ascs_distinct_waybill_count                as     `补码成功运单数`,     ascs_distinct_photo_count                  as     `补码成功图片数`,     ascs_has_face_count                        as     `有面单数`,     ascs_not_face_count                        as     `无面单数`,     ascs_timeout_photo_count                   as     `补码超时数`,     ascs_success_rate                          as     `补码成功率`,     ascs_pick_supply_count                     as     `分拣成功数`,     ascs_saving_hours                          as     `节约工时`,     ascs_total_no                              as     `自动化处理量`,     ascs_return_count                          as     `回流量`,     ascs_assist_nr_rate                        as     `辅助无读识别率`,     ascs_return_rate                           as     `回流挽回率`,     case when (ascs_distinct_waybill_count-ascs_pick_supply_count)>=0 then 'yes' else 'no' end  as `补码成功运单数是否大于等于分拣成功数`    FROM  dm_argus_sheduler_tmp.brfd_shizhong_buma_success_tmp_buma    where inc_day<='{inc_day}'""".format(inc_day=inc_day)
data_day=hc.sql(a)
# spark转pandasdata=data_day.toPandas()
print(data.columns)
print(data.dtypes)
print(data)
print('补码成功运单数小于分拣成功数的记录:',data[data['补码成功运单数是否大于等于分拣成功数']=='no'])
# inputfile = 'data.csv' #销量及其他属性数据# use_clos=['时间','ascs分拣线code','补码成功运单数','补码成功图片数','有面单数','无面单数',# '补码超时数','补码成功率','分拣成功数','自动化处理量','回流量','辅助无读识别率','回流挽回率'# ]# data = pd.read_csv(inputfile,usecols=use_clos) #读取数据# data=data.dropna(how='any')data=data.fillna(0)
# data=data[(data['ascs分拣线code']!=0)&(data['补码成功运单数']!=0)&(data['补码成功图片数']!=0)]data=data[(data['ascs分拣线code']!=0)]
data.set_index(['时间','ascs分拣线code'],drop=True,inplace=True)
data['补码图片数']=data['有面单数']+data['无面单数']+data['补码超时数']
data['补码超时率']=data['补码超时数']/data['补码图片数']
data['呈像系统有效性1']=data['有面单数']/(data['有面单数']+data['无面单数'])
data['呈像系统有效性2']=(data['补码成功运单数']/data['补码成功图片数'])
data['分拣环节有效性']=(data['分拣成功数']/data['补码成功运单数'])
data['辅助无读识别率']=data['辅助无读识别率']
data['回流挽回率']=data['回流挽回率']
# data=data.dropna(how='any')data=data[['自动化处理量','补码图片数','有面单数','无面单数','补码成功运单数','补码成功图片数','分拣成功数','辅助无读识别率','回流挽回率','补码成功率','补码超时率','呈像系统有效性1','呈像系统有效性2','分拣环节有效性']]
data_zs=1.0*(data-data.min())/data.max() #数据标准化data_zs=data_zs.fillna(data_zs.mean())
model=KMeans(n_clusters=k, n_jobs=4, max_iter=iteration) #分为k类,并发数4model.fit(data_zs) #开始聚类#整理输出数据#1.模型相关信息r1=pd.DataFrame(pd.Series(model.labels_).value_counts() ,columns=['各类数量'])#统计各个类别的数目r2=pd.DataFrame(model.cluster_centers_,columns=data.columns) #找出聚类中心cluster_cnt=pd.concat([r2, r1], axis=1) #横向连接(0是纵向),得到聚类中心对应的类别下的数目r2=r2.sort_values(by=['回流挽回率'],ascending=False)
new_label={ v:str(i+1)+'类'fori,vinenumerate(r2.index)}#可解释的类别映射cluster_cnt[u'聚类类别']=cluster_cnt.index.map(new_label)
cluster_cnt=cluster_cnt.sort_values(by=['聚类类别'])
cluster_cnt=cluster_cnt[['聚类类别','各类数量']]
cluster_cnt.columns=['centers_no','cnt']
print('各类数量:',cluster_cnt)
#2.聚类结果#关联可解释性类别标识作为index【入库保存】#详细输出原始数据及其类别r=pd.concat([data, pd.Series(model.labels_, index=data.index,name='聚类类别').map(new_label)], axis=1) 
#关联可解释性类别标识作为index【入库保存】r_=r.copy(deep=True)
r_=r_.reset_index()
r_=r_[r_['时间']==data_date]
r_=r_[['时间','ascs分拣线code','辅助无读识别率','回流挽回率','聚类类别']]
r_.columns=['date','ascs_code','fuzhuwudushibielv','huiliuwanhuilv','class']
print('聚类结果:',r_)
#pandas转sparkcluster_cnt=hc.createDataFrame(cluster_cnt)
r_=hc.createDataFrame(r_)
#建表hc.sql("set hive exec.dynamic.partition.mode =nonstrict")
hc.sql("set hive exec.dynamic.partition = true")
#各类数量sql_str="CREATE TABLE IF NOT EXISTS {db_name}.brfd_ascs_kmeans_out_1 (centers_no string, cnt string) partitioned BY (inc_day string) row format delimited fields terminated BY '|'".format(db_name=db_name)
hc.sql(sql_str)
#聚类结果sql_str="CREATE TABLE IF NOT EXISTS {db_name}.brfd_ascs_kmeans_out_2 (date string, ascs_code string,fuzhuwudushibielv string ,huiliuwanhuilv string, class string) partitioned BY (inc_day string) row format delimited fields terminated BY '|'".format(db_name=db_name)
hc.sql(sql_str)
#分区表可以先创建三种临时表hc.sql("use {db_name}".format(db_name=db_name))
cluster_cnt.registerTempTable("temp_table_1")
r_.registerTempTable("temp_table_2")
#插入方式三种hc.sql("""                   insert overwrite table {db_name}.brfd_ascs_kmeans_out_1 partition(inc_day='{inc_day}')                   select *                   from temp_table_1                   """.format(db_name=db_name,inc_day=inc_day)
               )
hc.sql("""                   insert overwrite table {db_name}.brfd_ascs_kmeans_out_2 partition(inc_day='{inc_day}')                   select *                   from temp_table_2                   """.format(db_name=db_name,inc_day=inc_day)
               )
# 
目录
相关文章
|
4月前
|
数据挖掘 Python
Python示例,展示如何找到最近一次死叉之后尚未形成金叉的位置
【10月更文挑战第7天】金融分析中,“死叉”指短期移动平均线(如MA5)跌破长期移动平均线(如MA10),而“金叉”则相反。本文提供Python代码示例,用于找出最近一次死叉后未形成金叉的位置,涵盖移动平均线计算、交叉点判断及结果输出等步骤,适合金融数据分析。
58 4
|
5月前
|
Python
以下是一些常用的图表类型及其Python代码示例,使用Matplotlib和Seaborn库。
以下是一些常用的图表类型及其Python代码示例,使用Matplotlib和Seaborn库。
|
8天前
|
数据挖掘 数据处理 开发者
Python3 自定义排序详解:方法与示例
Python的排序功能强大且灵活,主要通过`sorted()`函数和列表的`sort()`方法实现。两者均支持`key`参数自定义排序规则。本文详细介绍了基础排序、按字符串长度或元组元素排序、降序排序、多条件排序及使用`lambda`表达式和`functools.cmp_to_key`进行复杂排序。通过示例展示了如何对简单数据类型、字典、类对象及复杂数据结构(如列车信息)进行排序。掌握这些技巧可以显著提升数据处理能力,为编程提供更强大的支持。
24 10
|
5月前
|
存储 Python
Python示例:分解一个不多于指定位的正整数
Python示例:分解一个不多于指定位的正整数
36 0
|
2月前
|
数据可视化 Python
以下是一些常用的图表类型及其Python代码示例,使用Matplotlib和Seaborn库。
通过这些思维导图和分析说明表,您可以更直观地理解和选择适合的数据可视化图表类型,帮助更有效地展示和分析数据。
92 8
|
2月前
|
API Python
【Azure Developer】分享一段Python代码调用Graph API创建用户的示例
分享一段Python代码调用Graph API创建用户的示例
65 11
|
3月前
|
网络安全 Python
Python网络编程小示例:生成CIDR表示的IP地址范围
本文介绍了如何使用Python生成CIDR表示的IP地址范围,通过解析CIDR字符串,将其转换为二进制形式,应用子网掩码,最终生成该CIDR块内所有可用的IP地址列表。示例代码利用了Python的`ipaddress`模块,展示了从指定CIDR表达式中提取所有IP地址的过程。
79 6
|
3月前
|
数据挖掘 Python
Python示例,展示如何找到最近一次死叉之后尚未形成金叉的位置
金融分析中,“死叉”指短期移动平均线(如MA5)跌破长期移动平均线(如MA10),而“金叉”则相反。本文提供Python代码示例,用于找出最近一次死叉后未形成金叉的位置,涵盖移动平均线计算、交叉点判断及结果输出等步骤,适合金融数据分析。
35 1
|
5月前
|
Python
Python编程的循环结构小示例(二)
Python编程的循环结构小示例(二)
70 1
|
4月前
|
Linux Android开发 开发者
【Python】GUI:Kivy库环境安装与示例
这篇文章介绍了 Kivy 库的安装与使用示例。Kivy 是一个开源的 Python 库,支持多平台开发,适用于多点触控应用。文章详细说明了 Kivy 的主要特点、环境安装方法,并提供了两个示例:一个简单的 Hello World 应用和一个 BMI 计算器界面。
132 0

热门文章

最新文章