大数据领域数据架构的关键技术解析

大数据领域数据架构的关键技术解析

关键词:大数据架构、数据湖、数据仓库、ETL/ELT、流处理、批处理、数据治理

摘要:在数字时代,数据已成为企业的核心资产,而大数据架构则是”管理数据资产的超级工厂”。本文将以”小学生能听懂的故事”为风格,从数据架构的”前世今生”讲起,逐步解析数据湖、数据仓库、ETL/ELT、流处理、批处理等核心组件的原理,通过”超市数据管理”的类比让复杂概念变得直观。我们会深入拆解数据从产生到应用的全流程,用实际代码案例展示批处理与流处理的实现,并通过电商数据架构实战项目,带你亲手搭建一个”数据工厂”。最后,我们还会探讨湖仓一体、实时数仓等前沿趋势,帮你全面掌握大数据架构的”设计密码”。

背景介绍

目的和范围

想象你经营着一家超级大超市,每天有10万个顾客购物,产生100万条交易记录、50万条商品库存变动、20万条会员行为数据……这些数据如果散乱堆放,就像把食材、工具、账本全扔在地上,根本无法管理。大数据架构就是为这样的”数据超市”设计的”超级仓库+智能流水线”,让数据从”杂乱的原材料”变成”可直接使用的商品”。

本文的目的是:

揭开大数据架构的”神秘面纱”,用生活例子解释核心技术讲清数据湖、数据仓库等组件的”分工”和”协作方式”手把手教你用代码实现数据处理流程(批处理+流处理)通过实战项目理解”如何设计一个能用的大数据架构”

范围覆盖:数据架构的核心组件、数据处理全流程、典型技术工具、实战案例及未来趋势,不涉及过于底层的硬件优化(如服务器集群部署细节)。

预期读者

刚接触大数据的程序员/学生(想入门数据架构)数据分析师/产品经理(想理解数据从哪来到哪去)传统IT从业者(想转型大数据领域)对”数据如何变成价值”好奇的任何人

文档结构概述

本文就像”建造数据工厂”的说明书,分7个部分:

打地基:背景介绍与术语表(先认识工具和材料)画图纸:核心概念与架构设计(设计工厂的布局)建流水线:数据处理流程详解(批处理+流处理)开工实战:电商数据架构项目(亲手建一个小工厂)看应用:不同行业的架构案例(参观别人的工厂)选工具:常用技术栈与资源推荐(采购合适的设备)望未来:趋势与挑战(工厂如何升级)

术语表

核心术语定义
术语 小学生版解释 专业版解释
数据湖 存放”生食材”的超级冰箱,什么都能扔进去(生肉、生菜、水果),不管格式 存储原始、未经处理的海量数据(结构化、半结构化、非结构化)的集中式存储系统,支持任意格式数据的存储和直接访问
数据仓库 存放”熟食材”的后厨,食材已经洗好、切好、分类放(肉丝放一盒、青菜放一格),方便厨师快速做菜 面向分析场景,存储经过清洗、整合、结构化处理的数据,按业务主题(如销售、用户、库存)组织,支持高效查询和报表分析
ETL 洗菜切菜流水线:先把菜从冰箱(数据湖)拿出来(Extract),洗干净、切小块(Transform),再放进后厨的盘子里(Load) 数据抽取(Extract)、转换(Transform)、加载(Load)的过程,将分散的原始数据经过处理后加载到目标存储(如数据仓库)
ELT 先把菜直接搬进后厨(Load),需要做菜时再洗切(Transform) 数据加载(Load)、抽取(Extract)、转换(Transform)的过程,先将原始数据加载到目标存储,再在目标端进行转换处理(通常用于数据仓库性能较强的场景)
批处理 每天凌晨集中处理前一天的所有食材(比如晚上收集的1000斤菜,早上一起洗切) 对大量历史数据(通常是GB/TB级)进行周期性(如每天/每周)集中处理的方式,处理延迟较高(分钟到小时级)
流处理 客人点单后马上做菜(点一份做一份,实时上桌) 对实时产生的数据流(如每秒 thousands of records)进行即时处理的方式,处理延迟极低(毫秒到秒级)
数据治理 超市的”食品安全监管员”,确保食材没过期、来源合法、操作规范 对数据全生命周期(采集、存储、处理、使用)进行管理的体系,包括数据质量、数据安全、元数据管理、权限控制等
相关概念解释

数据源:数据的”老家”,比如超市的收银机(交易数据)、会员卡读卡器(用户数据)、仓库扫码枪(库存数据)。元数据:数据的”身份证”,记录数据叫什么名字、从哪来、什么时候更新、谁能看(比如”20240501_sales.csv”是5月1日的交易数据,来自收银系统,更新时间2024-05-02 01:00)。数据血缘:数据的”家谱”,记录数据的”前世今生”(比如”销售额报表”的数据来自”交易表”,而”交易表”的数据来自”收银日志”)。

缩略词列表

HDFS:Hadoop分布式文件系统(超级大硬盘,能存海量文件)Spark:大数据处理引擎(多功能厨房料理机,能切菜、榨汁、搅拌)Flink:流处理框架(快速炒菜锅,能实时翻炒数据流)Kafka:消息队列(快递中转站,数据来了先放这,等处理程序来取)Hive:数据仓库工具(带标签的食材收纳盒,方便按主题找数据)AWS S3:云存储服务(亚马逊的超级冰箱,很多公司租来存数据)

核心概念与联系

故事引入:超市老板的”数据烦恼”

王老板开了家连锁超市,最近遇到了大麻烦:

数据乱糟糟:收银数据存在Excel里,库存数据存在ERP系统,会员数据存在CRM系统,想查”哪个会员买了最多牛奶”要翻三个系统,还经常对不上。分析太慢:月底想算”本月销售额Top10商品”,会计要花3天汇总数据,等结果出来,下个月都过一半了。错过机会:有次牛奶快过期了,没及时发现,最后只能亏本处理;而顾客想要的有机蔬菜经常缺货,因为库存数据更新不及时。

王老板听说”大数据架构”能解决这些问题,于是请了一位架构师设计方案。架构师画了张图,说:“我们要建一个’数据工厂’,把所有数据集中管理,让数据处理像流水线一样高效,还要能实时监控库存和销售。”

这个”数据工厂”是怎么设计的?我们接着往下看。

核心概念解释(像给小学生讲故事一样)

核心概念一:数据湖——超市的”超级大冰箱”

想象你家冰箱很小,只能放当天的菜;而超市的”超级大冰箱”(数据湖)能放1000吨食材,不管是生肉(结构化数据,如Excel表格)、生菜(半结构化数据,如JSON日志),还是整个西瓜(非结构化数据,如商品图片、视频监控),都能扔进去,而且不会坏(数据持久化存储)。

特点

“什么都能装”:支持任意格式数据(文本、图片、视频、数据库文件)“不挑大小”:能存GB到PB级数据(1PB=100万GB,相当于20万部电影)“原汁原味”:数据存进来时不做处理,保持原始状态(就像把带泥的胡萝卜直接放进冰箱)

生活例子:超市的”原材料仓库”,大米、面粉、蔬菜、肉直接堆在里面,还没洗没切,需要时再拿出来处理。

核心概念二:数据仓库——超市的”后厨备餐区”

如果说数据湖是”原材料仓库”,那数据仓库就是”后厨备餐区”。厨师从仓库(数据湖)把胡萝卜拿出来,洗干净、切成丝(数据清洗转换),然后放在标有”胡萝卜丝”的盘子里(结构化存储),旁边还有”肉丝盘”“青菜盘”(按业务主题分类)。这样厨师做菜时,直接拿现成的备餐就能炒,不用再临时处理。

特点

“按菜单分类”:数据按业务主题(如销售、用户、库存)组织,而不是按来源(如收银机、ERP)“已经处理好”:数据经过清洗(去重、补缺失值)、整合(关联不同来源数据)、标准化(统一格式)“方便快速取用”:优化了查询性能,支持复杂分析(如”过去3个月每个门店的牛奶销售额趋势”)

生活例子:肯德基后厨的备餐台,炸鸡块、生菜丝、面包片都分开放,员工做汉堡时直接组装,30秒就能出餐。

核心概念三:ETL/ELT——数据的”洗切流水线”

数据从数据湖到数据仓库,需要”洗切处理”,这就是ETL/ELT的工作。

ETL:传统流水线(先洗后切再上桌)
流程:从冰箱拿菜(Extract,抽取数据湖的原始数据)→ 洗菜切菜(Transform,清洗、转换数据)→ 放进备餐盘(Load,加载到数据仓库)。
例子:早上9点,从数据湖抽取昨天的100万条交易日志,过滤掉错误记录(如金额为负的),计算每个商品的销售额,然后把结果存到数据仓库的”商品销售表”。

ELT:现代流水线(先上桌再洗切)
流程:直接把菜搬进后厨(Load,把原始数据加载到数据仓库)→ 需要时再拿出来洗切(Transform,在数据仓库内部进行转换)。
为什么这么做?因为现在的数据仓库(如Snowflake、BigQuery)性能很强,能直接处理原始数据,省去了”先转换再加载”的步骤,更灵活。
例子:把100万条交易日志直接加载到数据仓库的”原始交易表”,当分析师需要”商品销售额”时,直接在数据仓库里运行SQL计算,结果临时生成。

核心概念四:批处理与流处理——数据的”烹饪方式”

数据处理就像做菜,有两种方式:

批处理:批量做菜(适合大量、非紧急的菜)
场景:超市每天关门前,要统计当天所有交易数据,计算销售额、利润。这不需要实时,第二天早上出结果就行。
特点:处理大量历史数据(一次处理GB/TB级),周期性执行(每天凌晨2点跑一次),延迟较高(几小时)。
例子:用Spark批处理引擎,每天处理前一天的1000万条交易数据,生成”日销售报表”。

流处理:现点现做(适合少量、紧急的菜)
场景:超市的”实时库存监控”,当某商品库存低于10件时,马上通知补货员。这需要实时处理,不能等第二天。
特点:处理实时产生的数据流(每秒 thousands of records),持续执行(7×24小时运行),延迟极低(毫秒到秒级)。
例子:用Flink流处理引擎,实时消费收银机产生的交易数据,每笔交易后更新商品库存,当库存<10时触发报警。

核心概念五:数据治理——数据的”食品安全监管”

就像超市需要监管食材的保质期、来源、卫生标准,数据也需要”监管”,这就是数据治理。

数据质量:确保数据”没过期、没变质”(比如交易数据中不能有”用户ID为空”的记录,销售额不能是负数)。数据安全:防止”坏人偷食材”(比如会员的手机号、身份证号需要加密存储,只有授权人员能查看)。元数据管理:给每个”食材”贴标签(记录数据的名称、来源、更新时间、负责人,比如”交易表”由数据团队张三维护,每天凌晨更新)。权限控制:谁能碰哪些”食材”(收银员只能看自己的交易数据,店长能看全店数据,实习生只能看测试数据)。

核心概念之间的关系(用小学生能理解的比喻)

这些概念不是孤立的,它们像一个”数据工厂”的不同部门,分工协作:

数据湖与数据仓库:原材料仓库 vs 备餐区

数据湖是”原材料仓库”,存所有原始数据;数据仓库是”备餐区”,存处理好的、按主题分类的数据。关系:备餐区的食材(数据仓库数据)大多来自原材料仓库(数据湖),但也可能直接从供应商(数据源)进货(比如实时交易数据直接进数据仓库)。

ETL/ELT与数据湖/数据仓库:洗切流水线 vs 仓库与备餐区

ETL/ELT是连接数据湖和数据仓库的”洗切流水线”。关系:数据湖的数据需要通过ETL/ELT处理后,才能进入数据仓库;就像原材料仓库的菜,需要洗切后才能放进备餐区。

批处理/流处理与数据仓库:厨师 vs 备餐区

批处理和流处理是”厨师”,从备餐区(数据仓库)拿处理好的食材(数据),做出”菜品”(分析结果)。关系:批处理厨师负责做大锅菜(批量报表),流处理厨师负责做小炒(实时监控),他们共用备餐区的食材。

数据治理:贯穿全程的监管员

数据治理是”监管员”,从原材料入库(数据湖)、洗切(ETL/ELT)、备餐(数据仓库)到烹饪(批/流处理),全程监督,确保每个环节都合规、安全、高质量。

核心概念原理和架构的文本示意图(专业定义)

大数据架构的整体流程可以概括为”数据从哪里来→存到哪里→怎么处理→怎么用→怎么管“,具体如下:

数据源层:数据的产生地,包括业务系统(ERP、CRM)、日志(服务器日志、APP埋点)、数据库(MySQL、Oracle)、文件(Excel、CSV)、物联网设备(传感器、监控)等。数据存储层
原始数据存储:数据湖(HDFS、S3、ADLS),存储未经处理的所有格式数据。结构化数据存储:数据仓库(Hive、Redshift、Snowflake),存储按业务主题组织的结构化数据。缓存/消息存储:缓存(Redis)用于临时存热点数据,消息队列(Kafka)用于暂存流数据。
数据处理层
批处理引擎:Spark、MapReduce,处理历史数据,生成报表。流处理引擎:Flink、Kafka Streams,处理实时数据,生成实时指标。ETL/ELT工具:Spark SQL、Talend、Informatica,负责数据清洗转换。
数据分析层:用户使用数据的接口,包括BI工具(Tableau、Power BI)、SQL查询引擎(Presto、Impala)、数据科学平台(Jupyter、Databricks)。数据治理层:贯穿全程的管理体系,包括元数据管理(Atlas)、数据质量监控(Great Expectations)、权限控制(Ranger)、数据安全(加密、脱敏)。

Mermaid 流程图

以下是大数据架构的核心数据流向流程图(Mermaid格式):

流程图说明

数据源的数据分两路:原始数据进数据湖,实时数据先进Kafka消息队列数据湖通过批处理ETL将数据加工后存入数据仓库;Kafka的实时数据通过流处理ETL存入数据仓库数据仓库的数据供批处理引擎(Spark)和流处理引擎(Flink)分析批处理结果用于BI报表,流处理结果用于实时监控数据治理(元数据、质量、权限)贯穿数据湖、数据仓库和分析应用

核心技术组件解析 & 数据处理流程详解

数据湖技术:如何打造”超级大冰箱”?

数据湖的核心需求是”存得多、存得稳、能随便存”,常用技术有两类:开源分布式存储云存储服务

1. 开源数据湖:HDFS(Hadoop分布式文件系统)

HDFS就像”用很多小冰箱拼成的大冰箱”。假设你有10台电脑(服务器),每台硬盘1TB,HDFS能把它们变成一个”10TB的虚拟大硬盘”,而且数据会自动分成小块(默认128MB一块),存到不同电脑上,还会存备份(默认3份),即使有2台电脑坏了,数据也不会丢。

HDFS的核心组件

NameNode:“冰箱管理员”,记录文件存在哪台电脑上(块位置信息),如果它坏了,整个HDFS就瘫了(所以通常配备用NameNode)。DataNode:“冰箱存储单元”,实际存数据块的服务器,定期向NameNode汇报”我还活着,数据没问题”。

使用场景:适合自建机房的企业,需要完全控制数据存储(如银行、电信)。

2. 云数据湖:AWS S3/阿里云OSS

云厂商的存储服务(如S3)是”租来的超级冰箱”,你不用管服务器,直接按存储量付钱(比如1GB/月0.02美元),想存多少存多少,还自带备份和容灾(数据存多个地域)。

优点

不用维护硬件,开箱即用无限扩容(理论上能存EB级数据,1EB=10亿GB)支持多种访问方式(API、SDK、网页上传)

使用场景:中小企业或快速迭代的互联网公司(省去运维成本)。

数据仓库技术:如何设计”高效备餐区”?

数据仓库的核心是”按业务主题组织数据,支持高效查询”,常用技术分开源数据仓库商业/云数据仓库

1. 开源数据仓库:Hive

Hive是”基于HDFS的食材收纳盒”,它本身不存数据(数据存在HDFS),但能给HDFS上的文件建立”表结构”(就像给收纳盒贴标签),让你能用SQL查询(比如
SELECT * FROM sales WHERE date='2024-05-01'
)。

Hive的特点

适合离线分析(批处理),查询延迟较高(分钟级)支持多种文件格式(CSV、Parquet、ORC,其中Parquet/ORC是压缩格式,查得更快)可以和Spark集成,用Spark SQL加速查询

例子:在Hive里建一个”销售主题表”:


CREATE TABLE sales (
    order_id STRING,  -- 订单ID
    user_id STRING,   -- 用户ID
    product_id STRING,-- 商品ID
    amount DOUBLE,    -- 金额
    order_time STRING -- 订单时间
)
ROW FORMAT DELIMITED FIELDS TERMINATED BY ','  -- 字段用逗号分隔
STORED AS PARQUET;  -- 用Parquet格式存储(压缩率高,查询快)
2. 云数据仓库:Snowflake/BigQuery

Snowflake是”云上的智能备餐区”,它把数据存储和计算分开(存储用云对象存储,计算用弹性集群),你查数据时才启动计算资源,用完就释放,按查询量付钱。

优点

实时性更好(查询延迟秒级)支持ELT(直接加载原始数据,在仓库内转换)弹性扩展(数据量大了自动加计算资源)

使用场景:需要快速迭代分析的企业(如电商、互联网)。

ETL/ELT实现:数据”洗切流水线”怎么搭?

ETL/ELT的核心是”数据转换逻辑”,常用工具分开源引擎商业工具

1. 开源ETL工具:Spark SQL

Spark是”多功能料理机”,Spark SQL则是”带食谱的料理机”,可以用SQL写ETL逻辑,也支持Java/Scala/Python代码。

批处理ETL例子(用PySpark处理交易数据):
假设数据湖(HDFS)里有原始交易日志(JSON格式),我们要清洗后加载到Hive数据仓库。

步骤1:读取数据湖的原始日志


from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("ETL").enableHiveSupport().getOrCreate()

# 读取HDFS上的JSON日志(数据湖原始数据)
raw_data = spark.read.json("hdfs:///data/lake/raw_transactions/2024-05-01/*.json")
raw_data.show(5)  # 显示前5条数据

步骤2:数据清洗转换(过滤无效数据、提取字段)


# 过滤掉金额为负或订单ID为空的记录(数据清洗)
clean_data = raw_data.filter(
    (raw_data.amount > 0) &  # 金额必须为正
    (raw_data.order_id.isNotNull())  # 订单ID不能为空
)

# 提取需要的字段,重命名并转换格式(数据转换)
transformed_data = clean_data.select(
    clean_data.order_id.cast("string").alias("order_id"),  # 订单ID转字符串
    clean_data.user_id.alias("user_id"),
    clean_data.product_id.alias("product_id"),
    clean_data.amount.alias("amount"),
    clean_data.timestamp.substr(0, 10).alias("order_date")  # 从时间戳提取日期
)

步骤3:加载到Hive数据仓库


# 写入Hive的sales表(数据仓库),按日期分区存储(查询时可按日期过滤,更快)
transformed_data.write.mode("append").partitionBy("order_date").saveAsTable("hive_db.sales")
2. 流处理ETL工具:Flink + Kafka

流处理ETL需要”实时读取数据流,实时转换,实时写入目标”,通常用Kafka接收实时数据,Flink处理。

流处理ETL例子(用Flink实时处理订单数据):
场景:实时接收收银机的订单数据(JSON格式),清洗后写入数据仓库的”实时订单表”。

步骤1:Flink连接Kafka接收数据


// 创建Flink执行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// 配置Kafka连接信息
Properties kafkaProps = new Properties();
kafkaProps.setProperty("bootstrap.servers", "kafka-broker:9092");  // Kafka地址
kafkaProps.setProperty("group.id", "order-consumer");  // 消费者组ID

// 从Kafka的"orders"主题读取数据
DataStream<String> kafkaStream = env.addSource(
    new FlinkKafkaConsumer<>("orders", new SimpleStringSchema(), kafkaProps)
);

步骤2:实时解析和清洗数据


// 解析JSON数据为Order对象(自定义Java类)
DataStream<Order> orderStream = kafkaStream
    .map(jsonStr -> {
        // 用FastJSON解析JSON字符串
        JSONObject json = JSON.parseObject(jsonStr);
        return new Order(
            json.getString("order_id"),
            json.getString("user_id"),
            json.getString("product_id"),
            json.getDouble("amount"),
            json.getString("timestamp")
        );
    })
    .filter(order -> order.getAmount() > 0 && order.getOrderId() != null);  // 过滤无效订单

步骤3:实时写入数据仓库(如Hive)


// 用Flink的JDBC连接器写入Hive(需要Hive支持ACID事务)
orderStream.addSink(
    JdbcSink.sink(
        "INSERT INTO realtime_orders (order_id, user_id, product_id, amount, timestamp) VALUES (?, ?, ?, ?, ?)",
        (ps, order) -> {
            ps.setString(1, order.getOrderId());
            ps.setString(2, order.getUserId());
            ps.setString(3, order.getProductId());
            ps.setDouble(4, order.getAmount());
            ps.setString(5, order.getTimestamp());
        },
        new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
            .withUrl("jdbc:hive2://hive-server:10000/default")
            .withDriverName("org.apache.hive.jdbc.HiveDriver")
            .withUsername("hiveuser")
            .withPassword("hivepass")
            .build()
    )
);

// 执行Flink作业
env.execute("Real-time Order ETL");

批处理与流处理:两种”烹饪方式”的实战对比

批处理实战:用Spark计算”商品月销售额Top10″

场景:每月1号计算上月所有商品的销售额,生成报表。
数据量:约1亿条交易记录(10GB数据)
技术栈:Spark + Hive

实现步骤

从Hive数据仓库读取上月交易数据按商品ID分组,汇总销售额按销售额降序排序,取Top10结果写入报表表

核心代码(PySpark)


from pyspark.sql import SparkSession
from pyspark.sql.functions import sum, desc

spark = SparkSession.builder.appName("MonthlySalesTop10").enableHiveSupport().getOrCreate()

# 读取Hive中的销售数据(按上月分区过滤)
last_month = "2024-04"  # 实际中可通过代码动态获取上月
sales_data = spark.sql(f"""
    SELECT product_id, amount 
    FROM hive_db.sales 
    WHERE order_date LIKE '{last_month}%'  # 过滤上月数据
""")

# 按商品ID汇总销售额
product_sales = sales_data.groupBy("product_id") 
                          .agg(sum("amount").alias("total_sales"))

# 排序取Top10
top10_products = product_sales.orderBy(desc("total_sales")).limit(10)

# 写入报表表
top10_products.write.mode("overwrite").saveAsTable("hive_db.monthly_sales_top10")

spark.stop()

运行方式:通过Linux cron任务每月1号凌晨2点执行:


0 2 1 * * /usr/local/spark/bin/spark-submit --master yarn monthly_sales_top10.py
流处理实战:用Flink实时计算”商品实时销售额”

场景:实时监控商品销售额,每5秒更新一次,展示在大屏上。
数据量:每秒约1000条订单数据
技术栈:Flink + Kafka + Redis

实现步骤

从Kafka接收实时订单数据按商品ID分组,每5秒汇总一次销售额(滚动窗口)将结果存入Redis(供大屏查询)

核心代码(Java)


StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);  // 使用事件时间(订单实际发生时间)

// 1. 从Kafka读取订单数据
DataStream<Order> orderStream = env.addSource(kafkaConsumer)
    .map(jsonStr -> JSON.parseObject(jsonStr, Order.class))
    .assignTimestampsAndWatermarks(  // 提取事件时间,处理数据延迟
        WatermarkStrategy.<Order>forBoundedOutOfOrderness(Duration.ofSeconds(5))
            .withTimestampAssigner((order, timestamp) -> 
                LocalDateTime.parse(order.getTimestamp(), DateTimeFormatter.ISO_DATE_TIME)
                    .toInstant(ZoneOffset.UTC).toEpochMilli()
            )
    );

// 2. 按商品ID分组,5秒窗口汇总销售额
DataStream<Tuple2<String, Double>> productSalesStream = orderStream
    .keyBy(Order::getProductId)  // 按商品ID分组
    .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))  // 5秒滚动窗口
    .aggregate(new AggregateFunction<Order, Double, Double>() {
        @Override
        public Double createAccumulator() { return 0.0; }  // 初始累加器(销售额0)
        @Override
        public Double add(Order order, Double acc) { return acc + order.getAmount(); }  // 累加金额
        @Override
        public Double getResult(Double acc) { return acc; }  // 窗口结束时返回结果
        @Override
        public Double merge(Double a, Double b) { return a + b; }  // 合并累加器(分布式场景)
    })
    .map(result -> new Tuple2<>(result.f0, result.f1));  // (product_id, total_sales)

// 3. 写入Redis(Key: product:{product_id}, Value: total_sales)
productSalesStream.addSink(new RedisSink<>(redisConfig, new RedisMapper<Tuple2<String, Double>>() {
    @Override
    public RedisCommandDescription getCommandDescription() {
        return new RedisCommandDescription(RedisCommand.SET);  // 使用SET命令
    }
    @Override
    public String getKeyFromData(Tuple2<String, Double> data) {
        return "product:" + data.f0;  // Key: product:12345
    }
    @Override
    public String getValueFromData(Tuple2<String, Double> data) {
        return data.f1.toString();  // Value: 1250.5
    }
}));

env.execute("Real-time Product Sales");

效果:大屏通过Redis API(如
GET product:12345
)每5秒获取一次数据,实时展示各商品销售额。

项目实战:电商大数据架构设计与实现

项目背景

我们要为一家中型电商公司设计大数据架构,需求如下:

数据来源:订单系统(MySQL)、用户行为日志(APP埋点)、商品信息(MongoDB)、物流数据(API接口)数据需求
离线分析:日/月销售报表、用户画像、商品推荐模型训练实时分析:实时订单监控、库存预警、首页热门商品排行
技术约束:预算有限,优先选择开源组件

架构设计

基于需求,我们设计”湖仓一体+批流融合“架构,如下:

图片[1] - 大数据领域数据架构的关键技术解析 - 宋马(注:实际写作时可插入架构图,此处用文字描述)
组件说明

数据湖:HDFS(存原始日志、爬虫数据等非结构化数据)数据仓库:Hive(存结构化业务数据,如订单表、用户表)实时数据通道:Kafka(接收APP埋点、实时订单数据)批处理引擎:Spark(跑离线ETL和报表计算)流处理引擎:Flink(处理实时数据,更新库存和热门商品)元数据管理:Atlas(记录数据血缘和表结构)BI工具:Superset(制作销售报表和仪表盘)

开发环境搭建(Docker Compose快速部署)

为了快速开发,我们用Docker Compose部署所有组件:

docker-compose.yml(核心部分):


version: '3'
services:
  # HDFS(数据湖)
  hdfs-namenode:
    image: apache/hadoop:3.3.4
    command: namenode
    environment:
      - HDFS_NAMENODE_USER=root
      - HDFS_DATANODE_USER=root
    ports:
      - "9870:9870"  # HDFS Web界面

  hdfs-datanode:
    image: apache/hadoop:3.3.4
    command: datanode
    environment:
      - HDFS_NAMENODE_USER=root
      - HDFS_DATANODE_USER=root
    depends_on:
      - hdfs-namenode

  # Kafka(消息队列)
  kafka:
    image: confluentinc/cp-kafka:7.3.0
    ports:
      - "9092:9092"
    environment:
      - KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://kafka:9092
      - KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1
    depends_on:
      - zookeeper

  # Hive(数据仓库)
  hive-server:
    image: apache/hive:3.1.3
    ports:
      - "10000:10000"  # Hive JDBC端口
    depends_on:
      - hdfs-namenode

  # Spark(批处理)
  spark:
    image: bitnami/spark:3.3.2
    ports:
      - "8080:8080"  # Spark Master Web界面
    environment:
      - SPARK_MODE=master

  # Flink(流处理)
  flink-jobmanager:
    image: flink:1.16.0
    ports:
      - "8081:8081"  # Flink Web界面
    command: jobmanager

  flink-taskmanager:
    image: flink:1.16.0
    depends_on:
      - flink-jobmanager
    command: taskmanager

启动命令:


docker-compose up -d  # 后台启动所有服务

核心模块实现

模块1:数据采集(从数据源到数据湖/ Kafka)

1.1 MySQL订单数据采集(批处理)
用Sqoop工具(开源数据迁移工具)每天凌晨从MySQL全量导出订单数据到HDFS数据湖:


# Sqoop命令:从MySQL导出order表到HDFS
sqoop export 
  --connect jdbc:mysql://mysql-host:3306/ecommerce 
  --username root 
  --password password 
  --table order 
  --target-dir /data/lake/mysql/order/$(date +%Y-%m-%d)   # 按日期分区存储
  --fields-terminated-by ','   # 字段分隔符
  --m 1  # 1个Map任务(数据量小时用)

1.2 APP用户行为日志采集(流处理)
APP埋点日志通过HTTP发送到Flume(日志收集工具),Flume再将日志写入Kafka:

Flume配置文件(user_behavior.conf)


agent.sources = http-source
agent.sinks = kafka-sink
agent.channels = memory-channel

# 源:HTTP接收日志(APP发送POST请求到该端口)
agent.sources.http-source.type = http
agent.sources.http-source.bind = 0.0.0.0
agent.sources.http-source.port = 5140

# 通道:内存缓存日志
agent.channels.memory-channel.type = memory
agent.channels.memory-channel.capacity = 10000

#  sink:发送到Kafka的user_behavior主题
agent.sinks.kafka-sink.type = org.apache.flume.sink.kafka.KafkaSink
agent.sinks.kafka-sink.kafka.bootstrap.servers = kafka:9092
agent.sinks.kafka-sink.kafka.topic = user_behavior
agent.sinks.kafka-sink.serializer.class = kafka.serializer.StringEncoder

# 连接源、通道、sink
agent.sources.http-source.channels = memory-channel
agent.sinks.kafka-sink.channel = memory-channel

启动Flume:


flume-ng agent -n agent -f user_behavior.conf -Dflume.root.logger=INFO,console
模块2:数据处理(ETL + 批/流分析)

2.1 批处理ETL(Spark + Hive)
将HDFS数据湖的原始订单数据清洗后加载到Hive数据仓库:

Spark ETL脚本(order_etl.py)


from pyspark.sql import SparkSession
from pyspark.sql.functions import col, to_date

spark = SparkSession.builder.appName("OrderETL").enableHiveSupport().getOrCreate()

# 1. 读取HDFS上的原始订单数据(昨天的)
yesterday = (datetime.now() - timedelta(days=1)).strftime("%Y-%m-%d")
raw_order_path = f"hdfs:///data/lake/mysql/order/{yesterday}"
raw_orders = spark.read.csv(
    raw_order_path,
    header=True,  # 第一行为表头
    inferSchema=True,  # 自动推断 schema
    sep=','
)

# 2. 数据清洗
clean_orders = raw_orders.filter(
    (col("order_status") == "PAID") &  # 只保留已支付订单
    (col("amount") > 0) &  # 金额为正
    (col("user_id").isNotNull())
).select(
    col("order_id"),
    col("user_id"),
    col("product_id"),
    col("amount"),
    to_date(col("create_time"), "yyyy-MM-dd HH:mm:ss").alias("order_date")  # 转换日期格式
)

# 3. 写入Hive数据仓库(按日期分区)
clean_orders.write.mode("append").partitionBy("order_date").saveAsTable("ecommerce.order")

spark.stop()

2.2 实时热门商品计算(Flink + Kafka + Redis)
从Kafka的user_behavior主题读取用户点击日志,实时计算”最近1小时热门商品Top10″,存入Redis供首页展示:

Flink作业(HotProducts.java)


public class HotProducts {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        // 1. 从Kafka读取用户行为数据(JSON格式:{"user_id": "123", "product_id": "456", "behavior": "click", "timestamp": "2024-05-01 10:00:00"})
        DataStream<String> behaviorStream = env.addSource(
            new FlinkKafkaConsumer<>("user_behavior", new SimpleStringSchema(), kafkaProps)
        );

        // 2. 解析并过滤点击行为
        DataStream<ProductView> viewStream = behaviorStream
            .map(jsonStr -> JSON.parseObject(jsonStr))
            .filter(json -> "click".equals(json.getString("behavior")))  // 只保留点击行为
            .map(json -> new ProductView(
                json.getString("product_id"),
                LocalDateTime.parse(json.getString("timestamp"), DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"))
                    .toInstant(ZoneOffset.UTC).toEpochMilli()
            ))
            .assignTimestampsAndWatermarks(
                WatermarkStrategy.<ProductView>forBoundedOutOfOrderness(Duration.ofSeconds(10))
                    .withTimestampAssigner((view, ts) -> view.getTimestamp())
            );

        // 3. 开窗统计(1小时窗口,每5分钟更新一次)
        DataStream<Tuple2<String, Long>> hotProductsStream = viewStream
            .keyBy(ProductView::getProductId)
            .window(SlidingEventTimeWindows.of(Time.hours(1), Time.minutes(5)))  // 滑动窗口:1小时窗口,5分钟滑动一次
            .aggregate(new CountAgg(), new WindowResultFunction());  // 统计每个窗口内的点击数

        // 4. 按窗口分组,取Top10
        DataStream<String> top10Stream = hotProductsStream
            .keyBy(window -> window.f0)  // 按窗口结束时间分组
            .process(new TopNHotProducts(10));  // 自定义ProcessFunction取Top10

        // 5. 写入Redis(Key: hot_products:last_hour, Value: JSON数组)
        top10Stream.addSink(new RedisSink<>(redisConfig, new HotProductsRedisMapper()));

        env.execute("Hot Products Analysis");
    }
}
模块3:数据分析与应用

3.1 离线报表(Hive + Superset)
在Superset中连接Hive,创建”日销售报表”仪表盘,包含:

当日总销售额、订单量、客单价各品类销售额占比饼图销售额TOP10商品柱状图

Hive查询示例(日销售额):


SELECT 
    order_date, 
    SUM(amount) AS total_sales, 
    COUNT(DISTINCT order_id) AS order_count,
    SUM(amount)/COUNT(DISTINCT order_id) AS avg_price
FROM ecommerce.order 
WHERE order_date = CURRENT_DATE - INTERVAL '1' DAY
GROUP BY order_date;

3.2 实时大屏(Redis + Vue)
前端Vue项目通过Axios每秒请求Redis中的
hot_products:last_hour

realtime_sales
数据,用ECharts绘制实时热门商品排行和销售额趋势图。

数据治理实现

1. 元数据管理(Atlas)
在Atlas中注册Hive表,记录:

表名:
ecommerce.order
所有者:data_team@company.com数据来源:MySQL订单表分区字段:order_date血缘关系:
hdfs:///data/lake/mysql/order

ecommerce.order

2. 数据质量监控(Great Expectations)
编写数据质量规则(Expectation Suite),每天检查订单表:


expectations:
  - expectation_type: expect_column_values_to_not_be_null
    kwargs:
      column: order_id
  - expectation_type: expect_column_values_to_be_between
    kwargs:
      column: amount
      min_value: 0
      max_value: 100000

3. 权限控制(Ranger)

© 版权声明
THE END
如果内容对您有所帮助,就支持一下吧!
点赞0 分享
评论 抢沙发

请登录后发表评论

    暂无评论内容