"""Daily ASCS barcode-supplement KMeans clustering job.

Reads aggregated barcode-supplement ("buma") metrics per sorting line from
Hive, derives quality ratios, clusters the lines with KMeans (ranked by
回流挽回率), and writes the cluster-size summary and the per-line cluster
labels for the requested business date into two partitioned Hive tables.

Usage: spark-submit <this script> <db_name> <data_date YYYY-MM-DD>
"""
import sys

print('sys.executable--', sys.executable)

import sklearn

print("Sklearn version is {}".format(sklearn.__version__))

import pandas as pd
import numpy as np                              # NOTE(review): unused below
from sklearn.linear_model import LinearRegression   # NOTE(review): unused below
from datetime import date, datetime, timedelta      # NOTE(review): unused below
from pyspark import SparkConf
from pyspark import SparkContext
from pyspark.sql import HiveContext, SQLContext
from pyspark.storagelevel import StorageLevel       # NOTE(review): unused below
# NOTE(review): sklearn.externals.joblib was removed in scikit-learn >= 0.23
# and joblib is never used here — this import will crash on modern sklearn.
from sklearn.externals import joblib
from sklearn.cluster import KMeans

if __name__ == "__main__":
    # ---- job parameters -------------------------------------------------
    db_name = sys.argv[1]                  # target Hive database
    data_date = sys.argv[2]                # business date, 'YYYY-MM-DD'
    inc_day = data_date.replace('-', '')   # Hive partition value, 'YYYYMMDD'

    k = 5             # number of KMeans clusters
    iteration = 1000  # KMeans max_iter
    print('db_name:{0} data_date:{1} inc_day:{2}'.format(db_name, data_date, inc_day))

    conf = SparkConf().setAppName("family")
    sc = SparkContext(conf=conf)
    hc = HiveContext(sc)
    sqlContext = SQLContext(sc)  # NOTE(review): unused below; kept for parity

    # ---- load source metrics up to (and including) the requested day ----
    a = """SELECT distinct create_date as `时间`, depot_code as `场地编码`, ascs_sorting_code as `ascs分拣线code`, ascs_distinct_waybill_count as `补码成功运单数`, ascs_distinct_photo_count as `补码成功图片数`, ascs_has_face_count as `有面单数`, ascs_not_face_count as `无面单数`, ascs_timeout_photo_count as `补码超时数`, ascs_success_rate as `补码成功率`, ascs_pick_supply_count as `分拣成功数`, ascs_saving_hours as `节约工时`, ascs_total_no as `自动化处理量`, ascs_return_count as `回流量`, ascs_assist_nr_rate as `辅助无读识别率`, ascs_return_rate as `回流挽回率`, case when (ascs_distinct_waybill_count-ascs_pick_supply_count)>=0 then 'yes' else 'no' end as `补码成功运单数是否大于等于分拣成功数` FROM dm_argus_sheduler_tmp.brfd_shizhong_buma_success_tmp_buma where inc_day<='{inc_day}'""".format(inc_day=inc_day)
    data_day = hc.sql(a)
    data = data_day.toPandas()
    print(data.columns)
    print(data.dtypes)
    print(data)
    # Sanity check: supplement-success waybills should never be fewer than
    # sorting successes; dump any violating rows for inspection.
    print('补码成功运单数小于分拣成功数的记录:', data[data['补码成功运单数是否大于等于分拣成功数'] == 'no'])

    # ---- feature engineering --------------------------------------------
    data = data.fillna(0)
    data = data[(data['ascs分拣线code'] != 0)]
    data.set_index(['时间', 'ascs分拣线code'], drop=True, inplace=True)
    data['补码图片数'] = data['有面单数'] + data['无面单数'] + data['补码超时数']
    # NOTE(review): the ratios below can yield inf/NaN when a denominator is
    # zero; NaNs are replaced by column means after scaling, inf is not
    # handled — confirm upstream data guarantees non-zero denominators.
    data['补码超时率'] = data['补码超时数'] / data['补码图片数']
    data['呈像系统有效性1'] = data['有面单数'] / (data['有面单数'] + data['无面单数'])
    data['呈像系统有效性2'] = (data['补码成功运单数'] / data['补码成功图片数'])
    data['分拣环节有效性'] = (data['分拣成功数'] / data['补码成功运单数'])
    # (two former no-op self-assignments of 辅助无读识别率 / 回流挽回率 removed)
    data = data[['自动化处理量', '补码图片数', '有面单数', '无面单数', '补码成功运单数',
                 '补码成功图片数', '分拣成功数', '辅助无读识别率', '回流挽回率', '补码成功率',
                 '补码超时率', '呈像系统有效性1', '呈像系统有效性2', '分拣环节有效性']]

    # ---- normalise and cluster ------------------------------------------
    # NOTE(review): this computes (x - min) / max, not the conventional
    # min-max scaling (x - min) / (max - min) — confirm this is intended.
    data_zs = 1.0 * (data - data.min()) / data.max()
    data_zs = data_zs.fillna(data_zs.mean())

    # NOTE(review): the n_jobs parameter was removed from KMeans in
    # scikit-learn 1.0; kept here for the deployed (older) sklearn.
    model = KMeans(n_clusters=k, n_jobs=4, max_iter=iteration)
    model.fit(data_zs)

    # Per-cluster sizes next to centroid coordinates.
    r1 = pd.DataFrame(pd.Series(model.labels_).value_counts(), columns=['各类数量'])
    r2 = pd.DataFrame(model.cluster_centers_, columns=data.columns)
    cluster_cnt = pd.concat([r2, r1], axis=1)

    # Rank clusters by 回流挽回率 (descending) and relabel them 1类..k类.
    r2 = r2.sort_values(by=['回流挽回率'], ascending=False)
    new_label = {v: str(i + 1) + '类' for i, v in enumerate(r2.index)}
    cluster_cnt[u'聚类类别'] = cluster_cnt.index.map(new_label)
    cluster_cnt = cluster_cnt.sort_values(by=['聚类类别'])
    cluster_cnt = cluster_cnt[['聚类类别', '各类数量']]
    cluster_cnt.columns = ['centers_no', 'cnt']
    print('各类数量:', cluster_cnt)

    # Attach the readable cluster label to every row, then keep only the
    # requested business date and output columns for table 2.
    r = pd.concat([data, pd.Series(model.labels_, index=data.index,
                                   name='聚类类别').map(new_label)], axis=1)
    r_ = r.copy(deep=True)
    r_ = r_.reset_index()
    r_ = r_[r_['时间'] == data_date]
    r_ = r_[['时间', 'ascs分拣线code', '辅助无读识别率', '回流挽回率', '聚类类别']]
    r_.columns = ['date', 'ascs_code', 'fuzhuwudushibielv', 'huiliuwanhuilv', 'class']
    print('聚类结果:', r_)

    # ---- write results back to Hive --------------------------------------
    cluster_cnt = hc.createDataFrame(cluster_cnt)
    r_ = hc.createDataFrame(r_)
    # FIX: the config keys are dotted ('hive.exec....'); the original used a
    # space ('hive exec....'), so dynamic partitioning was never configured.
    hc.sql("set hive.exec.dynamic.partition.mode=nonstrict")
    hc.sql("set hive.exec.dynamic.partition=true")
    sql_str = "CREATE TABLE IF NOT EXISTS {db_name}.brfd_ascs_kmeans_out_1 (centers_no string, cnt string) partitioned BY (inc_day string) row format delimited fields terminated BY '|'".format(db_name=db_name)
    hc.sql(sql_str)
    sql_str = "CREATE TABLE IF NOT EXISTS {db_name}.brfd_ascs_kmeans_out_2 (date string, ascs_code string,fuzhuwudushibielv string ,huiliuwanhuilv string, class string) partitioned BY (inc_day string) row format delimited fields terminated BY '|'".format(db_name=db_name)
    hc.sql(sql_str)
    hc.sql("use {db_name}".format(db_name=db_name))
    cluster_cnt.registerTempTable("temp_table_1")
    r_.registerTempTable("temp_table_2")
    hc.sql(""" insert overwrite table {db_name}.brfd_ascs_kmeans_out_1 partition(inc_day='{inc_day}') select * from temp_table_1 """.format(db_name=db_name, inc_day=inc_day))
    hc.sql(""" insert overwrite table {db_name}.brfd_ascs_kmeans_out_2 partition(inc_day='{inc_day}') select * from temp_table_2 """.format(db_name=db_name, inc_day=inc_day))