大数据领域数据架构的关键技术解析
关键词:大数据架构、数据湖、数据仓库、ETL/ELT、流处理、批处理、数据治理
摘要:在数字时代,数据已成为企业的核心资产,而大数据架构则是”管理数据资产的超级工厂”。本文将以”小学生能听懂的故事”为风格,从数据架构的”前世今生”讲起,逐步解析数据湖、数据仓库、ETL/ELT、流处理、批处理等核心组件的原理,通过”超市数据管理”的类比让复杂概念变得直观。我们会深入拆解数据从产生到应用的全流程,用实际代码案例展示批处理与流处理的实现,并通过电商数据架构实战项目,带你亲手搭建一个”数据工厂”。最后,我们还会探讨湖仓一体、实时数仓等前沿趋势,帮你全面掌握大数据架构的”设计密码”。
背景介绍
目的和范围
想象你经营着一家超级大超市,每天有10万个顾客购物,产生100万条交易记录、50万条商品库存变动、20万条会员行为数据……这些数据如果散乱堆放,就像把食材、工具、账本全扔在地上,根本无法管理。大数据架构就是为这样的”数据超市”设计的”超级仓库+智能流水线”,让数据从”杂乱的原材料”变成”可直接使用的商品”。
本文的目的是:
揭开大数据架构的”神秘面纱”,用生活例子解释核心技术讲清数据湖、数据仓库等组件的”分工”和”协作方式”手把手教你用代码实现数据处理流程(批处理+流处理)通过实战项目理解”如何设计一个能用的大数据架构”
范围覆盖:数据架构的核心组件、数据处理全流程、典型技术工具、实战案例及未来趋势,不涉及过于底层的硬件优化(如服务器集群部署细节)。
预期读者
刚接触大数据的程序员/学生(想入门数据架构)数据分析师/产品经理(想理解数据从哪来到哪去)传统IT从业者(想转型大数据领域)对”数据如何变成价值”好奇的任何人
文档结构概述
本文就像”建造数据工厂”的说明书,分7个部分:
打地基:背景介绍与术语表(先认识工具和材料)画图纸:核心概念与架构设计(设计工厂的布局)建流水线:数据处理流程详解(批处理+流处理)开工实战:电商数据架构项目(亲手建一个小工厂)看应用:不同行业的架构案例(参观别人的工厂)选工具:常用技术栈与资源推荐(采购合适的设备)望未来:趋势与挑战(工厂如何升级)
术语表
核心术语定义
| 术语 | 小学生版解释 | 专业版解释 |
|---|---|---|
| 数据湖 | 存放”生食材”的超级冰箱,什么都能扔进去(生肉、生菜、水果),不管格式 | 存储原始、未经处理的海量数据(结构化、半结构化、非结构化)的集中式存储系统,支持任意格式数据的存储和直接访问 |
| 数据仓库 | 存放”熟食材”的后厨,食材已经洗好、切好、分类放(肉丝放一盒、青菜放一格),方便厨师快速做菜 | 面向分析场景,存储经过清洗、整合、结构化处理的数据,按业务主题(如销售、用户、库存)组织,支持高效查询和报表分析 |
| ETL | 洗菜切菜流水线:先把菜从冰箱(数据湖)拿出来(Extract),洗干净、切小块(Transform),再放进后厨的盘子里(Load) | 数据抽取(Extract)、转换(Transform)、加载(Load)的过程,将分散的原始数据经过处理后加载到目标存储(如数据仓库) |
| ELT | 先把菜直接搬进后厨(Load),需要做菜时再洗切(Transform) | 数据加载(Load)、抽取(Extract)、转换(Transform)的过程,先将原始数据加载到目标存储,再在目标端进行转换处理(通常用于数据仓库性能较强的场景) |
| 批处理 | 每天凌晨集中处理前一天的所有食材(比如晚上收集的1000斤菜,早上一起洗切) | 对大量历史数据(通常是GB/TB级)进行周期性(如每天/每周)集中处理的方式,处理延迟较高(分钟到小时级) |
| 流处理 | 客人点单后马上做菜(点一份做一份,实时上桌) | 对实时产生的数据流(如每秒 thousands of records)进行即时处理的方式,处理延迟极低(毫秒到秒级) |
| 数据治理 | 超市的”食品安全监管员”,确保食材没过期、来源合法、操作规范 | 对数据全生命周期(采集、存储、处理、使用)进行管理的体系,包括数据质量、数据安全、元数据管理、权限控制等 |
相关概念解释
数据源:数据的”老家”,比如超市的收银机(交易数据)、会员卡读卡器(用户数据)、仓库扫码枪(库存数据)。元数据:数据的”身份证”,记录数据叫什么名字、从哪来、什么时候更新、谁能看(比如”20240501_sales.csv”是5月1日的交易数据,来自收银系统,更新时间2024-05-02 01:00)。数据血缘:数据的”家谱”,记录数据的”前世今生”(比如”销售额报表”的数据来自”交易表”,而”交易表”的数据来自”收银日志”)。
缩略词列表
HDFS:Hadoop分布式文件系统(超级大硬盘,能存海量文件)Spark:大数据处理引擎(多功能厨房料理机,能切菜、榨汁、搅拌)Flink:流处理框架(快速炒菜锅,能实时翻炒数据流)Kafka:消息队列(快递中转站,数据来了先放这,等处理程序来取)Hive:数据仓库工具(带标签的食材收纳盒,方便按主题找数据)AWS S3:云存储服务(亚马逊的超级冰箱,很多公司租来存数据)
核心概念与联系
故事引入:超市老板的”数据烦恼”
王老板开了家连锁超市,最近遇到了大麻烦:
数据乱糟糟:收银数据存在Excel里,库存数据存在ERP系统,会员数据存在CRM系统,想查”哪个会员买了最多牛奶”要翻三个系统,还经常对不上。分析太慢:月底想算”本月销售额Top10商品”,会计要花3天汇总数据,等结果出来,下个月都过一半了。错过机会:有次牛奶快过期了,没及时发现,最后只能亏本处理;而顾客想要的有机蔬菜经常缺货,因为库存数据更新不及时。
王老板听说”大数据架构”能解决这些问题,于是请了一位架构师设计方案。架构师画了张图,说:“我们要建一个’数据工厂’,把所有数据集中管理,让数据处理像流水线一样高效,还要能实时监控库存和销售。”
这个”数据工厂”是怎么设计的?我们接着往下看。
核心概念解释(像给小学生讲故事一样)
核心概念一:数据湖——超市的”超级大冰箱”
想象你家冰箱很小,只能放当天的菜;而超市的”超级大冰箱”(数据湖)能放1000吨食材,不管是生肉(结构化数据,如Excel表格)、生菜(半结构化数据,如JSON日志),还是整个西瓜(非结构化数据,如商品图片、视频监控),都能扔进去,而且不会坏(数据持久化存储)。
特点:
“什么都能装”:支持任意格式数据(文本、图片、视频、数据库文件)“不挑大小”:能存GB到PB级数据(1PB=100万GB,相当于20万部电影)“原汁原味”:数据存进来时不做处理,保持原始状态(就像把带泥的胡萝卜直接放进冰箱)
生活例子:超市的”原材料仓库”,大米、面粉、蔬菜、肉直接堆在里面,还没洗没切,需要时再拿出来处理。
核心概念二:数据仓库——超市的”后厨备餐区”
如果说数据湖是”原材料仓库”,那数据仓库就是”后厨备餐区”。厨师从仓库(数据湖)把胡萝卜拿出来,洗干净、切成丝(数据清洗转换),然后放在标有”胡萝卜丝”的盘子里(结构化存储),旁边还有”肉丝盘”“青菜盘”(按业务主题分类)。这样厨师做菜时,直接拿现成的备餐就能炒,不用再临时处理。
特点:
“按菜单分类”:数据按业务主题(如销售、用户、库存)组织,而不是按来源(如收银机、ERP)“已经处理好”:数据经过清洗(去重、补缺失值)、整合(关联不同来源数据)、标准化(统一格式)“方便快速取用”:优化了查询性能,支持复杂分析(如”过去3个月每个门店的牛奶销售额趋势”)
生活例子:肯德基后厨的备餐台,炸鸡块、生菜丝、面包片都分开放,员工做汉堡时直接组装,30秒就能出餐。
核心概念三:ETL/ELT——数据的”洗切流水线”
数据从数据湖到数据仓库,需要”洗切处理”,这就是ETL/ELT的工作。
ETL:传统流水线(先洗后切再上桌)
流程:从冰箱拿菜(Extract,抽取数据湖的原始数据)→ 洗菜切菜(Transform,清洗、转换数据)→ 放进备餐盘(Load,加载到数据仓库)。
例子:早上9点,从数据湖抽取昨天的100万条交易日志,过滤掉错误记录(如金额为负的),计算每个商品的销售额,然后把结果存到数据仓库的”商品销售表”。
ELT:现代流水线(先上桌再洗切)
流程:直接把菜搬进后厨(Load,把原始数据加载到数据仓库)→ 需要时再拿出来洗切(Transform,在数据仓库内部进行转换)。
为什么这么做?因为现在的数据仓库(如Snowflake、BigQuery)性能很强,能直接处理原始数据,省去了”先转换再加载”的步骤,更灵活。
例子:把100万条交易日志直接加载到数据仓库的”原始交易表”,当分析师需要”商品销售额”时,直接在数据仓库里运行SQL计算,结果临时生成。
核心概念四:批处理与流处理——数据的”烹饪方式”
数据处理就像做菜,有两种方式:
批处理:批量做菜(适合大量、非紧急的菜)
场景:超市每天关门前,要统计当天所有交易数据,计算销售额、利润。这不需要实时,第二天早上出结果就行。
特点:处理大量历史数据(一次处理GB/TB级),周期性执行(每天凌晨2点跑一次),延迟较高(几小时)。
例子:用Spark批处理引擎,每天处理前一天的1000万条交易数据,生成”日销售报表”。
流处理:现点现做(适合少量、紧急的菜)
场景:超市的”实时库存监控”,当某商品库存低于10件时,马上通知补货员。这需要实时处理,不能等第二天。
特点:处理实时产生的数据流(每秒 thousands of records),持续执行(7×24小时运行),延迟极低(毫秒到秒级)。
例子:用Flink流处理引擎,实时消费收银机产生的交易数据,每笔交易后更新商品库存,当库存<10时触发报警。
核心概念五:数据治理——数据的”食品安全监管”
就像超市需要监管食材的保质期、来源、卫生标准,数据也需要”监管”,这就是数据治理。
数据质量:确保数据”没过期、没变质”(比如交易数据中不能有”用户ID为空”的记录,销售额不能是负数)。数据安全:防止”坏人偷食材”(比如会员的手机号、身份证号需要加密存储,只有授权人员能查看)。元数据管理:给每个”食材”贴标签(记录数据的名称、来源、更新时间、负责人,比如”交易表”由数据团队张三维护,每天凌晨更新)。权限控制:谁能碰哪些”食材”(收银员只能看自己的交易数据,店长能看全店数据,实习生只能看测试数据)。
核心概念之间的关系(用小学生能理解的比喻)
这些概念不是孤立的,它们像一个”数据工厂”的不同部门,分工协作:
数据湖与数据仓库:原材料仓库 vs 备餐区
数据湖是”原材料仓库”,存所有原始数据;数据仓库是”备餐区”,存处理好的、按主题分类的数据。关系:备餐区的食材(数据仓库数据)大多来自原材料仓库(数据湖),但也可能直接从供应商(数据源)进货(比如实时交易数据直接进数据仓库)。
ETL/ELT与数据湖/数据仓库:洗切流水线 vs 仓库与备餐区
ETL/ELT是连接数据湖和数据仓库的”洗切流水线”。关系:数据湖的数据需要通过ETL/ELT处理后,才能进入数据仓库;就像原材料仓库的菜,需要洗切后才能放进备餐区。
批处理/流处理与数据仓库:厨师 vs 备餐区
批处理和流处理是”厨师”,从备餐区(数据仓库)拿处理好的食材(数据),做出”菜品”(分析结果)。关系:批处理厨师负责做大锅菜(批量报表),流处理厨师负责做小炒(实时监控),他们共用备餐区的食材。
数据治理:贯穿全程的监管员
数据治理是”监管员”,从原材料入库(数据湖)、洗切(ETL/ELT)、备餐(数据仓库)到烹饪(批/流处理),全程监督,确保每个环节都合规、安全、高质量。
核心概念原理和架构的文本示意图(专业定义)
大数据架构的整体流程可以概括为”数据从哪里来→存到哪里→怎么处理→怎么用→怎么管“,具体如下:
数据源层:数据的产生地,包括业务系统(ERP、CRM)、日志(服务器日志、APP埋点)、数据库(MySQL、Oracle)、文件(Excel、CSV)、物联网设备(传感器、监控)等。数据存储层:
原始数据存储:数据湖(HDFS、S3、ADLS),存储未经处理的所有格式数据。结构化数据存储:数据仓库(Hive、Redshift、Snowflake),存储按业务主题组织的结构化数据。缓存/消息存储:缓存(Redis)用于临时存热点数据,消息队列(Kafka)用于暂存流数据。
数据处理层:
批处理引擎:Spark、MapReduce,处理历史数据,生成报表。流处理引擎:Flink、Kafka Streams,处理实时数据,生成实时指标。ETL/ELT工具:Spark SQL、Talend、Informatica,负责数据清洗转换。
数据分析层:用户使用数据的接口,包括BI工具(Tableau、Power BI)、SQL查询引擎(Presto、Impala)、数据科学平台(Jupyter、Databricks)。数据治理层:贯穿全程的管理体系,包括元数据管理(Atlas)、数据质量监控(Great Expectations)、权限控制(Ranger)、数据安全(加密、脱敏)。
Mermaid 流程图
以下是大数据架构的核心数据流向流程图(Mermaid格式):
流程图说明:
数据源的数据分两路:原始数据进数据湖,实时数据先进Kafka消息队列数据湖通过批处理ETL将数据加工后存入数据仓库;Kafka的实时数据通过流处理ETL存入数据仓库数据仓库的数据供批处理引擎(Spark)和流处理引擎(Flink)分析批处理结果用于BI报表,流处理结果用于实时监控数据治理(元数据、质量、权限)贯穿数据湖、数据仓库和分析应用
核心技术组件解析 & 数据处理流程详解
数据湖技术:如何打造”超级大冰箱”?
数据湖的核心需求是”存得多、存得稳、能随便存”,常用技术有两类:开源分布式存储和云存储服务。
1. 开源数据湖:HDFS(Hadoop分布式文件系统)
HDFS就像”用很多小冰箱拼成的大冰箱”。假设你有10台电脑(服务器),每台硬盘1TB,HDFS能把它们变成一个”10TB的虚拟大硬盘”,而且数据会自动分成小块(默认128MB一块),存到不同电脑上,还会存备份(默认3份),即使有2台电脑坏了,数据也不会丢。
HDFS的核心组件:
NameNode:“冰箱管理员”,记录文件存在哪台电脑上(块位置信息),如果它坏了,整个HDFS就瘫了(所以通常配备用NameNode)。DataNode:“冰箱存储单元”,实际存数据块的服务器,定期向NameNode汇报”我还活着,数据没问题”。
使用场景:适合自建机房的企业,需要完全控制数据存储(如银行、电信)。
2. 云数据湖:AWS S3/阿里云OSS
云厂商的存储服务(如S3)是”租来的超级冰箱”,你不用管服务器,直接按存储量付钱(比如1GB/月0.02美元),想存多少存多少,还自带备份和容灾(数据存多个地域)。
优点:
不用维护硬件,开箱即用无限扩容(理论上能存EB级数据,1EB=10亿GB)支持多种访问方式(API、SDK、网页上传)
使用场景:中小企业或快速迭代的互联网公司(省去运维成本)。
数据仓库技术:如何设计”高效备餐区”?
数据仓库的核心是”按业务主题组织数据,支持高效查询”,常用技术分开源数据仓库和商业/云数据仓库。
1. 开源数据仓库:Hive
Hive是”基于HDFS的食材收纳盒”,它本身不存数据(数据存在HDFS),但能给HDFS上的文件建立”表结构”(就像给收纳盒贴标签),让你能用SQL查询(比如)。
SELECT * FROM sales WHERE date='2024-05-01'
Hive的特点:
适合离线分析(批处理),查询延迟较高(分钟级)支持多种文件格式(CSV、Parquet、ORC,其中Parquet/ORC是压缩格式,查得更快)可以和Spark集成,用Spark SQL加速查询
例子:在Hive里建一个”销售主题表”:
CREATE TABLE sales (
order_id STRING, -- 订单ID
user_id STRING, -- 用户ID
product_id STRING,-- 商品ID
amount DOUBLE, -- 金额
order_time STRING -- 订单时间
)
ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' -- 字段用逗号分隔
STORED AS PARQUET; -- 用Parquet格式存储(压缩率高,查询快)
2. 云数据仓库:Snowflake/BigQuery
Snowflake是”云上的智能备餐区”,它把数据存储和计算分开(存储用云对象存储,计算用弹性集群),你查数据时才启动计算资源,用完就释放,按查询量付钱。
优点:
实时性更好(查询延迟秒级)支持ELT(直接加载原始数据,在仓库内转换)弹性扩展(数据量大了自动加计算资源)
使用场景:需要快速迭代分析的企业(如电商、互联网)。
ETL/ELT实现:数据”洗切流水线”怎么搭?
ETL/ELT的核心是”数据转换逻辑”,常用工具分开源引擎和商业工具。
1. 开源ETL工具:Spark SQL
Spark是”多功能料理机”,Spark SQL则是”带食谱的料理机”,可以用SQL写ETL逻辑,也支持Java/Scala/Python代码。
批处理ETL例子(用PySpark处理交易数据):
假设数据湖(HDFS)里有原始交易日志(JSON格式),我们要清洗后加载到Hive数据仓库。
步骤1:读取数据湖的原始日志
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("ETL").enableHiveSupport().getOrCreate()
# 读取HDFS上的JSON日志(数据湖原始数据)
raw_data = spark.read.json("hdfs:///data/lake/raw_transactions/2024-05-01/*.json")
raw_data.show(5) # 显示前5条数据
步骤2:数据清洗转换(过滤无效数据、提取字段)
# 过滤掉金额为负或订单ID为空的记录(数据清洗)
clean_data = raw_data.filter(
(raw_data.amount > 0) & # 金额必须为正
(raw_data.order_id.isNotNull()) # 订单ID不能为空
)
# 提取需要的字段,重命名并转换格式(数据转换)
transformed_data = clean_data.select(
clean_data.order_id.cast("string").alias("order_id"), # 订单ID转字符串
clean_data.user_id.alias("user_id"),
clean_data.product_id.alias("product_id"),
clean_data.amount.alias("amount"),
clean_data.timestamp.substr(0, 10).alias("order_date") # 从时间戳提取日期
)
步骤3:加载到Hive数据仓库
# 写入Hive的sales表(数据仓库),按日期分区存储(查询时可按日期过滤,更快)
transformed_data.write.mode("append").partitionBy("order_date").saveAsTable("hive_db.sales")
2. 流处理ETL工具:Flink + Kafka
流处理ETL需要”实时读取数据流,实时转换,实时写入目标”,通常用Kafka接收实时数据,Flink处理。
流处理ETL例子(用Flink实时处理订单数据):
场景:实时接收收银机的订单数据(JSON格式),清洗后写入数据仓库的”实时订单表”。
步骤1:Flink连接Kafka接收数据
// 创建Flink执行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 配置Kafka连接信息
Properties kafkaProps = new Properties();
kafkaProps.setProperty("bootstrap.servers", "kafka-broker:9092"); // Kafka地址
kafkaProps.setProperty("group.id", "order-consumer"); // 消费者组ID
// 从Kafka的"orders"主题读取数据
DataStream<String> kafkaStream = env.addSource(
new FlinkKafkaConsumer<>("orders", new SimpleStringSchema(), kafkaProps)
);
步骤2:实时解析和清洗数据
// 解析JSON数据为Order对象(自定义Java类)
DataStream<Order> orderStream = kafkaStream
.map(jsonStr -> {
// 用FastJSON解析JSON字符串
JSONObject json = JSON.parseObject(jsonStr);
return new Order(
json.getString("order_id"),
json.getString("user_id"),
json.getString("product_id"),
json.getDouble("amount"),
json.getString("timestamp")
);
})
.filter(order -> order.getAmount() > 0 && order.getOrderId() != null); // 过滤无效订单
步骤3:实时写入数据仓库(如Hive)
// 用Flink的JDBC连接器写入Hive(需要Hive支持ACID事务)
orderStream.addSink(
JdbcSink.sink(
"INSERT INTO realtime_orders (order_id, user_id, product_id, amount, timestamp) VALUES (?, ?, ?, ?, ?)",
(ps, order) -> {
ps.setString(1, order.getOrderId());
ps.setString(2, order.getUserId());
ps.setString(3, order.getProductId());
ps.setDouble(4, order.getAmount());
ps.setString(5, order.getTimestamp());
},
new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
.withUrl("jdbc:hive2://hive-server:10000/default")
.withDriverName("org.apache.hive.jdbc.HiveDriver")
.withUsername("hiveuser")
.withPassword("hivepass")
.build()
)
);
// 执行Flink作业
env.execute("Real-time Order ETL");
批处理与流处理:两种”烹饪方式”的实战对比
批处理实战:用Spark计算”商品月销售额Top10″
场景:每月1号计算上月所有商品的销售额,生成报表。
数据量:约1亿条交易记录(10GB数据)
技术栈:Spark + Hive
实现步骤:
从Hive数据仓库读取上月交易数据按商品ID分组,汇总销售额按销售额降序排序,取Top10结果写入报表表
核心代码(PySpark):
from pyspark.sql import SparkSession
from pyspark.sql.functions import sum, desc
spark = SparkSession.builder.appName("MonthlySalesTop10").enableHiveSupport().getOrCreate()
# 读取Hive中的销售数据(按上月分区过滤)
last_month = "2024-04" # 实际中可通过代码动态获取上月
sales_data = spark.sql(f"""
SELECT product_id, amount
FROM hive_db.sales
WHERE order_date LIKE '{last_month}%' # 过滤上月数据
""")
# 按商品ID汇总销售额
product_sales = sales_data.groupBy("product_id")
.agg(sum("amount").alias("total_sales"))
# 排序取Top10
top10_products = product_sales.orderBy(desc("total_sales")).limit(10)
# 写入报表表
top10_products.write.mode("overwrite").saveAsTable("hive_db.monthly_sales_top10")
spark.stop()
运行方式:通过Linux cron任务每月1号凌晨2点执行:
0 2 1 * * /usr/local/spark/bin/spark-submit --master yarn monthly_sales_top10.py
流处理实战:用Flink实时计算”商品实时销售额”
场景:实时监控商品销售额,每5秒更新一次,展示在大屏上。
数据量:每秒约1000条订单数据
技术栈:Flink + Kafka + Redis
实现步骤:
从Kafka接收实时订单数据按商品ID分组,每5秒汇总一次销售额(滚动窗口)将结果存入Redis(供大屏查询)
核心代码(Java):
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); // 使用事件时间(订单实际发生时间)
// 1. 从Kafka读取订单数据
DataStream<Order> orderStream = env.addSource(kafkaConsumer)
.map(jsonStr -> JSON.parseObject(jsonStr, Order.class))
.assignTimestampsAndWatermarks( // 提取事件时间,处理数据延迟
WatermarkStrategy.<Order>forBoundedOutOfOrderness(Duration.ofSeconds(5))
.withTimestampAssigner((order, timestamp) ->
LocalDateTime.parse(order.getTimestamp(), DateTimeFormatter.ISO_DATE_TIME)
.toInstant(ZoneOffset.UTC).toEpochMilli()
)
);
// 2. 按商品ID分组,5秒窗口汇总销售额
DataStream<Tuple2<String, Double>> productSalesStream = orderStream
.keyBy(Order::getProductId) // 按商品ID分组
.window(TumblingProcessingTimeWindows.of(Time.seconds(5))) // 5秒滚动窗口
.aggregate(new AggregateFunction<Order, Double, Double>() {
@Override
public Double createAccumulator() { return 0.0; } // 初始累加器(销售额0)
@Override
public Double add(Order order, Double acc) { return acc + order.getAmount(); } // 累加金额
@Override
public Double getResult(Double acc) { return acc; } // 窗口结束时返回结果
@Override
public Double merge(Double a, Double b) { return a + b; } // 合并累加器(分布式场景)
})
.map(result -> new Tuple2<>(result.f0, result.f1)); // (product_id, total_sales)
// 3. 写入Redis(Key: product:{product_id}, Value: total_sales)
productSalesStream.addSink(new RedisSink<>(redisConfig, new RedisMapper<Tuple2<String, Double>>() {
@Override
public RedisCommandDescription getCommandDescription() {
return new RedisCommandDescription(RedisCommand.SET); // 使用SET命令
}
@Override
public String getKeyFromData(Tuple2<String, Double> data) {
return "product:" + data.f0; // Key: product:12345
}
@Override
public String getValueFromData(Tuple2<String, Double> data) {
return data.f1.toString(); // Value: 1250.5
}
}));
env.execute("Real-time Product Sales");
效果:大屏通过Redis API(如)每5秒获取一次数据,实时展示各商品销售额。
GET product:12345
项目实战:电商大数据架构设计与实现
项目背景
我们要为一家中型电商公司设计大数据架构,需求如下:
数据来源:订单系统(MySQL)、用户行为日志(APP埋点)、商品信息(MongoDB)、物流数据(API接口)数据需求:
离线分析:日/月销售报表、用户画像、商品推荐模型训练实时分析:实时订单监控、库存预警、首页热门商品排行
技术约束:预算有限,优先选择开源组件
架构设计
基于需求,我们设计”湖仓一体+批流融合“架构,如下:
(注:实际写作时可插入架构图,此处用文字描述)
组件说明:
数据湖:HDFS(存原始日志、爬虫数据等非结构化数据)数据仓库:Hive(存结构化业务数据,如订单表、用户表)实时数据通道:Kafka(接收APP埋点、实时订单数据)批处理引擎:Spark(跑离线ETL和报表计算)流处理引擎:Flink(处理实时数据,更新库存和热门商品)元数据管理:Atlas(记录数据血缘和表结构)BI工具:Superset(制作销售报表和仪表盘)
开发环境搭建(Docker Compose快速部署)
为了快速开发,我们用Docker Compose部署所有组件:
docker-compose.yml(核心部分):
version: '3'
services:
# HDFS(数据湖)
hdfs-namenode:
image: apache/hadoop:3.3.4
command: namenode
environment:
- HDFS_NAMENODE_USER=root
- HDFS_DATANODE_USER=root
ports:
- "9870:9870" # HDFS Web界面
hdfs-datanode:
image: apache/hadoop:3.3.4
command: datanode
environment:
- HDFS_NAMENODE_USER=root
- HDFS_DATANODE_USER=root
depends_on:
- hdfs-namenode
# Kafka(消息队列)
kafka:
image: confluentinc/cp-kafka:7.3.0
ports:
- "9092:9092"
environment:
- KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://kafka:9092
- KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1
depends_on:
- zookeeper
# Hive(数据仓库)
hive-server:
image: apache/hive:3.1.3
ports:
- "10000:10000" # Hive JDBC端口
depends_on:
- hdfs-namenode
# Spark(批处理)
spark:
image: bitnami/spark:3.3.2
ports:
- "8080:8080" # Spark Master Web界面
environment:
- SPARK_MODE=master
# Flink(流处理)
flink-jobmanager:
image: flink:1.16.0
ports:
- "8081:8081" # Flink Web界面
command: jobmanager
flink-taskmanager:
image: flink:1.16.0
depends_on:
- flink-jobmanager
command: taskmanager
启动命令:
docker-compose up -d # 后台启动所有服务
核心模块实现
模块1:数据采集(从数据源到数据湖/ Kafka)
1.1 MySQL订单数据采集(批处理)
用Sqoop工具(开源数据迁移工具)每天凌晨从MySQL全量导出订单数据到HDFS数据湖:
# Sqoop命令:从MySQL导出order表到HDFS
sqoop export
--connect jdbc:mysql://mysql-host:3306/ecommerce
--username root
--password password
--table order
--target-dir /data/lake/mysql/order/$(date +%Y-%m-%d) # 按日期分区存储
--fields-terminated-by ',' # 字段分隔符
--m 1 # 1个Map任务(数据量小时用)
1.2 APP用户行为日志采集(流处理)
APP埋点日志通过HTTP发送到Flume(日志收集工具),Flume再将日志写入Kafka:
Flume配置文件(user_behavior.conf):
agent.sources = http-source
agent.sinks = kafka-sink
agent.channels = memory-channel
# 源:HTTP接收日志(APP发送POST请求到该端口)
agent.sources.http-source.type = http
agent.sources.http-source.bind = 0.0.0.0
agent.sources.http-source.port = 5140
# 通道:内存缓存日志
agent.channels.memory-channel.type = memory
agent.channels.memory-channel.capacity = 10000
# sink:发送到Kafka的user_behavior主题
agent.sinks.kafka-sink.type = org.apache.flume.sink.kafka.KafkaSink
agent.sinks.kafka-sink.kafka.bootstrap.servers = kafka:9092
agent.sinks.kafka-sink.kafka.topic = user_behavior
agent.sinks.kafka-sink.serializer.class = kafka.serializer.StringEncoder
# 连接源、通道、sink
agent.sources.http-source.channels = memory-channel
agent.sinks.kafka-sink.channel = memory-channel
启动Flume:
flume-ng agent -n agent -f user_behavior.conf -Dflume.root.logger=INFO,console
模块2:数据处理(ETL + 批/流分析)
2.1 批处理ETL(Spark + Hive)
将HDFS数据湖的原始订单数据清洗后加载到Hive数据仓库:
Spark ETL脚本(order_etl.py):
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, to_date
spark = SparkSession.builder.appName("OrderETL").enableHiveSupport().getOrCreate()
# 1. 读取HDFS上的原始订单数据(昨天的)
yesterday = (datetime.now() - timedelta(days=1)).strftime("%Y-%m-%d")
raw_order_path = f"hdfs:///data/lake/mysql/order/{yesterday}"
raw_orders = spark.read.csv(
raw_order_path,
header=True, # 第一行为表头
inferSchema=True, # 自动推断 schema
sep=','
)
# 2. 数据清洗
clean_orders = raw_orders.filter(
(col("order_status") == "PAID") & # 只保留已支付订单
(col("amount") > 0) & # 金额为正
(col("user_id").isNotNull())
).select(
col("order_id"),
col("user_id"),
col("product_id"),
col("amount"),
to_date(col("create_time"), "yyyy-MM-dd HH:mm:ss").alias("order_date") # 转换日期格式
)
# 3. 写入Hive数据仓库(按日期分区)
clean_orders.write.mode("append").partitionBy("order_date").saveAsTable("ecommerce.order")
spark.stop()
2.2 实时热门商品计算(Flink + Kafka + Redis)
从Kafka的user_behavior主题读取用户点击日志,实时计算”最近1小时热门商品Top10″,存入Redis供首页展示:
Flink作业(HotProducts.java):
public class HotProducts {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
// 1. 从Kafka读取用户行为数据(JSON格式:{"user_id": "123", "product_id": "456", "behavior": "click", "timestamp": "2024-05-01 10:00:00"})
DataStream<String> behaviorStream = env.addSource(
new FlinkKafkaConsumer<>("user_behavior", new SimpleStringSchema(), kafkaProps)
);
// 2. 解析并过滤点击行为
DataStream<ProductView> viewStream = behaviorStream
.map(jsonStr -> JSON.parseObject(jsonStr))
.filter(json -> "click".equals(json.getString("behavior"))) // 只保留点击行为
.map(json -> new ProductView(
json.getString("product_id"),
LocalDateTime.parse(json.getString("timestamp"), DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"))
.toInstant(ZoneOffset.UTC).toEpochMilli()
))
.assignTimestampsAndWatermarks(
WatermarkStrategy.<ProductView>forBoundedOutOfOrderness(Duration.ofSeconds(10))
.withTimestampAssigner((view, ts) -> view.getTimestamp())
);
// 3. 开窗统计(1小时窗口,每5分钟更新一次)
DataStream<Tuple2<String, Long>> hotProductsStream = viewStream
.keyBy(ProductView::getProductId)
.window(SlidingEventTimeWindows.of(Time.hours(1), Time.minutes(5))) // 滑动窗口:1小时窗口,5分钟滑动一次
.aggregate(new CountAgg(), new WindowResultFunction()); // 统计每个窗口内的点击数
// 4. 按窗口分组,取Top10
DataStream<String> top10Stream = hotProductsStream
.keyBy(window -> window.f0) // 按窗口结束时间分组
.process(new TopNHotProducts(10)); // 自定义ProcessFunction取Top10
// 5. 写入Redis(Key: hot_products:last_hour, Value: JSON数组)
top10Stream.addSink(new RedisSink<>(redisConfig, new HotProductsRedisMapper()));
env.execute("Hot Products Analysis");
}
}
模块3:数据分析与应用
3.1 离线报表(Hive + Superset)
在Superset中连接Hive,创建”日销售报表”仪表盘,包含:
当日总销售额、订单量、客单价各品类销售额占比饼图销售额TOP10商品柱状图
Hive查询示例(日销售额):
SELECT
order_date,
SUM(amount) AS total_sales,
COUNT(DISTINCT order_id) AS order_count,
SUM(amount)/COUNT(DISTINCT order_id) AS avg_price
FROM ecommerce.order
WHERE order_date = CURRENT_DATE - INTERVAL '1' DAY
GROUP BY order_date;
3.2 实时大屏(Redis + Vue)
前端Vue项目通过Axios每秒请求Redis中的和
hot_products:last_hour数据,用ECharts绘制实时热门商品排行和销售额趋势图。
realtime_sales
数据治理实现
1. 元数据管理(Atlas)
在Atlas中注册Hive表,记录:
表名:所有者:data_team@company.com数据来源:MySQL订单表分区字段:order_date血缘关系:
ecommerce.order →
hdfs:///data/lake/mysql/order
ecommerce.order
2. 数据质量监控(Great Expectations)
编写数据质量规则(Expectation Suite),每天检查订单表:
expectations:
- expectation_type: expect_column_values_to_not_be_null
kwargs:
column: order_id
- expectation_type: expect_column_values_to_be_between
kwargs:
column: amount
min_value: 0
max_value: 100000
3. 权限控制(Ranger)


















暂无评论内容