SparkSQL ETL

发布时间:2021-11-30 01:32:28



文章目录
需求说明代码分析调优总结





记一次SparkSql ETL 过程


需求说明

1)input:json日志
2)ETL:根据IP解析出 省份,城市
3)stat: 地区分布指标计算,
满足条件的才算,满足条件的赋值为1,不满足的赋值为0 (如下图)
将统*峁慈隡ySQL中。
(就比如说这个广告请求要满足 requestmode=1 和 processnode =3 这两个条件)


代码分析

val spark = SparkSession.builder().master("local[2]").appName("LogApp").getOrCreate()
import spark.implicits._
val inputDF = spark.read.json("inputdata/data-test.json")
inputDF.printSchema()

// ETL: 一定保留原有的数据 最完整 而且要落地 (理由:要是数据出错好重新计算)
val newDF = inputDF.withColumn("province", MyUDF.getProvince(inputDF.col("ip")))
.withColumn("city", MyUDF.getCity($"ip"))//自定义udf 函数
.write.format("parquet")
.mode(SaveMode.Overwrite)
.save("outparquet") // 最好保存parquet格式 (spark默认就是parquet + snappy)

// 计算 重新去读取etl之后的数据源
val parquetDF = spark.read.parquet("outparquet/xxx.snappy.parquet")
parquetDF.printSchema()
parquetDF.show(5)
parquetDF.createOrReplaceTempView("log")

//业务SQL
val areaSQL01 = "select province,city, " +
"sum(case when requestmode=1 and processnode >=1 then 1 else 0 end) origin_request," +
"sum(case when requestmode=1 and processnode >=2 then 1 else 0 end) valid_request," +
"sum(case when requestmode=1 and processnode =3 then 1 else 0 end) ad_request," +
"sum(case when adplatformproviderid>=100000 and iseffective=1 and isbilling=1 and isbid=1 and adorderid!=0 then 1 else 0 end) bid_cnt," +
"sum(case when adplatformproviderid>=100000 and iseffective=1 and isbilling=1 and iswin=1 then 1 else 0 end) bid_success_cnt," +
"sum(case when requestmode=2 and iseffective=1 then 1 else 0 end) ad_display_cnt," +
"sum(case when requestmode=3 and processnode=1 then 1 else 0 end) ad_click_cnt," +
"sum(case when requestmode=2 and iseffective=1 and isbilling=1 then 1 else 0 end) medium_display_cnt," +
"sum(case when requestmode=3 and iseffective=1 and isbilling=1 then 1 else 0 end) medium_click_cnt," +
"sum(case when adplatformproviderid>=100000 and iseffective=1 and isbilling=1 and iswin=1 and adorderid>20000 then 1*winprice/1000 else 0 end) ad_consumption," +
"sum(case when adplatformproviderid>=100000 and iseffective=1 and isbilling=1 and iswin=1 and adorderid>20000 then 1*adpayment/1000 else 0 end) ad_cost " +
"from log group by province,city"
spark.sql(areaSQL01).createOrReplaceTempView("area_tmp")

val areaSQL02 = "select province,city, " +
"origin_request," +
"valid_request," +
"ad_request," +
"bid_cnt," +
"bid_success_cnt," +
"bid_success_cnt/bid_cnt bid_success_rate," +
"ad_display_cnt," +
"ad_click_cnt," +
"ad_click_cnt/ad_display_cnt ad_click_rate," +
"ad_consumption," +
"ad_cost from area_tmp " +
"where bid_cnt!=0 and ad_display_cnt!=0"

// 写入MySQL (上一篇博客有介绍)
val config = ConfigFactory.load()
val url = config.getString("db.default.url")
val user = config.getString("db.default.user")
val password = config.getString("db.default.password")

spark.sql(areaSQL02)
.write.format("jdbc")
.option("url", url)
.option("dbtable", "sparksql_test")
.option("user", user)
.option("password", password)
.mode(SaveMode.Overwrite)
.save()

spark.stop()

自定义udf 函数代码


object MyUDF {

import org.apache.spark.sql.functions._

def getProvince = udf((ip:String)=>{
val cityInfo = IPUtil.getCityInfo(ip)
val splits = cityInfo.split("\|")
var city = "未知"
if (splits.length == 5){
city = splits(2)
}
city

})


def getCity = udf((ip:String)=>{
val cityInfo = IPUtil.getCityInfo(ip)
val splits = cityInfo.split("\|")
var city = "未知"
if (splits.length == 5){
city = splits(3)
}
city
})

}


调优

① ETL 落地过程中应该调用coalesce() 防止产生多个小文件


val newDF = inputDF.withColumn("province", MyUDF.getProvince(inputDF.col("ip")))
.withColumn("city", MyUDF.getCity($"ip"))
.coalesce(1)
.write.format("parquet").mode(SaveMode.Overwrite).save("outparquet")

② spark.conf.set(“spark.sql.shuffle.partitions”,“400”) 修改SparkSql shuffle task数量,默认是200


总结

ETL过程:
input:json
清洗 ==> ODS 大宽表 HDFS/Hive/SparkSQL
output: 列式存储 ORC/Parquet (列式存储) (为啥要用这两种? 因为ETL清洗出来的是全字段,我们不可能使用到全部字段,所以采用列式存储,用到几列就获取几列,这样就能减少I/O,性能大大提升)


Stat
==> 一个非常简单的SQL搞定
==> 复杂:多个SQL 或者 一个复杂SQL搞定


列式:ORC/Parquet
特点:把每一列的数据存放在一起
优点:减少IO 需要哪几列就直接获取哪几列
缺点:如果你还是要获取每一行中的所有列,那么性能比行式的差


行式:MySQL
一条记录有多个列 一行数据是存储在一起的
优点:
你每次查询都使用到所有的列
缺点:
大宽表有N多列,但是我们仅仅使用其中几列

相关文档

  • 成都市房屋租赁居间合同范本
  • 实木美容床排行榜前十名
  • 赞美黄河的优美句子大全
  • CSS中浮动的特点
  • 如何提高小学生学习的积极性
  • 升级win10之后是不是不能使用外挂了
  • 六个月宝宝辅食食谱要点
  • 空气培养皿采样后保存_选择满足GMP法规要求的浮游菌采样器(便携式和在线)...
  • 我最心爱的东西学生作文400字3篇
  • 微信发送键跑盗表情里了怎么调到上面
  • 初一英语备课组工作计划
  • 【金猿产品展】全天智能实时大数据可视化分析平台??数据可视化行业探索者...
  • 春季睡前跳绳十分钟助长个 跳绳减肥需要循序渐进
  • 马戛尔尼访华失败的原因介绍
  • 一个成功者背后的故事
  • 我的朋友朱宇伟
  • 海南铁塔获批使用5905-5925MHz频率开展车联网业务试验
  • 梦见手机***意味着什么
  • 经验总结!我们是如何在编程面试中挂掉的
  • 基于JavaWeb(JSP+Servlet+MySQL)编程实现员工信息的添加、修改、删除、列表显示。
  • 初一开学第一周总结
  • 有关于孕妇请假的请假条
  • 唐寅《把酒对月歌》古诗词鉴赏
  • 你的身边
  • C语言数组&字符串&结构体&共用体&枚举知识点总结
  • 北师大版六年级语文教学计划范文
  • 四川中考现代文阅读练习带答案
  • 中餐连锁餐饮品牌排行榜:小肥羊上班,庆丰包子铺第四
  • 舍不得忘舍不得放简谱
  • 五年级作文谈谈读书的体会
  • 猜你喜欢

  • 最新-2019年村主任助理学*工作计划范文 精品
  • 嵌入式第一周学习心得
  • 2021春分手抄报画法 2021春分手抄报
  • 个人租房合同范本最新的[工作范文]
  • 慢性腰肌劳损如何自我保健
  • 【教育学*文章】学校“纪念中国人民解放军建军90周年”主题活动总结
  • 【精品】2019八年级物理上册 4.4《升华和凝华》随堂训练 (新版)粤教沪版
  • 美调查称:职业女性*均收入将超男性
  • 高邑县文果农副产品有限公司企业信用报告-天眼查
  • 青岛市塑料编织袋行业企业名录2018版124家
  • 商业保险管理制度守则
  • 2019教师节演讲稿900字范文
  • 2018-2019年唐山市迁安市迁安镇潘营中心完全小学一年级上册数学模拟月考无答案
  • 《管理运筹学》演示(整数规划)ppt课件
  • 最新 《欧也妮葛朗台》1000字读书笔记范文-精品
  • 【合同范文】精选建筑工程施工合同范文
  • 数据结构链表、二叉树、排序算法
  • 数学百大经典例题——离散型随机变量分布列(新课标) doc
  • 一维信号小波压缩的能量动态自适应阈值算法
  • 桂林市宥辰贸易有限公司企业信用报告-天眼查
  • 刷酸后用洗吗刷酸后可以敷面膜吗
  • Long类型的用时时长转换为年月日时分秒毫秒格式
  • 全球知名CEO的华鸱ㄔ
  • 胃烧心的原因与诊断
  • 高中数学 圆锥曲线与方程 24242 抛物线的简单几何性质 时 抛物线的简单几何性质练习 新人教A版选修21
  • 基于校企合作的校外实训基地管理机制的设立初探
  • 2013安徽省公选(公开选拔)领导干部最新考试试题库(完整版)
  • 朱理版机械原理课后作业全部答案-文档资料
  • 2018秋中图版历史九年级上册第11课《文艺复兴》课件(共35页)
  • Clonidine for attention-deficit hyperactivity disorder II. ECG changes and adverse events analysis
  • 培养兴趣是学生智力发展的前提
  • 当代大学生礼仪修养的探究-精选文档
  • 女人美容养颜汤的制作方法
  • 四年级下册英语课件-M9 U1 Did he live in New York|外研社(三起) (6) (共15张PPT)
  • 三年级数学下册六认识分数比大小精炼2无答案北师大版
  • 生理 重点 华中师范大学体育学院
  • 涡街流量计原理及应用
  • 2018-2019年英语外研版小学三年级下册M4教学案2
  • JAVA的垃圾收集过程(3)
  • 死锁的演示
  • 歌颂祖国的诗歌-祖国啊,我亲爱的祖国
  • 我的最美芳华作文
  • 电脑版